MassTransit ile RabbitMQ Messaging

MassTransit RabbitMQ Messaging

Enterprise seviyede, çeşitli platformlarda dağıtık şekilde çalışan servisler birbiriyle iletişim halindedirler. Genellikle bu iletişim gevşek ve asenkron olarak bir message broker aracılığıyla sağlanır. Bu sayede iş parçacıklarından koparılan bağlılıklar sonucu istenilen flexibility ve scalability elde edilmektedir. Messaging yapıları kullanılırken exception handling, retry policy, rate limit, deadlock, transaction gibi yönetimi zor konularla karşı karşıya kalmaktayız. ESB (Enterprise Service Bus) kavramı tam olarak bu noktada karşımıza çıkmaktadır. Bu gibi konuları bir soyutlamayla arkaplanda bizim yerimize yaparak yönetimi kolaylamaktadır.

Daha ileriye gitmeden RabbitMQ konusunda bilgi edinmek için RabbitMQ Nedir yazıma göz atabilirsiniz.

ESB (Enterprise Service Bus) Nedir?

Dağıtık sistemler için temel hizmet ve altyapı fonksiyonlarını barındıran ve sistemin reliable olmasını hedefleyen bir mimaridir. Sistemlerin birbiriyle konuşabilmesini sağlar. Client API için, message broker ve uygulama arasına girerek transport işlemlerini higher abstraction ile çözmeyi hedefler.

.NET için şu anda genel olarak kullanılan EasyNetQ, NServiceBus ve MassTransit ESB’leri mevcuttur.

MassTransit Nedir?

.NET için Chris Patterson tarafından open-source olarak geliştirilmiştir. Lightweight bir message bus’tur. Distributed ortam problemlerini çözerek message-based communication sağlar. RabbitMQ, Amazon SQS, Azure Service Bus ve ActiveMQ gibi messaging yapılarını destekler.

İletilecek mesajlar class veya interface tipinde olmalıdır. Bu mesajlar property’leri read-only olmalı ve bünyesinde method barındırmamalıdır. Ben önerildiği üzere interface kullanıyor olacağım. Aynı şekilde bu mesajların, Producer ve Consumer için aynı namespace altında tanımlanmış olması gerekir. Çünkü arkaplanda tanımlanacak olan Exchange bu namespace ile isimlendirilecektir.

MassTransit ortamında Publish ve Send olmak üzere iki adet message type vardır. Bir Event yayınlayacağımız zaman Publish, bir Command göndereceğimiz zaman ise Send kullanıyoruz.

MassTransit varsayılan olarak Fanout Exchange kullanır.

MassTransit Publishing an Event

masstransit publish event

Bir Event yayınlanacağı zaman kullanılır. Publish-Subscribe Messaging Pattern‘ini kullanır. Mesaj yayınlandığında abonelerin tamamı mesajı alacaktır. Adından da anlaşılacağı üzere sistemde bir şeylerin olduğu anlamına gelir ve isimlendirme yapılırken geçmiş zaman kullanılması önerilir. (AccountUpdated, UserConfirmed, OrderSubmitted)

MassTransit Sending a Command

masstransit send command

Bir Command gönderileceği zaman kullanılır. Sistem üzerinde bir şeylerin yapılması gerektiğini bildirir. Bu tip üzerinde emir kipi kullanılması önerilir. (UpdateUser, ConfirmUser, SubmitOrder) Ayrıca bir Command gönderileceği zaman bir Endpoint belirtilerek mesajın hangi queue’ya aktarılacağının belirtilmesi gerekmektedir.

Vakit kaybetmeden boş bir solution oluşturarak projemize başlayalım. Senaryoya göre bir API üzerinden gönderilen bir sipariş bir servis tarafından alınarak consume edilecek ve event fırlatılarak kullanıcıya Notification servisinden bir e-posta gönderilecek ve Billing servisinden fatura oluşturulacak.

Oluşturduğumuz solution’a MessageContracts isminde bir class library oluşturalım MassTransit.RabbitMQ isimli Nuget paketini kuralım. Bu proje içerisinde bus configuration işlemleri ve mesajlarımızı tutacağız.

RabbitMQ credential bilgilerini tutmak için RabbitMQConstants sınıfımızı oluşturalım.

public static class RabbitMQConstants
{
    public const string Uri = "amqp://localhost";
    public const string Username = "guest";
    public const string Password = "guest";

    public const string OrderServiceQueueName = "order.service";
    public const string NotificationServiceQueueName = "notification.service";
    public const string BillingServiceQueueName = "billing.service";
}

ESB configuration işlemlerini gerçekleştirecek BusConfigurator sınıfımızı oluşturalım.

public static IBusControl ConfigureBus(Action<IRabbitMqBusFactoryConfigurator> registrationAction = null)
{
    return Bus.Factory.CreateUsingRabbitMq(configuration =>
    {
        configuration.Host(RabbitMQConstants.Uri, hostConfiguration =>
        {
            hostConfiguration.Username(RabbitMQConstants.Username);
            hostConfiguration.Password(RabbitMQConstants.Password);
        });

        registrationAction?.Invoke(configuration);
    });
}

Bu method bize IBusControl tipinde bir bus dönecek ve tüm bağlantı işlemlerini bu sınıf üzerinden gerçekleştireceğiz. Ayrıca IRabbitMqBusFactoryConfigurator tipindeki registrationAction parametresi üzerinden bir command send edileceği zaman queue name ve endpoint bilgileri; consumer uygulamalarının gelen mesajları consume edebilmeleri için gerekli ayarlamalar yapılacaktır.

Producer projesi için gerekli olacak ISubmitOrderCommand arayüzünü Commands klasörü altına oluşturalım.

public interface ISubmitOrderCommand
{
    int OrderId { get; }
    string OrderCode { get; }
}

Siparişleri produce edecek olan Api projemizi oluşturalım. MassTransit.RabbitMQ paketini kurup MessageContracts projesini referans olarak ekleyelim. Daha sonra Controller üzerinden alacağımız order submit modelini Models klasörü altına ISubmitOrderCommand arayüzünü implemente ederek oluşturalım.

public class SubmitOrder : ISubmitOrderCommand
{
    public int OrderId { get; set; }

    public string OrderCode { get; set; }
}
public class OrderController : ControllerBase
{
    private readonly ISendEndpoint _bus;

    public OrderController()
    {
        var bus = BusConfigurator.ConfigureBus();
        var sendToUri = new Uri($"{RabbitMQConstants.Uri}/{RabbitMQConstants.OrderServiceQueueName}");
        _bus = bus.GetSendEndpoint(sendToUri).Result;
    }

    [HttpPost]
    public async Task<IActionResult> Post(SubmitOrder submitOrder)
    {
        await _bus.Send<ISubmitOrderCommand>(submitOrder);

        return Ok();
    }
}

Burada constructor içerisinde göndereceğimiz Command’ın endpointini elde etmek bus initialization işlemini gerçekleştiriyoruz. İlerleyen satırlarda GetSendEndpoint methodu aracılığıyla bir endpoint belirliyoruz. İşlemlere elde ettiğimiz ISendEndpoint tipindeki nesne üzerinden devam edeceğiz. Action içerisindeyse bu arayüz üzerinden send işlemini ISubmitOrderCommand tipiyle gerçekleştirmekteyiz.

Endpoint aracılığıyla gönderilecek mesajı consume edecek OrderService console projemizi oluşturalım ve MassTransit.RabbitMQ paketini kurarak MessageContracts projesini referans olarak ekleyelim.

var bus = BusConfigurator.ConfigureBus(configuration =>
{
    configuration.ReceiveEndpoint(RabbitMQConstants.OrderServiceQueueName, e =>
    {
        // Variations
        // e.Consumer(() => new SubmitOrderCommandConsumer());
        // e.Consumer(typeof(SubmitOrderCommandConsumer), type => Activator.CreateInstance(type));
        e.Consumer<SubmitOrderCommandConsumer>();
    });
});

await bus.StartAsync();

Console.WriteLine("Listening order commands... Press any key to exit.");
Console.ReadKey();

await bus.StopAsync();

Program.cs içerisinde bus için gerekli initialization işlemini sağlıyoruz. Burada farklı olarak consume edeceğimiz endpointi ReceiveEndpoint methoduyla belirtiyor ve SubmitOrderCommandConsumer ile mesajları consume edeceğimizi bildiriyoruz. Consumer sınıfını aşağıdaki gibi kodlayalım.

public class SubmitOrderCommandConsumer : IConsumer<ISubmitOrderCommand>
{
    public Task Consume(ConsumeContext<ISubmitOrderCommand> context)
    {
        Console.WriteLine("");

        return Task.CompletedTask;
    }
}

Consumer sınıfı IConsumer<TMessage> arayüzünü ilgili command conract’ı ile implemente etmektedir. Consume methoduyla consume işlemini gerçekleştirerek mesajı işlemektedir.

Produce edilen siparişi consume edip işledikten sonra artık kullanıcıya NotificationService ile e-posta göndermeli ve BillingService ile fatura oluşturmalıyız. Birden fazla uygulama kendine ait queue’larıyla bu mesajları işleyeceğinden dolayı burada Event (Publish/Subscribe) kullanmak daha doğru olacaktır. Bu eventi OrderService içerisinden fırlatacağız.

Publish için gerekli olacak IOrderSubmittedEvent arayüzünü MessageContracts projesi içerisinde Events klasörü altına oluşturalım.

public interface IOrderSubmittedEvent
{
    int OrderId { get; }
    string OrderCode { get; }
    bool Success { get; }
}

SubmitOrderCommandConsumer isimli consumer sınıfı altında mesaj işlendikten sonra eventimizi fırlatalım.

public class SubmitOrderCommandConsumer : IConsumer<ISubmitOrderCommand>
{
    public async Task Consume(ConsumeContext<ISubmitOrderCommand> context)
    {
        var message = context.Message;
        await Console.Out.WriteLineAsync($"Order with Id: {message.OrderId} and Code: {message.OrderCode} has beed submitted.");

        await context.Publish<IOrderSubmittedEvent>(new
        {
            message.OrderId,
            message.OrderCode,
            Success = true
        });
    }
}

Eventimizi context nesnesi üzerinden IOrderSubmittedEvent tipinde puslish ediyoruz. Şimdi Notification ve Billing servislerimizi oluşturup MassTransit.RabbitMQ paketini kurarak MessagingContracts projesini referans olarak ekleyelim.

Servislerimize geçmeden önce ilgili consumer sınıflarını aşağıdaki gibi oluşturalım.

public class SubmittedOrderNotificationEventConsumer : IConsumer<IOrderSubmittedEvent>
{
    public async Task Consume(ConsumeContext<IOrderSubmittedEvent> context)
    {
        var message = context.Message;
        await Console.Out.WriteLineAsync($"A bill has been created for the order Id with: {message.OrderId}");
    }
}
public class SubmittedOrderBillEventConsumer : IConsumer<IOrderSubmittedEvent>
{

    public async Task Consume(ConsumeContext<IOrderSubmittedEvent> context)
    {
        var message = context.Message;
        await Console.Out.WriteLineAsync($"A bill has been created for the order Id with: {message.OrderId}");
    }
}

Consumer sınıflarımızı hazırladıktan sonra queue’daki mesajları consume etmeye başlayabiliriz.

var bus = BusConfigurator.ConfigureBus(configuration =>
{
    configuration.ReceiveEndpoint(RabbitMQConstants.NotificationServiceQueueName, e =>
    {
        e.Consumer<SubmittedOrderNotificationEventConsumer>();
    });
});

await bus.StartAsync();

Console.WriteLine("Listening order commands... Press any key to exit.");
Console.ReadKey();

await bus.StopAsync();
var bus = BusConfigurator.ConfigureBus(configuration =>
{
    configuration.ReceiveEndpoint(RabbitMQConstants.BillingServiceQueueName, e =>
    {
        e.Consumer<SubmittedOrderBillEventConsumer>();
    });
});

await bus.StartAsync();

Console.WriteLine("Listening order commands... Press any key to exit.");
Console.ReadKey();

await bus.StopAsync();

Uygulamaları sırasıyla ayağa kaldırdıktan sonra Api üzerinden gönderdiğimiz isteğin sonucu produce edilen mesajın öncelikle OrderService tarafından consume edildiğini göreceksiniz. Burada mesajın işlenmesiyle Publish edilen mesajın NotificationService ve BillService tarafından consume edildiğini göreceksiniz.

Ayrıca RabbitMQ Management UI’ya girecek olursanız bizim adımıza exchange’lerin, queue’ların oluşturulduğunu ve bunların belirttiğimiz şekilde birbirine bind edildiğini göreceksiniz.

MassTransit Error Handling

Message Retry

Uygulama ya da servislerin durması, bağlantıların kopması, bir exception ile karşılaşılması gibi durumlarda MassTransit ilgili mesajları [queue-adı].error isminde oluşturduğu kuyruğa taşır. Sistemdeki hata çözüldükten sonra mesajların tekrar işlenmesini talep edebiliriz.

var bus = BusConfigurator.ConfigureBus(configuration =>
{
    configuration.ReceiveEndpoint(RabbitMQConstants.OrderServiceQueueName, e =>
    {
        // Variations
        // e.Consumer(() => new SubmitOrderCommandConsumer());
        // e.Consumer(typeof(SubmitOrderCommandConsumer), type => Activator.CreateInstance(type));
        e.Consumer<SubmitOrderCommandConsumer>();
        e.UseRetry(r => r.Immediate(5));
    });
});

Yukarıda görüldüğü üzere sistemde bir hata oluştuğunda error queue’sine aktarılmadan önce UseRetry methodunun Immediate policy’siyle 5 kez ard arda denemede bulunmasını talep ediyoruz. Ayrıca Immediate politikasının yanında None, Intervals, Immediate, Incremental, Exponential policy’leri de mevcuttur.

Ayrıca işlem sırasında bir hata alınması durumunda retry filter kullanarak ignore edebiliriz.

e.UseRetry(r => r.Ignore(typeof(ArgumentNullException), typeof(DivideByZeroException)));

Rate Limits

Belirli bir süre içerisinde gönderilecek mesaj sayısını UseRateLimit methoduyla belirleyebiliriz.

e.UseRateLimit(1000, TimeSpan.FromSeconds(5));

Yukarıda görüldüğü üzere 5 saniye içerisinde 1000 adet mesaj işlenebileceğini belirttik.

Circuit Breaker

Sistemde oluşabilecek aşırı yoğunluk, deadlock gibi durumlarda oluşacak kesintilere karşı kullanırız.

e.UseCircuitBreaker(cbConfiguration =>
{
    cbConfiguration.TripThreshold = 25;
    cbConfiguration.ActiveThreshold = 5;
    cbConfiguration.TrackingPeriod = TimeSpan.FromMinutes(5);
    cbConfiguration.ResetInterval = TimeSpan.FromMinutes(10);
});

Yukarıda görüldüğü üzere sistemde bir hata, kesinti olması durumunda alınan taleplerden yüzde 25’inin hatalı olması ya da 5 tanesinin hatayla karşılaşması durumunda sistem 10 dakika bekleyip 5 dakika takipte kalacak. Tekrar hata alınması durumunda TripTreshold ve ActiveTreshold gözardı edilerek belirtilen süre kadar bekleyip takipte kalacaktır.

Yazı boyunca görüleceği üzere ESB’lerin bize sağladığı imkanları göz ardı edemeyiz, Message Broker’larının zahmetli yönetimsel süreçlerini üstlenerek bize sadece gerekli konfigürasyonları yapmak kalıyor.

İlgili projeye buradan ulaşabilirsiniz.

4 Responses

  1. Enes dedi ki:

    Mükemmel bi açıklama teşekkür ederim

  2. Husoka dedi ki:

    Güzel açıklama, örnek de gerçek hayat projelerinde kullanılabilecek bir örnek olmuş. Elinize sağlık.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir