RabbitMQ Nedir? Queues ve Exhange Types
RabbitMQ, en basit tabiriyle iletişim halindeki sistemler arasındaki asenkron mesaj kuyruk sistemidir, bir mesaj oluşturulur ve oluşturulan mesaj kabul edilerek, ilgili yere (kuyruğa) tüketilmek üzere iletilir. Çoğu yerde postane örneği verilmektedir; gönderilmek istenen mektup posta kutusuna bırakıldıktan sonra mektup, postacı tarafından alıcısına ulaştırılır. Bu bağlamda RabbitMQ hem posta kutusu hem postane hem de postacıdır. Tek farkı kağıt yerine, FIFO (First In First Out) mantığında binary large object (BLOB) taşımasıdır. Yine bu örnekte terimlere geçmeden önce biz Producer, mektubu alan kişi Consumer rolündedir.
Erlang dilinde yazılmıştır. Açık kaynaklıdır ve AMQP 0-9-1, STOMP, MQTT, AMQP 1.0 protokollerini destekler. (Advanced Message Queue Protocol) Piyasadaki alternatifleri MSMQ, Apache Kafka, MS Azure Bus’tur ve multi-language ve cross-platform özellikleri de vardır.
Ön bilgileri verdikten sonra ilgili terimlere göz atarak ısınma turuna başlamış olalım.
- Producer: Kuyruğa mesaj gönderen, üreten uygulamadır.
- Consumer: Dinlediği kuyruktaki mesajları tüketerek işleyen uygulamadır.
- Queue: Mesajların eklendiği, tüketildiği ve tutulduğu listedir.
- Exchange: Producer ürettiği mesajı doğrudan alıcıya veya kuyruğa göndermez, arada bir mesaj yönlendiricisi vardır, bu yönlendirme işlemini gerçekleştiren yapıdır. Producer ürettiği mesajı exchange’e iletir, exchange gelen mesajı ilgili bilgilerle ilgili kuyruğa ekler ve dinleyen bir consumer varsa kuyruktan sıradaki mesajı işlemek üzere alır. Kısaca yaptığı görev belirtilen Routing Key‘e göre aldığı mesajı ilgili Queue‘ye iletmektir.
- Binding: Exchange ile Queue arasında kurulacak bağlantının yönlendirme kuralıdır. Exchange aldığı mesajları bu kurala göre ilgili kuyruklara dağıtır.
- Routing Key: Producer tarafından üretilerek Exchange’e iletilen mesajın, hangi kuyruğa yönlendirileceğinin işaretlenerek bildirilmesidir.
- Exhange Type: Mesajın kuyruğa hangi yönteme göre iletileceğinin belirtildiği tiptir.
Exchange Types
RabbitMQ üzerinde 4 adet exchange tipi tanımlıdır. Bunlar haricinde sistemde Default Exchange varsayılan olarak tanımlıdır ve Direct tipindedir.
- Direct Exchange
- Topic Exchange
- Fanout Exchange
- Header Exchange
Direct Exchange
Mesaj gönderilirken belirtilen routing key ile eşleşen Binding (Queue bir Exchange’e atanırken belirlenmiş key) ile ilgili Queue’ye aktarılır. Aynı zamanda default olarak kullanılan Exhange türüdür, her Queue otomatik olarak bu exchange türüne routing key ile atanır.
Yukarıdaki örneği incelediğimizde Producer tarafından üretilen mesaj log.error routing key ile exchange’e gönderildiğinde, eşleşen Binding keye ait Queue‘ye aktarıldığını göreceksiniz.
Örnek projeye başlamadan önce akılda soru işareti kalmaması açısından Queue property’lerine göz atalım.
Queue Properties
Name: Oluşturulacak queue’ye verilecek isim.
Durable: Varsayılan olarak Transient yani in-memory olarak tutulmaktadır. Kuyrukların broker restart olduğunda persistence olması isteniyorsa bu özellik true olarak ayarlanmalıdır.
Exclusive: Bu özellik true olarak ayarlanırsa yalnızca kendi oluşturulduğu connection tarafından kullanılabilir.
AutoDelete: Bu özellik true olarak ayarlanırsa son consumer kuyruktan unsubscribe olduğunda kuyruk silinir.
Konuyu pekiştirmek için bir örnek yapalım. İlk olarak Producer konsol uygulamamızı oluşturup RabbitMQ.Client
isimli paketi kuralım.
dotnet new console -n Producer
Kuyruğa mesaj üretecek kodumuzu yazalım.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare("ex.direct", "direct", true, true);
channel.QueueDeclare("log.error", true, false, false);
channel.QueueDeclare("log.warning", true, false, false);
channel.QueueDeclare("log.info", true, false, false);
channel.QueueBind("log.error", "ex.direct", "error");
channel.QueueBind("log.warning", "ex.direct", "warning");
channel.QueueBind("log.info", "ex.direct", "info");
channel.BasicPublish("ex.direct", "info", false, null, Encoding.UTF8.GetBytes("This is an info message."));
channel.BasicPublish("ex.direct", "error", false, null, Encoding.UTF8.GetBytes("This is an error message."));
channel.BasicPublish("ex.direct", "warning", false, null, Encoding.UTF8.GetBytes("This is a warning message."));
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
channel.QueueDelete("log.error");
channel.QueueDelete("log.info");
channel.QueueDelete("log.warning");
channel.ExchangeDelete("ex.direct");
channel.Close();
connection.Close();
ConnectionFactory sınıfıyla localhost adresine, varsayılan VHost üzerinde ve varsayılan yetkilendirme bilgileriyle bağlantı oluşturulmasını sağlayacak bir nesne örnekleniyor. Bu nesne aracılığıyla bir bağlantı açılıyor ve IConnection
arayüzü tipinde bir nesne elde ediyoruz. Açılan bağlantı aracılığıyla asıl mesajlaşma işlemlerini gerçekleştirecek bir channel oluşturuluyor ve IModel
arayüzü tipinde bir nesne elde ediyoruz.
Kodumuzu incelemeye devam ettiğimizde methodların IModel
arayüzü üzerinden çağırıldığını görüyoruz. ExchangeDeclare
methoduyla ex.direct
isminde ve direct
tipinde bir exchange oluşturuluyor. QueueDeclare
methoduyla log.error
, log.warning
, log.info
isimlerinde Queue’ler oluşturuluyor ve ardından oluşturmuş olduğumuz ex.direct
isimli exchange’e error
, warning
ve info
routing keyleriyle bağlama işlemini yapıyoruz. Göndereceğiz mesajları byte dizisine çevirdikten sonra BasicPublish
ile mesajlarımızı hangi routing key ve exchange’e gideceğini belirterek iletiyoruz. Exchange aldığı mesajı eşleşen kuyruğa aktarıyor.
Şimdi consumer projemizi yazalım, yine aynı şekilde projemizi oluşturduktan sonra RabbitMQ.Client
isimli Nuget Paketini kuralım.
dotnet new console -n Consumer
Kuyruktaki mesajları tüketecek kodumuzu yazalım.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
var message = Encoding.UTF8.GetString(e.Body.ToArray());
Console.WriteLine("Message received: {0}", message);
};
channel.BasicConsume("log.info", true, consumer);
channel.BasicConsume("log.error", true, consumer);
channel.BasicConsume("log.warning", true, consumer);
Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();
Bağlantı oluşturma, kanal açma gibi işlemler burada da aynı şekildedir. Sonrasında BasicConsume
ile ilgili kuyruğu dinleyeceğimizi bildiriyoruz. EventingBasicConsumer
ile oluşturacağımız consumer’ın Received
EventHandler’i üzerinden kuyruktaki mesajları byte’tan string’e çevirerek tüketiyoruz.
Topic Exchange
Topic exchange, direct exchange’in aksine birden fazla Queue’ya mesaj iletebilir. Yine aynı şekilde gönderilen mesajdaki routing key, eşleşen Binding keye sahip queue’ya aktarılır. Ancak binding key ile routing key, noktalarla ayrılmış şekilde kelimelerden oluşmalıdır. (Direct exchange’te böyle bir zorunluluk yok.) Kelime 255 byte’a kadar herhangi bir metin olabilir. Binding key tanımlanırken regex’ten de aşina olduğumuz iki özel durum söz konusudur.
- * (yıldız): Kullanıldığı yerde bir kelime yerine geçebilir.
- # (hash): Kullanıldığı yerde sıfır veya daha fazla kelime yerine geçebilir.
Producer tarafından üretilen mesajları tek tek inceleyelim.
- log: routing key ile eşleşen bir binding olmadığı için hiç bir queue’ya aktarılmayacak.
- log.error: tıpkı direct exchange gibi ilgili queue’ya aktarılacak.
- log.info:
log.*
binding keyine sahip queue’ya aktarılacak. - test.warning:
#.warning
binding keyine sahip queue’ya aktarılacak. - warning:
#.warning
binding keyine sahip queue’ya aktarılacak.
Producer uygulamamızı oluşturup RabbitMQ.Client
isimli paketi kuralım.
dotnet new console -n Producer
Kuyruğa mesaj üretecek kodumuzu yazalım.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare("ex.topic", "topic", false, true);
channel.QueueDeclare("log.error", false, false, true);
channel.QueueDeclare("logs.all", false, false, true);
channel.QueueDeclare("all.warnings", false, false, true);
channel.QueueBind("log.error", "ex.topic", "log.error");
channel.QueueBind("logs.all", "ex.topic", "log.*");
channel.QueueBind("all.warnings", "ex.topic", "#.warning");
channel.BasicPublish("ex.topic", "log", null, Encoding.UTF8.GetBytes("This is a log message"));
channel.BasicPublish("ex.topic", "log.error", null, Encoding.UTF8.GetBytes("This is an error message"));
channel.BasicPublish("ex.topic", "log.info", null, Encoding.UTF8.GetBytes("This is an info message"));
channel.BasicPublish("ex.topic", "test.warning", null, Encoding.UTF8.GetBytes("This is a test warning message"));
channel.BasicPublish("ex.topic", "warning", null, Encoding.UTF8.GetBytes("This is a warning message"));
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
channel.QueueDelete("log.error");
channel.QueueDelete("logs.all");
channel.QueueDelete("all.warnings");
channel.ExchangeDelete("ex.topic");
channel.Close();
connection.Close();
Burada farklılık gösteren tek kısım QueueBind
methodunda değinildiği üzere binding key’ler notasyona uygun şekilde belirtilmiştir. (log.error
, log.*
, #.warning
) Şimdi consumer projemize geçelim, yine aynı şekilde projemizi oluşturup gerekli paketi kuruyoruz.
dotnet new console -n Consumer
Mesajları tüketecek Consumer kodlarımızı yazalım.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
var message = Encoding.UTF8.GetString(e.Body.ToArray());
Console.WriteLine("Message received: {0}", message);
};
channel.BasicConsume("log.error", true, consumer);
channel.BasicConsume("logs.all", true, consumer);
channel.BasicConsume("all.warnings", true, consumer);
Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();
Görüldüğü üzere kodumuz bir öncekine göre aynı. BasicConsume
methodlarını teker teker yorum satırı haline getirerek mesaj akışı incelenebilir.
Fanout Exchange
Fanout Exchange’te gönderilen mesajlar kopyalanarak bağlanmış tüm queue’lara routing keye bakılmaksızın gönderilir. Bu exchange türünde routing keyin bir önemi yoktur.
Gönderilen mesajı tüm queue’lara kopyalanarak dağıtılacaktır. Producer projemizi oluşturup, gerekli paketi kurup kodumuzu yazalım.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare("ex.fanout", "fanout", false, true);
channel.QueueDeclare("queue.one", false, false, true);
channel.QueueDeclare("queue.two", false, false, true);
channel.QueueBind("queue.one", "ex.fanout", string.Empty);
channel.QueueBind("queue.two", "ex.fanout", string.Empty);
channel.BasicPublish("ex.fanout", string.Empty, false, null, Encoding.UTF8.GetBytes("This is a first fanout message."));
channel.BasicPublish("ex.fanout", string.Empty, false, null, Encoding.UTF8.GetBytes("This is a second fanout message."));
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
channel.QueueDelete("queue.one");
channel.QueueDelete("queue.two");
channel.ExchangeDelete("ex.fanout");
channel.Close();
connection.Close();
Burada farklı olan routing key ve binding key’ler gözardı edildiği için değer olarak boş string verilmiş olmasıdır.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, args) =>
{
var message = Encoding.UTF8.GetString(args.Body.ToArray());
Console.WriteLine("Message received: {0}", message);
};
channel.BasicConsume("queue.one", false, consumer);
channel.BasicConsume("queue.two", false, consumer);
Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();
Header Exchange
Header Exchange, Direct Exchange’e çok benzer, fakat iletişim routing key yerine header key-value’larına göre yapılmaktadır. Exchange, gelen mesajları hangi queue’ya yönlendireceğine karar verebilmesi için binding key ayrıca x-match
başlığını barındırmalıdır. Bu argüman all
veya any
değerini alabilir. all
değerini alması durumunda tüm değerlerinin eşleşmesi, any
olması durumunda bir değerin eşleşmesi yeterlidir.
Gönderilen mesajı, ilk queue tüm header değerleri eşleştiği ve x-match
değeri all
olduğu için alacaktır. Aynı şekilde x-match
değeri any
olan queue’da bu mesajı en az bir değer eşleştiği için alacaktır.
Producer ve consumer projelerimizi oluşturup paketleri kurarak kodumuzu yazmaya başlayalım.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare("ex.headers", "headers", false, true);
channel.QueueDeclare("queue.one", false, false, true);
channel.QueueDeclare("queue.two", false, false, true);
channel.QueueDeclare("queue.three", false, false, true);
channel.QueueBind("queue.one", "ex.headers", string.Empty, new Dictionary<string, object>
{
{"x-match", "all"},
{"op", "convert"},
{"format", "jpeg"}
});
channel.QueueBind("queue.two", "ex.headers", string.Empty, new Dictionary<string, object>
{
{"x-match", "all"},
{"op", "convert"},
{"format", "png"}
});
channel.QueueBind("queue.three", "ex.headers", string.Empty, new Dictionary<string, object>
{
{"x-match", "any"},
{"op", "convert"},
{"format", "bitmap"}
});
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>()
{
{"op", "convert"},
{"format", "jpeg"}
};
channel.BasicPublish("ex.headers", string.Empty, false, props, Encoding.UTF8.GetBytes("This is a jpeg convert operation."));
props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>
{
{"op", "convert"},
{"format", "gif"}
};
channel.BasicPublish("ex.headers", string.Empty, props, Encoding.UTF8.GetBytes("This is a gif convert operation."));
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
channel.QueueDelete("queue.one");
channel.QueueDelete("queue.two");
channel.QueueDelete("queue.three");
channel.ExchangeDelete("ex.headers");
channel.Close();
connection.Close();
Yine bağlantımızı oluşturup kanalımızı açtıktan sonra queue’larımızı oluşturduğumuzu ve exchange’e bind ederken routing key yerine header değerleri kullandığımıza dikkat edin.
var factory = new ConnectionFactory()
{
HostName = "localhost",
VirtualHost = "/",
Port = 5672,
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
var message = Encoding.UTF8.GetString(e.Body.ToArray());
Console.WriteLine("Message received: {0}", message);
};
channel.BasicConsume("queue.one", true, consumer);
//channel.BasicConsume("queue.two", true, consumer);
channel.BasicConsume("queue.three", true, consumer);
Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();
Bonus: RabbitMQ Management Dashboard
Şimdiye kadar RabbitMQ ile AMQP 0-9-1 protokolü üzerinden 5672 portuyla haberleştik, ancak 15672 portu üzerinden HTTP API istemcilerine ve Management UI sistemine erişebiliriz. Bunun için localhost:15672 adresini kullanmalıyız. Giriş yaparken daha önce de kullandığımız guest/guest default bilgilerini kullanarak sisteme giriş yapıyoruz. Bu ui aracılığıyla aşağıdaki gibi işlemleri gerçekleştirebiliriz.
- Mevcut queue mesajlarına bakabilir,
- Bağlantılara ve channel’lara göz atabilir,
- VHost’lara bakabilir yeni bir tane oluşturabilir,
- Exchange’lere bakabilir veya yeni bir tane oluşturabilir,
- Queue’lara bakabilir ve bir queue’yu mevcut exchange’e bind ederek yeni bir mesaj produce edip aynı şekilde consume edebilir,
- Yeni bir kullanıcı ekleyebilir, yetkilendirme işlemlerini yapabiliriz.
Örnek olması amacıyla bir exchange tanımlayalım ve oluşturacağımız queue’yu bind ederek bir mesaj oluşturup tüketelim.
Gerekli bilgileri doldurup Add exchange ile sisteme ekliyoruz.
Queue’umuzu oluşturduktan sonra daha önce oluşturduğumuz exchange’e bind edelim. Bunun için listeye eklenen queue üzerine tıklayarak detay sayfasına gidebiliriz.
Daha önce de değindiğim gibi oluşturulan her queue varsayılan olarak default exchange’e bind edilir. İşlemi tamamladıktan sonra artık bir mesaj yayınlayabiliriz, bunun için Exchanges sekmesi altından exchange detayına gitmemiz gerekiyor.
Mesajı consume edebilmek için Queues sekmesi altından detay sayfasına gitmemiz gerekiyor.
Mesajımızı consume ettik, burada yeni bir terim olan Ack Mode ile karşılaştığımıza dikkat edin. Bu mesajın nasıl işlendiğiyle ilgilidir, mesajın başarılı şekilde alındığı, reddedildiği bilgileridir, bu konu hakkında detaylı bilgiye RabbitMQ Routings, Message Patterns başlığı altından ulaşabilirsiniz.
Kaynak dosyasına buradan ulaşabilirsiniz.
Teşekkür ederim çok yardımcı oldu.