RabbitMQ Routings, Message Patterns

rabbitmq

RabbitMQ, dağıtık sistemler için çeşitli asenkron yapılar kullanır. Bu yazımızda hem routing ile ilgili detaylı konulara (Exchange to Exchange Binding, Alternate Exchange) hem de message pattern’lere (Work Queues, Round-Robin, Publish-Subscribe, Push-Pull, Request-Reply) göz atıyor olacağız.

Bu yazıda ilerlemeden önce RabbitMQ konusuna giriş yapmak adına RabbitMQ Nedir yazıma göz atabilirsiniz.

Exchange to Exchange Binding

Bir queue’yu nasıl bir exchange’e bind ediyorsak bir exchange’i de başka bir exchange’e bind edebiliriz, yönlendirme kuralları aynıdır. Exchange üzerinden çıkan mesaj, hedef exchange’e binding ayarlarına göre yönlendirilir ve hedef exchange bu mesajı bağlı queue’lere iletir.

RabbitMQ Exchange to Exchange Binding

Yukarıdaki örneği incelediğinizde route.rabbit route keyine sahip exchange’in diğer exchange’e bind edildiğini göreceksiniz. Bu route keye sahip bir mesaj bir exchange’ten diğerine yönlendirilerek eşleşen queue’ye yönlendirilir.

Producer projemizi oluşturup, RabbitMQ.Client isimli Nuget paketini kuralım.

dotnet new console -n ExchangeToExchange
var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.dog", "direct", false, true);
channel.ExchangeDeclare("ex.rabbit", "direct", false, true);

channel.QueueDeclare("queue-dog", false, false, true);
channel.QueueDeclare("queue-rabbit", false, false, true);

channel.ExchangeBind("ex.rabbit", "ex.dog", "route.rabbit");

channel.QueueBind("queue-dog", "ex.dog", "route.dog");
channel.QueueBind("queue-rabbit", "ex.rabbit", "route.rabbit");

channel.BasicPublish("ex.dog", "route.dog", false, null, Encoding.UTF8.GetBytes("This is a message from dog."));
channel.BasicPublish("ex.dog", "route.rabbit", false, null, Encoding.UTF8.GetBytes("This is a message from rabbit."));

Console.WriteLine("Press any key to exit.");
Console.ReadKey();

Producer kodumuzu incelediğimizde önceki kodlardan farklı olarak ExchangeBind methoduyla ex.rabbit‘in ex.dog‘a route.rabbit route keyiyle birlikte bind edildiğini göreceksiniz. ex.dog exchange’i üzerinden ilettiğimiz route.rabbit route keyine sahip mesajın ex.rabbit exchange’ine ve oradan queue-rabbit queue’sine aktarıldığını göreceksiniz. Bu mesajı RabbitMQ Management UI üzerinden consume edebilirsiniz.

Alternate Exchange

Dağıtık sistemler üzerinde kurulan asenkron bir yapıda bir exchange’e yönlendirilen bazı mesajlar bağlı queue’lara yönlendirilmek üzere uygun olmayabilir. Bunlar unrouted message olarak adlandırılmaktadır. Bu mesajlar exchange tarafından yok sayılır ve mesaj kaybolur. İşte bu durumlarda bu mesajları toplamak için Alternate Exchange kullanılabilir. Bir alternate exchange tanımlamak için -biraz sonra göreceğimiz gibi- exchange’e alternate-exchange keyi ve hangi exchange’in alternate exchange olarak kullanılacağı değer eklenmelidir, böylece unrouted mesajlar bu exchange’e yönlendirilecektir. Fanout Exchange bir filtreleme işlemi yapmadığından dolayı bu yapıya gayet uygundur.

RabbitMQ Alternate Exchange
Yukarıdaki örnekte de göreceğiniz üzere Exchange1’e yönlendirilen mesaj bağlı queue’lar ile eşleşmeyeceğinden dolayı fanout tipindeki alternate exchange olarak tanımlanmış Exchange2’ye yönlendirilecektir.

Producer kodumuzu yazıp gerekli paketi kuralım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare("queue.video", false, false, true);
channel.QueueDeclare("queue.image", false, false, true);
channel.QueueDeclare("queue.unrouted", false, false, true);

channel.ExchangeDeclare("ex.fanout", "fanout", true, false);
channel.ExchangeDeclare("ex.direct", "direct", true, false, new Dictionary<string, object>
{
    {"alternate-exchange", "ex.fanout"}
});

channel.QueueBind("queue.video", "ex.direct", "video");
channel.QueueBind("queue.image", "ex.direct", "image");
channel.QueueBind("queue.unrouted", "ex.fanout", string.Empty);

channel.BasicPublish("ex.direct", "video", false, null, Encoding.UTF8.GetBytes("This is video message."));
channel.BasicPublish("ex.direct", "image", false, null, Encoding.UTF8.GetBytes("This is an image message."));
channel.BasicPublish("ex.direct", "image", false, null, Encoding.UTF8.GetBytes("This is a text message."));

Exchange ex.fanout‘un ex.direct‘e alternate-exchange olarak atandığına dikkat edelim, böylece publish edilen text mesajı fanout exchange yönlendirilerek bağlı tüm queue’lara aktarılacaktır.

Push & Pull

Bir queue üzerinden mesajları consume etmek için iki yol mevcuttur, bunlar push ve pull’dur.

Şimdiye kadar yaptığımız örnekler tavsiye edilen yol olmakla birlikte push modelini kullanıyordu. Consumer uygulaması bir queue’ya subscribe olur ve mesajları dinler. Kuyrukta bekleyen mesajlarla birlikte kuyruğa yeni eklenen mesajlar doğrudan consumer uygulamasına push edilerek gönderilir.

Pull modelindeyse, consumer uygulaması queue’ya subscribe olmaz. Bunun yerine kuyruğu periyodik olarak kontrol ederek yeni mesajlara bakar. Kuyrukta yeni bir mesaj olması durumunda consumer uygulaması manuel fetch işlemi gerçekleştirir. Bu model her ne kadar tavsiye edilmese de Message Broker ile consumer uygulaması arasında doğrudan bir bağlantı olmadığı durumlarda izlenecek tek yoldur.

Örnek consumer projemizi oluşturup RabbitMQ.Client isimli Nuget paketini kuralım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

Bağlantımızı yaparak, bir channel oluşturuyoruz. Push modelini kullanan consumer kodlarına bakalım.

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine("Message received: {0}", message);
};

var consumerTag = channel.BasicConsume("queue.one", true, consumer);

Console.WriteLine("Subsribed. Press any key to unsubscribe.");
Console.ReadKey();

channel.BasicCancel(consumerTag);

Daha önceden aşina olduğumuz yapı, farklı olarak BasicCancel methodu yardımıyla subscribe olduğumuz queue’dan unsubscribe oluyoruz.

Pull modelini kullanan consumer kodlarına göz atalım.

while (true)
{
    var result = channel.BasicGet("queue.one", true);
    if (result != null)
    {
        var message = Encoding.UTF8.GetString(result.Body.ToArray());
        Console.WriteLine("Message received: {0}", message);
    }
    if (Console.KeyAvailable)
    {
        var keyInfo = Console.ReadKey();
        if (keyInfo.KeyChar == 'e' || keyInfo.KeyChar == 'E')
        {
            return;
        }
    }
    Thread.Sleep(5000);
}

5 saniye aralıklarla BasicGet methoduyla queue üzerinden tekil bir mesaj alınıyor, bir sonuç olmaması durumunda null dönüyor. Sonuç null değilse mesaj konsola basılıyor.

Work Queues

Work Queues, yoğun iş gücü gerektiren durumlarda görevlerin birden fazla worker arasında dağıtmasında kullanılır. Producer kuyruğa yeni bir görev ekler ve bu görevler worker uygulamaları arasında dağıtılır. Görevlerin dağıtılmasında pull veya push modeli kullanılabilir. Push modelinde Message Broker kuyruktaki mesajları çalışan worker’lara otomatik olarak dağıtır. Pull modelindeyse worker, yeni bir görev için hazır hale geldiğinde kuyruktan yeni bir mesaj alır. Work Queues aynı zamanda Competing Consumer Pattern olarak da anılır.

Round-robin

Task Queue kullanmanın avantajlarından bir tanesi görevlerin paralel şekilde worker’lara dağıtılmasıdır. Aynı zamanda sistemin yetersiz kaldığı durumlarda daha fazla worker dahil edilmesiyle ölçeklendirmenin de yapılabilmesidir. Message Broker default olarak her mesajı bir sonraki consumer’a gönderir, böylece her consumer aynı sayıda mesaj consume etmiş olur. Bu çoğu zaman güzel olsa da bazı senaryolarda sorunlara yol açacaktır. Bu soruna değinmeden önce bir kavrama göz atmamız gerekiyor.

Message Acknowledgment

Bir mesaj yönlendirildiğinde, yönlendirilen mesaj işlenmeden consumer ölürse bu mesajın kaybolmasına yol açacaktır. Çünkü gönderilen mesaj iletildiğinde silinmek üzere ayarlanmıştır. Bu senaryoda yalnızca mevcut mesaj değil, işlenmek üzere gönderilmiş diğer mesajların da kaybolmasına yol açar.

Bu durumun önüne geçmek için (bir mesajın asla kaybolmaması) mesaj bildirimlerini manuel olarak yönetmemiz gerekir. Bu işlem, autoAck property’si ile yapılır ve alınan mesajın başarıyla işlenmesi durumunda consumer’ın Message Broker’a bunu bildirmesiyle yapılır, böylece mesaj silinebilir. Bu senaryoda bir görev tamamlanamadan consumer’in ölmesi durumunda Message Broker bu durumu anlayarak mesajı yeniden kuyruğa atıp başka consumer’a -varsa- iletecektir. Bir mesaj alındıktan sonra default timeout 30 dakikadır, bu zaman değiştirilebilir. (Delivery Acknowledgement Timeout)

Round-robin‘e geri dönecek olursak, kuyrukta iki çeşit görevimiz olduğunu varsayalım, bunlardan bir tanesi uzun zaman alıyor, bir diğeri de çok kısa sürüyor. Message Broker ilgili mesajları sırayla dağıttığı için bir consumer üzerinde daha fazla iş yükü oluşacak ve bu kuyruktaki mesajların tüketilmesi bir diğer consumer’a göre daha uzun sürecek. Bu davranışı BasicQos methoduyla değiştirebiliriz. prefetchCount = 1 olarak ayarlanarak worker’a aynı anda birden fazla görev vermemesi istenir. İşte bu durumda da Message Ack‘leri devreye giriyor, bir işin tamamlandığını BasicAck methoduyla bildiriyoruz. Aynı zamanda reddedilmek üzere BasicNack ve BasicReject methodları da mevcut.

Console.Write("Enter the name for this worker: ");
var workerName = Console.ReadLine().Trim();

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.BasicQos(0, 1, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    int.TryParse(Encoding.UTF8.GetString(e.Body.ToArray()), out int durationInSeconds);
    Console.Write("[{0}] task will be competed after {1} second(s). ", workerName, durationInSeconds);
    Thread.Sleep(durationInSeconds * 1000);

    channel.BasicAck(e.DeliveryTag, false);
    Console.WriteLine("Completed.");
};

channel.BasicConsume("queue.ack", false, consumer);

Console.WriteLine("Waiting for messages... Press any key to exit.");
Console.ReadKey();

channel.Close();
connection.Close();

Consumer uygulaması kodlarına yakından bakalım, senaryoya göre çalışan worker’a bir isim veriyor ve gelen mesaj -saniye cinsinden- kadar uygulamayı Thread.Sleep ile uyutuyoruz.

Bağlantı oluşturulup bir kanal açıldıktan sonra BasicQos ile aynı anda bir görev alacağımızı bildiriyoruz. BasicConsume methodunda autoAck‘i false olarak işaretleyip, acknowledgment‘i manuel yöneteceğimizi bildiriyoruz. Aynı şekilde mesaj Received olup, işlendikten sonra BasicAck ile başarılı sonuç dönüyoruz.

Publish & Subscribe

Bu pattern tüm abonelere aynı mesajı iletmek için kullanılır. Her subscriber için ayrı bir queue vardır, mesajlar bu queue’lardan her birine ulaştırılır, böylece her subscriber aynı mesajın kopyasını almış olur.

Publish – Subscribe pattern’i genellikle olay bildirimlerinde kullanılır. Örneğin; bir sipariş servisinde, sipariş oluşturuldukça oluşturulan mesaj dahilinde hem loglama hem de e-posta servisi kendi kuyruğundan bu sipariş ile ilgili işlemleri yapacaktır.

Publisher ve Subscriber projelerini oluşturarak, RabbitMQ.Client paketini kuralım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.fanout", "fanout", false, true);

channel.QueueDeclare("queue.one", false, false, true);
channel.QueueDeclare("queue.two", false, false, true);

channel.QueueBind("queue.one", "ex.fanout", string.Empty);
channel.QueueBind("queue.two", "ex.fanout", string.Empty);

while (true)
{
    Console.Write("Enter message to be sent: ");
    var message = Console.ReadLine();

    if (message.ToLower() == "exit") break;

    channel.BasicPublish("ex.fanout", string.Empty, null, Encoding.UTF8.GetBytes(message));
}

channel.Close();
connection.Close();

Publisher uygulamasında iki adet queue oluşturarak bir fanout exchange’ine bind edildi ve sürekli mesaj gönderimi için döngü içerisine alındı.

Console.Write("Enter the queue name: ");
var queueName = Console.ReadLine();

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, args) =>
{
    var message = Encoding.UTF8.GetString(args.Body.ToArray());
    Console.WriteLine("Message received. [{0}]: {1}", queueName, message);
};

channel.BasicConsume(queueName, true, consumer);

Console.WriteLine("Subscribed to the queue '{0}'. Press any key to exit.", queueName);
Console.ReadKey();

channel.Close();
connection.Close();

Consumer uygulamasını iki terminal üzerinden çalıştıracağız, sırasıyla her biri için queue isimlerini (queue.one, queue.two) girerek subscribe olacağız. Publisher uygulamasından mesaj ürettikçe subscriber uygulamalarının mesaj kopyalarını aldığını göreceğiz.

Request & Reply

Bir messaging altyapısı kullanıldığında iletişim producer’dan consumer’a doğru tek yönlüdür, uygulama çift yönlü iletişim ile gönderdiği mesaja bir sonuç bekleyebilir. Request-Reply pattern bu durumda devreye girmektedir, sistemde biri request diğeri response olmak üzere en az iki adet queue tanımlanmalıdır. Bunun için RabbitMQ ile biri client diğeri RPC sunucu olan bir RPC sistemi oluşturacağız.

Replier projemizi oluşturup RabbitMQ.Client paketini kuralım.

private static int Fibonacci(int n)
{
    if (n == 0 || n == 1)
    {
        return n;
    }

    return Fibonacci(n - 1) + Fibonacci(n - 2);
}
var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare("rpc-queue", false, false, false);
channel.BasicQos(0, 1, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    string response = null;

    var replyProps = channel.CreateBasicProperties();
    replyProps.CorrelationId = e.BasicProperties.CorrelationId;

    try
    {
        var message = Encoding.UTF8.GetString(e.Body.ToArray());
        int.TryParse(message, out int n);
        Console.WriteLine("[.] Fibonacci({0})", n);
        response = Fibonacci(n).ToString();
    }
    catch (Exception ex)
    {
        Console.WriteLine("[.] {0}", ex.Message);
        response = string.Empty;
    }
    finally
    {
        channel.BasicPublish(string.Empty, e.BasicProperties.ReplyTo, replyProps, Encoding.UTF8.GetBytes(response));
        channel.BasicAck(e.DeliveryTag, false);
    }
};
channel.BasicConsume("rpc-queue", false, consumer);

Console.WriteLine("Waiting for RPC requests... Press [enter] to exit.");
Console.ReadKey();

Requester projesini oluşturup, gerekli paketi kuralım.

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> responseQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    public RpcClient()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            VirtualHost = "/",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        props.CorrelationId = Guid.NewGuid().ToString();
        props.ReplyTo = replyQueueName;

        consumer.Received += (s, e) =>
        {
            var response = Encoding.UTF8.GetString(e.Body.ToArray());
            if (e.BasicProperties.CorrelationId == props.CorrelationId)
            {
                responseQueue.Add(response);
            }
        };

        channel.BasicConsume(replyQueueName, true, consumer);
    }

    public string Call(string message)
    {
        channel.BasicPublish(string.Empty, "rpc-queue", false, props, Encoding.UTF8.GetBytes(message));
        return responseQueue.Take();
    }

    public void Close()
    {
        connection.Close();
    }
}
var rpcClient = new RpcClient();

Console.WriteLine("[x] Requesting Fibonacci(30)");
var response = rpcClient.Call("30");

Console.WriteLine("[.] Response: {0}", response);
rpcClient.Close();

Requester projesi ayağa kalktığında anonim bir callback queue oluşturuyor. Bir mesaj rpc-queue‘ye gönderilirken mesaj yanında iki özellik daha ekleniyor. Bunlar ReplyTo (callback queue için) ve CorelationId (her request için bir guid). Replier projesi bu queue üzerinden bir istek geldiğinde gerekli işlemleri yaparak ReplyTo property’sindeki queue’ya göre sonucu Requester’a döner. Requester callback queue üzerinden gelen mesajları CorelationId‘ye bakarak karşılaştırmaktadır.

Channel & Queue & Message Priorities

Kuyruğa gönderilen her mesaj aynı aciliyet veya önem durumuna sahip olmayabilir. Bazı mesajların daha önce işlenmesi gerekirken bazılarının ise sonra işlenmesinde bir sorun olmayabilir. Örneğin; bir e-ticaret platformunda siparişlerin işlenmesiyle, kampanya bildirimleri aynı öneme sahip değildir.

Bir channel ve queue için öncelik x-max-priority keyi ile belirlenir. 0-255 arasında değer alabilir. 0, bir öncelik olmayacağını belirtirken 1 ila 10 arasındaki değerler önerilir.

Bir mesajın önceliği IBasicProperties.Priority üzerinden belirtilir, mesajların düzgünce sıralanabilmesi için kuyrukta bir süre kalması gerekmektedir.

Producer ve Consumer projelerini oluşturup gerekli paketleri kurup kodumuzu yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare("ex.fanout", "fanout", false, true);

channel.QueueDeclare("queue.priorities", false, false, true, new Dictionary<string, object>
{
    {"x-max-priority", 2}
});

channel.QueueBind("queue.priorities", "ex.fanout", string.Empty);

SendMessageWithPriority(channel, "This a test message.", 1);
SendMessageWithPriority(channel, "This a test message.", 1);
SendMessageWithPriority(channel, "This a test message.", 1);

SendMessageWithPriority(channel, "This a test message.", 2);
SendMessageWithPriority(channel, "This a test message.", 2);

Console.Write("Press any key to exit.");
Console.ReadKey();

channel.QueueDelete("queue.priorities");
channel.ExchangeDelete("ex.fanout");

channel.Close();
connection.Close();
private static void SendMessageWithPriority(IModel channel, string message, byte priority)
{
    var basicProperties = channel.CreateBasicProperties();
    basicProperties.Priority = priority;
    channel.BasicPublish("ex.fanout", string.Empty, basicProperties, Encoding.UTF8.GetBytes(message));
    Console.WriteLine("Message '{0}' sent with {1} priority.", message, priority);
}

Consumer projesini yazalım.

var factory = new ConnectionFactory()
{
    HostName = "localhost",
    VirtualHost = "/",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.BasicQos(0, 1, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
    var message = Encoding.UTF8.GetString(e.Body.ToArray());
    Console.WriteLine("Message received: {0}", message);
    Thread.Sleep(1000);
    channel.BasicAck(e.DeliveryTag, false);
};

var consumerTag = channel.BasicConsume("queue.priorities", false, consumer);

Console.WriteLine("Subscribed to the queue. Press any key to exit.");
Console.ReadKey();

channel.BasicCancel(consumerTag);

channel.Close();
connection.Close();

Kaynak dosyasına buradan ulaşabilirsiniz.

You may also like...

4 Responses

  1. Dev dedi ki:

    Merhaba,
    Makale için teşekkür ederim. Güzel best practicelerden bahsetmişsiniz. Benim için faydalı oldu

  2. Bülent Özer dedi ki:

    Çok güzel içerikler, paylaştığınız için teşekkürler.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir