RabbitMQ Nedir? Queues ve Exhange Types

rabbitmq

RabbitMQ, en basit tabiriyle iletişim halindeki sistemler arasındaki asenkron mesaj kuyruk sistemidir, bir mesaj oluşturulur ve oluşturulan mesaj kabul edilerek, ilgili yere (kuyruğa) tüketilmek üzere iletilir. Çoğu yerde postane örneği verilmektedir; gönderilmek istenen mektup posta kutusuna bırakıldıktan sonra mektup, postacı tarafından alıcısına ulaştırılır. Bu bağlamda RabbitMQ hem posta kutusu hem postane hem de postacıdır. Tek farkı kağıt yerine, FIFO (First In First Out) mantığında binary large object (BLOB) taşımasıdır. Yine bu örnekte terimlere geçmeden önce biz Producer, mektubu alan kişi Consumer rolündedir.

Erlang dilinde yazılmıştır. Açık kaynaklıdır ve AMQP 0-9-1, STOMP, MQTT, AMQP 1.0 protokollerini destekler. (Advanced Message Queue Protocol) Piyasadaki alternatifleri MSMQ, Apache Kafka, MS Azure Bus’tur ve multi-language ve cross-platform özellikleri de vardır.

Ön bilgileri verdikten sonra ilgili terimlere göz atarak ısınma turuna başlamış olalım.

  • Producer: Kuyruğa mesaj gönderen, üreten uygulamadır.
  • Consumer: Dinlediği kuyruktaki mesajları tüketerek işleyen uygulamadır.
  • Queue: Mesajların eklendiği, tüketildiği ve tutulduğu listedir.
  • Exchange: Producer ürettiği mesajı doğrudan alıcıya veya kuyruğa göndermez, arada bir mesaj yönlendiricisi vardır, bu yönlendirme işlemini gerçekleştiren yapıdır. Producer ürettiği mesajı exchange’e iletir, exchange gelen mesajı ilgili bilgilerle ilgili kuyruğa ekler ve dinleyen bir consumer varsa kuyruktan sıradaki mesajı işlemek üzere alır. Kısaca yaptığı görev belirtilen Routing Key‘e göre aldığı mesajı ilgili Queue‘ye iletmektir.
  • Binding: Exchange ile Queue arasında kurulacak bağlantının yönlendirme kuralıdır. Exchange aldığı mesajları bu kurala göre ilgili kuyruklara dağıtır.
  • Routing Key: Producer tarafından üretilerek Exchange’e iletilen mesajın, hangi kuyruğa yönlendirileceğinin işaretlenerek bildirilmesidir.
  • Exhange Type: Mesajın kuyruğa hangi yönteme göre iletileceğinin belirtildiği tiptir.
rabbitmq-broker-exchange

Exchange Types

RabbitMQ üzerinde 4 adet exchange tipi tanımlıdır. Bunlar haricinde sistemde Default Exchange varsayılan olarak tanımlıdır ve Direct tipindedir.

  • Direct Exchange
  • Topic Exchange
  • Fanout Exchange
  • Header Exchange

Direct Exchange

Mesaj gönderilirken belirtilen routing key ile eşleşen Binding (Queue bir Exchange’e atanırken belirlenmiş key) ile ilgili Queue’ye aktarılır. Aynı zamanda default olarak kullanılan Exhange türüdür, her Queue otomatik olarak bu exchange türüne routing key ile atanır.

rabbitmq-direct-exchange

Yukarıdaki örneği incelediğimizde Producer tarafından üretilen mesaj log.error routing key ile exchange’e gönderildiğinde, eşleşen Binding keye ait Queue‘ye aktarıldığını göreceksiniz.

Örnek projeye başlamadan önce akılda soru işareti kalmaması açısından Queue property’lerine göz atalım.

Queue Properties

Name: Oluşturulacak queue’ye verilecek isim.

Durable: Varsayılan olarak Transient yani in-memory olarak tutulmaktadır. Kuyrukların broker restart olduğunda persistence olması isteniyorsa bu özellik true olarak ayarlanmalıdır.

Exclusive: Bu özellik true olarak ayarlanırsa yalnızca kendi oluşturulduğu connection tarafından kullanılabilir.

AutoDelete: Bu özellik true olarak ayarlanırsa son consumer kuyruktan unsubscribe olduğunda kuyruk silinir.

Konuyu pekiştirmek için bir örnek yapalım. İlk olarak Producer konsol uygulamamızı oluşturup RabbitMQ.Client isimli paketi kuralım.

dotnet new console -n Producer

Kuyruğa mesaj üretecek kodumuzu yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.direct", "direct", true, true);

channel.QueueDeclare("log.error", true, false, false);
channel.QueueDeclare("log.warning", true, false, false);
channel.QueueDeclare("log.info", true, false, false);

channel.QueueBind("log.error", "ex.direct", "error");
channel.QueueBind("log.warning", "ex.direct", "warning");
channel.QueueBind("log.info", "ex.direct", "info");

channel.BasicPublish("ex.direct", "info", false, null, Encoding.UTF8.GetBytes("This is an info message."));
channel.BasicPublish("ex.direct", "error", false, null, Encoding.UTF8.GetBytes("This is an error message."));
channel.BasicPublish("ex.direct", "warning", false, null, Encoding.UTF8.GetBytes("This is a warning message."));

Console.WriteLine("Press any key to exit.");
Console.ReadKey();

channel.QueueDelete("log.error");
channel.QueueDelete("log.info");
channel.QueueDelete("log.warning");
channel.ExchangeDelete("ex.direct");

channel.Close();
connection.Close();

ConnectionFactory sınıfıyla localhost adresine, varsayılan VHost üzerinde ve varsayılan yetkilendirme bilgileriyle bağlantı oluşturulmasını sağlayacak bir nesne örnekleniyor. Bu nesne aracılığıyla bir bağlantı açılıyor ve IConnection arayüzü tipinde bir nesne elde ediyoruz. Açılan bağlantı aracılığıyla asıl mesajlaşma işlemlerini gerçekleştirecek bir channel oluşturuluyor ve IModel arayüzü tipinde bir nesne elde ediyoruz.

Kodumuzu incelemeye devam ettiğimizde methodların IModel arayüzü üzerinden çağırıldığını görüyoruz. ExchangeDeclare methoduyla ex.direct isminde ve direct tipinde bir exchange oluşturuluyor. QueueDeclare methoduyla log.error, log.warning, log.info isimlerinde Queue’ler oluşturuluyor ve ardından oluşturmuş olduğumuz ex.direct isimli exchange’e error, warning ve info routing keyleriyle bağlama işlemini yapıyoruz. Göndereceğiz mesajları byte dizisine çevirdikten sonra BasicPublish ile mesajlarımızı hangi routing key ve exchange’e gideceğini belirterek iletiyoruz. Exchange aldığı mesajı eşleşen kuyruğa aktarıyor.

Şimdi consumer projemizi yazalım, yine aynı şekilde projemizi oluşturduktan sonra RabbitMQ.Client isimli Nuget Paketini kuralım.

dotnet new console -n Consumer

Kuyruktaki mesajları tüketecek kodumuzu yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine("Message received: {0}", message);
};

channel.BasicConsume("log.info", true, consumer);
channel.BasicConsume("log.error", true, consumer);
channel.BasicConsume("log.warning", true, consumer);

Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();

Bağlantı oluşturma, kanal açma gibi işlemler burada da aynı şekildedir. Sonrasında BasicConsume ile ilgili kuyruğu dinleyeceğimizi bildiriyoruz. EventingBasicConsumer ile oluşturacağımız consumer’ın Received EventHandler’i üzerinden kuyruktaki mesajları byte’tan string’e çevirerek tüketiyoruz.

Topic Exchange

Topic exchange, direct exchange’in aksine birden fazla Queue’ya mesaj iletebilir. Yine aynı şekilde gönderilen mesajdaki routing key, eşleşen Binding keye sahip queue’ya aktarılır. Ancak binding key ile routing key, noktalarla ayrılmış şekilde kelimelerden oluşmalıdır. (Direct exchange’te böyle bir zorunluluk yok.) Kelime 255 byte’a kadar herhangi bir metin olabilir. Binding key tanımlanırken regex’ten de aşina olduğumuz iki özel durum söz konusudur.

  • * (yıldız): Kullanıldığı yerde bir kelime yerine geçebilir.
  • # (hash): Kullanıldığı yerde sıfır veya daha fazla kelime yerine geçebilir.
rabbitmq-topic-exchange

Producer tarafından üretilen mesajları tek tek inceleyelim.

  • log: routing key ile eşleşen bir binding olmadığı için hiç bir queue’ya aktarılmayacak.
  • log.error: tıpkı direct exchange gibi ilgili queue’ya aktarılacak.
  • log.info: log.* binding keyine sahip queue’ya aktarılacak.
  • test.warning: #.warning binding keyine sahip queue’ya aktarılacak.
  • warning: #.warning binding keyine sahip queue’ya aktarılacak.

Producer uygulamamızı oluşturup RabbitMQ.Client isimli paketi kuralım.

dotnet new console -n Producer

Kuyruğa mesaj üretecek kodumuzu yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.topic", "topic", false, true);

channel.QueueDeclare("log.error", false, false, true);
channel.QueueDeclare("logs.all", false, false, true);
channel.QueueDeclare("all.warnings", false, false, true);

channel.QueueBind("log.error", "ex.topic", "log.error");
channel.QueueBind("logs.all", "ex.topic", "log.*");
channel.QueueBind("all.warnings", "ex.topic", "#.warning");

channel.BasicPublish("ex.topic", "log", null, Encoding.UTF8.GetBytes("This is a log message"));
channel.BasicPublish("ex.topic", "log.error", null, Encoding.UTF8.GetBytes("This is an error message"));
channel.BasicPublish("ex.topic", "log.info", null, Encoding.UTF8.GetBytes("This is an info message"));
channel.BasicPublish("ex.topic", "test.warning", null, Encoding.UTF8.GetBytes("This is a test warning message"));
channel.BasicPublish("ex.topic", "warning", null, Encoding.UTF8.GetBytes("This is a warning message"));

Console.WriteLine("Press any key to exit.");
Console.ReadKey();

channel.QueueDelete("log.error");
channel.QueueDelete("logs.all");
channel.QueueDelete("all.warnings");
channel.ExchangeDelete("ex.topic");

channel.Close();
connection.Close();

Burada farklılık gösteren tek kısım QueueBind methodunda değinildiği üzere binding key’ler notasyona uygun şekilde belirtilmiştir. (log.error, log.*, #.warning) Şimdi consumer projemize geçelim, yine aynı şekilde projemizi oluşturup gerekli paketi kuruyoruz.

dotnet new console -n Consumer

Mesajları tüketecek Consumer kodlarımızı yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine("Message received: {0}", message);
};

channel.BasicConsume("log.error", true, consumer);
channel.BasicConsume("logs.all", true, consumer);
channel.BasicConsume("all.warnings", true, consumer);

Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();

Görüldüğü üzere kodumuz bir öncekine göre aynı. BasicConsume methodlarını teker teker yorum satırı haline getirerek mesaj akışı incelenebilir.

Fanout Exchange

Fanout Exchange’te gönderilen mesajlar kopyalanarak bağlanmış tüm queue’lara routing keye bakılmaksızın gönderilir. Bu exchange türünde routing keyin bir önemi yoktur.

rabbitmq-fanout-exchange

Gönderilen mesajı tüm queue’lara kopyalanarak dağıtılacaktır. Producer projemizi oluşturup, gerekli paketi kurup kodumuzu yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.fanout", "fanout", false, true);

channel.QueueDeclare("queue.one", false, false, true);
channel.QueueDeclare("queue.two", false, false, true);

channel.QueueBind("queue.one", "ex.fanout", string.Empty);
channel.QueueBind("queue.two", "ex.fanout", string.Empty);

channel.BasicPublish("ex.fanout", string.Empty, false, null, Encoding.UTF8.GetBytes("This is a first fanout message."));
channel.BasicPublish("ex.fanout", string.Empty, false, null, Encoding.UTF8.GetBytes("This is a second fanout message."));

Console.WriteLine("Press any key to exit.");
Console.ReadKey();

channel.QueueDelete("queue.one");
channel.QueueDelete("queue.two");
channel.ExchangeDelete("ex.fanout");

channel.Close();
connection.Close();

Burada farklı olan routing key ve binding key’ler gözardı edildiği için değer olarak boş string verilmiş olmasıdır.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, args) =>
{
    var message = Encoding.UTF8.GetString(args.Body.ToArray());
    Console.WriteLine("Message received: {0}", message);
};

channel.BasicConsume("queue.one", false, consumer);
channel.BasicConsume("queue.two", false, consumer);
Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();

Header Exchange

Header Exchange, Direct Exchange’e çok benzer, fakat iletişim routing key yerine header key-value’larına göre yapılmaktadır. Exchange, gelen mesajları hangi queue’ya yönlendireceğine karar verebilmesi için binding key ayrıca x-match başlığını barındırmalıdır. Bu argüman all veya any değerini alabilir. all değerini alması durumunda tüm değerlerinin eşleşmesi, any olması durumunda bir değerin eşleşmesi yeterlidir.

rabbitmq-header-exchange

Gönderilen mesajı, ilk queue tüm header değerleri eşleştiği ve x-match değeri all olduğu için alacaktır. Aynı şekilde x-match değeri any olan queue’da bu mesajı en az bir değer eşleştiği için alacaktır.

Producer ve consumer projelerimizi oluşturup paketleri kurarak kodumuzu yazmaya başlayalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.headers", "headers", false, true);

channel.QueueDeclare("queue.one", false, false, true);
channel.QueueDeclare("queue.two", false, false, true);
channel.QueueDeclare("queue.three", false, false, true);

channel.QueueBind("queue.one", "ex.headers", string.Empty, new Dictionary<string, object>
{
    {"x-match", "all"},
    {"op", "convert"},
    {"format", "jpeg"}
});
channel.QueueBind("queue.two", "ex.headers", string.Empty, new Dictionary<string, object>
{
    {"x-match", "all"},
    {"op", "convert"},
    {"format", "png"}
});
channel.QueueBind("queue.three", "ex.headers", string.Empty, new Dictionary<string, object>
{
    {"x-match", "any"},
    {"op", "convert"},
    {"format", "bitmap"}
});

var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>()
{
    {"op", "convert"},
    {"format", "jpeg"}
};
channel.BasicPublish("ex.headers", string.Empty, false, props, Encoding.UTF8.GetBytes("This is a jpeg convert operation."));

props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>
{
    {"op", "convert"},
    {"format", "gif"}
};
channel.BasicPublish("ex.headers", string.Empty, props, Encoding.UTF8.GetBytes("This is a gif convert operation."));

Console.WriteLine("Press any key to exit.");
Console.ReadKey();

channel.QueueDelete("queue.one");
channel.QueueDelete("queue.two");
channel.QueueDelete("queue.three");
channel.ExchangeDelete("ex.headers");

channel.Close();
connection.Close();

Yine bağlantımızı oluşturup kanalımızı açtıktan sonra queue’larımızı oluşturduğumuzu ve exchange’e bind ederken routing key yerine header değerleri kullandığımıza dikkat edin.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine("Message received: {0}", message);
};

channel.BasicConsume("queue.one", true, consumer);
//channel.BasicConsume("queue.two", true, consumer);
channel.BasicConsume("queue.three", true, consumer);

Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();

Bonus: RabbitMQ Management Dashboard

Şimdiye kadar RabbitMQ ile AMQP 0-9-1 protokolü üzerinden 5672 portuyla haberleştik, ancak 15672 portu üzerinden HTTP API istemcilerine ve Management UI sistemine erişebiliriz. Bunun için localhost:15672 adresini kullanmalıyız. Giriş yaparken daha önce de kullandığımız guest/guest default bilgilerini kullanarak sisteme giriş yapıyoruz. Bu ui aracılığıyla aşağıdaki gibi işlemleri gerçekleştirebiliriz.

  • Mevcut queue mesajlarına bakabilir,
  • Bağlantılara ve channel’lara göz atabilir,
  • VHost’lara bakabilir yeni bir tane oluşturabilir,
  • Exchange’lere bakabilir veya yeni bir tane oluşturabilir,
  • Queue’lara bakabilir ve bir queue’yu mevcut exchange’e bind ederek yeni bir mesaj produce edip aynı şekilde consume edebilir,
  • Yeni bir kullanıcı ekleyebilir, yetkilendirme işlemlerini yapabiliriz.

Örnek olması amacıyla bir exchange tanımlayalım ve oluşturacağımız queue’yu bind ederek bir mesaj oluşturup tüketelim.

rabbit mq exchanges

Gerekli bilgileri doldurup Add exchange ile sisteme ekliyoruz.

RabbitMQ UI Queues

Queue’umuzu oluşturduktan sonra daha önce oluşturduğumuz exchange’e bind edelim. Bunun için listeye eklenen queue üzerine tıklayarak detay sayfasına gidebiliriz.

RabbitMQ Exchange Queue Binding

Daha önce de değindiğim gibi oluşturulan her queue varsayılan olarak default exchange’e bind edilir. İşlemi tamamladıktan sonra artık bir mesaj yayınlayabiliriz, bunun için Exchanges sekmesi altından exchange detayına gitmemiz gerekiyor.

RabbitMQ Management UI Publish Message

Mesajı consume edebilmek için Queues sekmesi altından detay sayfasına gitmemiz gerekiyor.

RabbitMQ Management UI Get Messages

Mesajımızı consume ettik, burada yeni bir terim olan Ack Mode ile karşılaştığımıza dikkat edin. Bu mesajın nasıl işlendiğiyle ilgilidir, mesajın başarılı şekilde alındığı, reddedildiği bilgileridir, bu konu hakkında detaylı bilgiye RabbitMQ Routings, Message Patterns başlığı altından ulaşabilirsiniz.

Kaynak dosyasına buradan ulaşabilirsiniz.

You may also like...

1 Response

  1. Halil Yiğit dedi ki:

    Teşekkür ederim çok yardımcı oldu.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir