Orchestration-Based Saga Implementation

Saga Pattern Distributed Transactions

Saga Pattern dağıtık ortamda distributed transcation yönetilirken veri tutarlılığını hedefler. Implemente edilirken Choreography-Based Saga ve Orchestration-Based Saga olmak üzere iki yaklaşım vardır. Saga Pattern Nedir yazısında bu konu teorik olarak incelendi. Bu yazıda Orchestration-Based Saga Implementation konusu üzerine pratik yapacağız.

MassTransit, üzerine instance storage, event correlation gibi özellikler ekleyerek, bir State Machine kütüphanesi olan Automatonymous‘u kullanır. Bu bize state ve event’lar dahil olmak üzere bir state machine tanımlamamızı sağlıyor. Senaryo gereği yapacağımız örneğe geçmeden önce bir kaç terim hakkında bilgi edinmeliyiz.

State Machine

State, event ve davranışları tanımlayan merkezi birimdir. Oluşturulacak state machine MassTransitStateMachine<TInstance> sınıfını derive etmelidir. Bir state machine bir kere oluşturulur ve instance’larına tetiklenen event doğrultusunda ilgili davranışı uygular.

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    // Events, states, behaviors
}

State Instance

State Machine için gerekli verileri tutan sınıftır. Her consume edilen Initial event doğrultusunda yeni bir instance oluşturulur. (Aynı CorrelationId’ye sahip instance bulunamazsa) Oluşturulacak State Instance SagaStateMachineInstance arayüzünü implemente etmelidir.

public class OrderStateInstance : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}

Gelen isteklerin birbirinden ayrılarak correlate edilebilmesi için CorrelationId kullanılır. Burada, o anki state CurrentState property’sinde tutulacaktır. Bu state bilgisi State, string veya int tipinde olabilir.

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    public OrderStateMachine()
    {
        InstanceState(instance => instance.CurrentState);
    }
}

InstanceState methoduyla mevcut state’in hangi property’de tutulacağı belirtilir. Tip olarak State kullanılırsa otomatik olarak yapılandırılacağından dolayı bu method’un çağırılmasına gerek yoktur.

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }

    public OrderStateMachine()
    {
        InstanceState(instance => instance.CurrentState);

        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
    }

    public Event<OrderAccepted> OrderAccepted { get; private set; }
}

Yukarıda Accepted ve Submitted olmak üzere iki state tanımlanmıştır. Bir instance aynı zamanda bir state’e sahip olabilir. Her yeni instance varsayılan olarak Initial durumundadır. Ayrıca Final durumu da her state machine için tanımlıdır, instance’ın son duruma geldiğini belirtir.

Ayrıca OrderAccepted tipinde bir event tanımlandığını görüyoruz. Bu event’in bir instance ile correlate edilebilmesi için tekil olan OrderId üzerinden ilişkilendirileceği belirtiliyor. Aynı şekilde bu expression bildirimini kullanmak yerine oluşturulan event CorrelatedBy<TKey> arayüzünü implemente edebilir.

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    public OrderStateMachine()
    {

        Event(() => OrderAccepted, x => x
                .CorrelateBy((instance, context) => instance.OrderId == context.Message.OrderId)
                .SelectId(x => NewId.NextGuid());
    }

    public Event<OrderAccepted> OrderAccepted { get; private set; }
}

Eğer burada event unique bir değer aracılığıyla instance ile correlate olamıyorsa SelectId methodu kullanılmalıdır. NewId Nuget paketiyle generate edilen unique değer instance CorrelationId‘sine atanıyor.

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
    public State Completed { get; private set; }

    public OrderStateMachine()
    {
        InstanceState(instance => instance.CurrentState);

        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(OrderSubmitted)
                .Then(x => x.Instance.CreatedOn = x.Data.CreatedOn)
                .Publish(context => new OrderAcceptedEvent(context.Instance.CorrelationId))
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .Send(ServiceAddress, context => new UpdateOrderCommand(context.Instance.CorrelationId))
                .TransitionTo(Accepted));

        DuringAny(When(OrderCompleted)
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public Event<OrderAccepted> OrderAccepted { get; private set; }
    public Event<OrderSubmitted> OrderSubmitted { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

Initially methoduyla state Initial halindeyken OrderSubmitted event davranışı belirleniyor. Bir OrderSubmitted event’i consume edildiğinde ve OrderId ile eşleşen CorrelationId‘ye sahip bir instance bulunamadığında Initial state içerisinde yeni bir instance oluşturulur.

Then methoduyla gelen mesaj üzerindeki property instance üzerindeki property’e set ediliyor. Gelen mesaj üzerindeki property’lere Data, instance property’lerine Instance üzerinden erişiyoruz.

Publish methoduyla OrderAcceptedEvent tipinde bir event yayınlanıyor. TransitionTo methoduyla instance Submitted state’ine geçiriliyor.

Sonrasındaysa During methoduyla instance Submitted halindeyken OrderAccepted eventi consume edildiğinde Send methoduyla UpdateOrderCommand tipinde bir komut gönderiliyor ve instance Accepted durumuna geçiriliyor.

DuringAny methoduyla herhangi bir state’teyken OrderCompleted eventi instance tarafından consume edildiğinde Finalize methoduyla sonlandırılıyor. Bir instance sonlandığında Final durumuna geçer ve varsayılan olarak Saga Repository‘sinden silinmez. SetCompletedWhenFinalized methoduyla Final durumlu state Saga Repository‘sinden kaldırılmak üzere tamamlandı olarak işaretleniyor.

Senaryo

Choreography-Based Saga Implementation yazısında olduğu gibi yine bir e-commerce sistemi üzerinden ilerleyeceğiz. Bu sistem sipariş, stok ve ödeme servislerinden oluşacaktır.

Saga Orchestration Implementation

Yukarıda bulunan diyagramı incelediğimizde;

  1. Order.Api bir istekte bulunarak OrderService‘e CreateOrderCommand tipinde bir komut gönderiyor. OrderService bu mesajı consume ederek PENDING durumunda bir sipariş oluşturuyor. Sonrasında Orchestrator‘a OrderSubmitted tipinde bir komut gönderiyor.
  2. Orchestrator stok işlemlerini gerçekleştirebilmek için StockService‘e ReserveStockCommand tipinde bir komut gönderiyor.
  3. Stok işlemlerinin başarılı olması durumunda StockReservedEvent publish ediliyor. Aksi durumdaysa StockNotReservedEvent publish ediliyor. Bu durumda OrderService‘e OrderFailedCommand tipinde bir komut gönderiliyor. Bu mesaj consume edilerek sipariş durumu REJECTED olarak güncelleniyor.
  4. Orchestrator ödeme işlemlerini gerçekleştirmek üzere PaymentService‘e RequestPaymentCommand tipinde bir komut gönderiyor.
  5. Ödeme işlemleri başarıyla gerçekleştiğinde PaymentConfirmedEvent publish ediliyor. Bu işlemden sonra OrderService‘e OrderCompletedCommand tipinde bir mesaj gönderilerek sipariş durumu CONFIRMED olarak güncelleniyor. Aksi durumdaysa PaymentRejectedEvent yayınlanıyor. Sonrasındaysa bu eventi StockService ve OrderService consume ederek compensable transaction işlemlerini gerçekleştiriyor.

Bu uygulamada bulunan tüm projeler MassTransit.AspNetCore ve MassTransit.RabbitMQ Nuget paketlerini kullanmaktadır. MassTransit konfigürasyonları ve mesaj tanımlamalarına yazının uzamaması adına yer verilmemiştir, proje yazının sonunda paylaşılmıştır.

RabbitMQ ile ilgili bilgileri tutmak için Shared projesine aşağıdaki sınıfı ekliyoruz.

public static class RabbitMQConstants
{
    public const string Uri = "amqp://localhost";

    public const string SagaQueue = "order.saga";
    public const string CreateOrderCommandQueue = "order.create";

    public const string ReserveStockCommandQueue = "stock.order.received";
    public const string RequestPaymentCommandQueue = "payment.request";
    public const string OrderCompletedCommandQueue = "order.completed";
    public const string OrderFailedCommandQueue = "order.failed";
    public const string CompensateStockCommandQueue = "stock.compensate";
}

Order Api

Senaryomuza göre ilk tetiklenecek kısımdır. Client tarafından tetiklenerek OrderService’e sipariş isteğini iletecektir.

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly ISendEndpoint _sendEndpoint;

    public OrderController(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpoint = sendEndpointProvider.GetSendEndpoint(new($"queue:{RabbitMQConstants.CreateOrderCommandQueue}")).Result;
    }

    [HttpPost]
    public async Task Post(CreateOrder createOrder)
    {
        await _sendEndpoint.Send<ICreateOrderCommand>(new
        {
            UserId = createOrder.UserId,
            Items = createOrder.Items
        });
    }
}

Görüldüğü üzere gelen sipariş bilgisi doğrudan OrderService‘e ICreateOrderCommand komutuyla oluşturulmak üzere gönderiliyor.

OrderService

Api aracılığıyla gelen istekler doğrultusunda daha sonra işlenmek üzere Pending durumunda bir sipariş oluşturur. Sonrasında IOrderSubmitted tipinde bir event oluşturur.

public class CreateOrderCommandConsumer : IConsumer<ICreateOrderCommand>
{
    private readonly ISendEndpoint _sendEndpoint;

    public CreateOrderCommandConsumer(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpoint = sendEndpointProvider.GetSendEndpoint(new($"queue:{RabbitMQConstants.SagaQueue}")).Result;
    }

    public async Task Consume(ConsumeContext<ICreateOrderCommand> context)
    {
        var message = context.Message;

        // Creating order with Pending status...

        await _sendEndpoint.Send<IOrderSubmittedEvent>(new
        {
            OrderId = new Random().Next(),
            UserId = message.UserId,
            Items = message.Items
        });
    }
}

Sipariş oluşturulduktan sonra artık Orchestrator‘e IOrderSubmittedEvent aracılığıyla mesaj gönderiliyor.

SagaService

Öncelikle State Machine tarafından kullanılacak Instance sınıfımızı oluşturalım.

public class OrderStateInstance : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public int OrderId { get; set; }
    public int UserId { get; set; }
    public decimal TotalPrice { get; set; }
    public DateTime CreatedOn { get; set; }
}

Instance’ların correlate edileceği CorrelationId ve anlık durum bilgisinin tutulacağı CurrentState yanında state’i tutulacak sipariş ile ilgili bilgilere yer veriliyor.

Sonrasında State Machine tanımlamasını yapalım, yukarıda ön bilgi verildiğinden dolayı tamamı paylaşılacaktır.

public class OrderStateMachine : MassTransitStateMachine<OrderStateInstance>
{
    public State OrderSubmitted { get; set; }
    public State StockReserved { get; set; }
    public State StockNotReserved { get; set; }
    public State PaymentConfirmed { get; set; }
    public State PaymentRejected { get; set; }

    public OrderStateMachine()
    {
        InstanceState(instance => instance.CurrentState);

        Event(() => OrderSubmittedEvent, instance => instance
            .CorrelateBy<int>(state => state.OrderId, context => context.Message.OrderId)
            .SelectId(s => Guid.NewGuid()));

        Event(() => StockReservedEvent, instance => instance
                .CorrelateById(selector => selector.Message.CorrelationId));

        Event(() => StockNotReservedEvent, instance => instance
                .CorrelateById(selector => selector.Message.CorrelationId));

        Event(() => PaymentConfirmedEvent, instance => instance
                .CorrelateById(selector => selector.Message.CorrelationId));

        Event(() => PaymentRejectedEvent, instance => instance
                .CorrelateById(selector => selector.Message.CorrelationId));

        Initially(When(OrderSubmittedEvent)
            .Then(context =>
            {
                context.Instance.UserId = context.Data.UserId;
                context.Instance.OrderId = context.Data.OrderId;
                context.Instance.TotalPrice = context.Data.Items.Sum(s => s.Count * s.Price);
                context.Instance.CreatedOn = DateTime.Now;
            })
            .TransitionTo(OrderSubmitted)
            .Send(new Uri($"queue:{RabbitMQConstants.ReserveStockCommandQueue}"), context => new ReserveStockCommand(context.Instance.CorrelationId)
            {
                Items = context.Data.Items
            }));

        During(OrderSubmitted,
            When(StockReservedEvent)
            .TransitionTo(StockReserved)
            .Send(new Uri($"queue:{RabbitMQConstants.RequestPaymentCommandQueue}"), context => new RequestPaymentCommand(context.Instance.CorrelationId)
            {
                Items = context.Data.Items
            }),
            When(StockNotReservedEvent)
            .TransitionTo(StockNotReserved)
            .Send(new Uri($"queue:{RabbitMQConstants.OrderFailedCommandQueue}"), context => new OrderFailedCommand
            {
                OrderId = context.Instance.OrderId,
                Reason = context.Data.Reason
            }));

        During(StockReserved,
            When(PaymentConfirmedEvent)
            .TransitionTo(PaymentConfirmed)
            .Send(new Uri($"queue:{RabbitMQConstants.OrderCompletedCommandQueue}"), context => new OrderCompletedCommand
            {
                OrderId = context.Instance.OrderId
            })
            .Finalize(),
            When(PaymentRejectedEvent)
            .TransitionTo(PaymentRejected)
            .Send(new Uri($"queue:{RabbitMQConstants.OrderFailedCommandQueue}"), context => new OrderFailedCommand
            {
                OrderId = context.Instance.OrderId,
                Reason = context.Data.Reason
            })
            .Send(new Uri($"queue:{RabbitMQConstants.CompensateStockCommandQueue}"), context => new CompensateStockCommand
            {
                Items = context.Data.Items
            }));

        SetCompletedWhenFinalized();
    }

    public Event<IOrderSubmittedEvent> OrderSubmittedEvent { get; set; }
    public Event<IStockReservedEvent> StockReservedEvent { get; set; }
    public Event<IStockNotReservedEvent> StockNotReservedEvent { get; set; }
    public Event<IPaymentConfirmedEvent> PaymentConfirmedEvent { get; set; }
    public Event<IPaymentRejectedEvent> PaymentRejectedEvent { get; set; }
}

İlk olarak State Machine tarafından kullanılacak state’ler tanımlanıyor. Ardından InstanceState methoduyla durumun hangi property üzerinde tutulacağı belirtiliyor.

Sonrasındaysa Event methodlarıyla instance’ların OrderId üzerinden correlate edileceği belirtiliyor. Sıralaması önemli olarak ilk gelecek mesajın OrderSubmittedEvent olacağı belirtiliyor.

Initially methoduyla instance state’i Initial durumundayken hangi işlemlerin yapılacağı tanımlanıyor. Buna göre When methoduyla gelen event kontrolü yapılarak OrderSubmittedEvent olup olmadığına bakılıyor. Eşleşme sağlanıyorsa Then methoduyla gelen mesaj bilgileri Data propery’si üzerinden Instance property’sine atanıyor. Ardından TransitionTo methoduyla state OrderSubmitted durumuna geçiriliyor. Sonrasında Send methoduyla StockService‘ine stok işlemlerini gerçekleştirebilmesi için ReserveStockCommand mesajı gönderiliyor.

During methoduyla instance state’inin OrderSubmitted ve When methoduyla gelen event’in StockReservedEvent olması durumunda yapılacak işlemler belirtiliyor. Buna göre state StockReserved‘a geçiriliyor ve Send methoduyla PaymentService‘e ödeme işlemlerini gerçekleştirebilmesi için RequestPaymentCommand tipinde bir mesaj gönderiliyor. Stok işlemlerinin başarısız olduğu durumdaysa (StockNotReservedEvent) state StockNotReserved durumuna geçirilerek Send methoduyla OrderFailedCommand tipinde bir mesaj gönderiliyor.

Bir sonraki satırdaysa instance state’i StockReserved durumunda ve gelen event’in PaymentConfirmedEvent olduğu zaman state PaymentConfirmed durumuna geçiriliyor. Ardından Send methoduya OrderService‘e OrderCompletedCommand tipinde bir mesaj gönderiliyor. Eğer gelen event PaymentRejectedEvent ise state PaymentRejected durumuna geçiriliyor. Böylece Send methodlarıyla OrderService‘e OrderFailedCommand tipinde, StockService‘e CompensateStockCommand komutları gönderiliyor.

Finalize methoduyla ilgili state’i Final durumuna getiriyoruz. SetCompletedWhenFinalized methoduyla Final durumundaki instance’ların Repository’den temizlenmesini belirtiyoruz.

Program.cs dosyasında SagaStateMachine konfigürasyonlarını yapıyoruz.

builder.Services.AddMassTransit(configure =>
{
    configure.AddSagaStateMachine<OrderStateMachine, OrderStateInstance>()
        .InMemoryRepository();

    configure.AddBus(factory => Bus.Factory.CreateUsingRabbitMq(config =>
    {
        config.Host(RabbitMQConstants.Uri);
        config.ReceiveEndpoint(RabbitMQConstants.SagaQueue, e =>
        {
            e.ConfigureSaga<OrderStateInstance>(factory);
        });
    }));
});
builder.Services.AddMassTransitHostedService();

InMemoryRepository methoduna aşağıda Saga Persistence başlığında değinilmiştir.

StockService

Bir sipariş oluşturulduğunda sipariş ürün adetleri kadar stoğun güncelleneceği servistir.

public class ReserveStockCommandConsumer : IConsumer<IReserveStockCommand>
{
    private readonly IPublishEndpoint _publishEndpoint;

    public ReserveStockCommandConsumer(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task Consume(ConsumeContext<IReserveStockCommand> context)
    {
        var stockResult = true;
        var message = context.Message;

        if (stockResult)
        {
            await _publishEndpoint.Publish<IStockReservedEvent>(new
            {
                CorrelationId = message.CorrelationId,
                Items = message.Items
            });
        }
        else
        {
            await _publishEndpoint.Publish<IStockNotReservedEvent>(new
            {
                CorrelationId = message.CorrelationId,
                Reason = "Insufficent stock!"
            });
        }
    }
}

Stok kontrolü bir değişken aracılığıyla simüle ediliyor. Yeterli stok olması durumunda IStockReservedEvent tipinde, aksi durumda IStockNotReservedEvent tipinde bir event publish ediliyor.

PaymentService

Stoğun başarıyla işlemlerini gerçekleştirmesi sonrasında ödeme işlemlerinin yapılacağı servistir.

public class RequestPaymentCommandConsumer : IConsumer<IRequestPaymentCommand>
{
    private readonly IPublishEndpoint _publishEndpoint;

    public RequestPaymentCommandConsumer(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task Consume(ConsumeContext<IRequestPaymentCommand> context)
    {
        var message = context.Message;
        var paymentResult = true;

        if (paymentResult)
        {
            await _publishEndpoint.Publish<IPaymentConfirmedEvent>(new
            {
                CorrelationId = message.CorrelationId
            });
        }
        else
        {
            await _publishEndpoint.Publish<IPaymentRejectedEvent>(new
            {
                CorrelationId = message.CorrelationId,
                Items = message.Items,
                Reason = "Payment rejected!"
            });
        }
    }
}

Ödeme işlemleri bir değişken aracılığıyla simüle ediliyor. Başarılı olduğu durumda IPaymentConfirmedEvent, aksi durumda IPaymentRejectedEvent tipinde bir event publish ediliyor.

Ödemenin başarısız olduğu durumda StockService, Orchestrator üzerinden compensable transaction gerçekleştirerek eski haline döndürmelidir.

public class PaymentRejectedEventConsumer : IConsumer<IPaymentRejectedEvent>
{
    public Task Consume(ConsumeContext<IPaymentRejectedEvent> context)
    {
        var message = context.Message;

        // Payment rejected, rollback stock.

        return Task.CompletedTask;
    }
}

Aynı şekilde OrderService‘te ödemenin başarısız olduğu durumda sipariş durumunu güncellemelidir. Ayrıca bu consumer StockService‘inden IStockNotReservedEvent tipinde bir event publish edildiğinde de kullanılmaktadır.

public class OrderFailedCommandConsumer : IConsumer<IOrderFailedCommand>
{
    public Task Consume(ConsumeContext<IOrderFailedCommand> context)
    {
        var message = context.Message;

        // Order failed, update order status to Rejected...

        return Task.CompletedTask;
    }
}

Ödeme başarılıysa OrderService sipariş durumunu güncellemelidir.

public class OrderCompletedCommandConsumer : IConsumer<IOrderCompletedCommand>
{
    public Task Consume(ConsumeContext<IOrderCompletedCommand> context)
    {
        var message = context.Message;

        // Order completed, update order status to Confirmed...

        return Task.CompletedTask;
    }
}

Bonus: Saga Persistence

Saga, durum bilgisine sahip event-based message consumer’ları olduğundan event’ler arası state bilgilerini korumak önem arz etmektedir. Persistent bir state olmadan Saga her eventi yeni bir event olarak algılayacak ve sonrasında gelecek eventler birbirine bağlı olmayacaktır. Bu nedenle state’i depolamalıyız. MassTransit bize state bilgisini depolamak için farklı yollar sunar. InMemory, EF Core, MongoDb, Redis ve daha fazlası… Biz Entity Framework Core kullanacağız.

Bunun için öncelikle MassTransit.EntityFrameworkCore Nuget paketini yükleyelim ve DbContext sınıfımızı oluşturalım.

public class OrderStateDbContext : SagaDbContext
{
    public OrderStateDbContext(DbContextOptions<OrderStateDbContext> options) : base(options) { }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get
        {
            yield return new OrderStateMap();
        }
    }
}

DbContext sınıfımız SagaDbContext abstract sınıfından türemelidir.

public class OrderStateMap : SagaClassMap<OrderStateInstance>
{
    protected override void Configure(EntityTypeBuilder<OrderStateInstance> entity, ModelBuilder model)
    {
        entity.Property(p => p.UserId).IsRequired();
        entity.Property(p => p.OrderId).IsRequired();
    }
}

SagaClassMap<TSaga> abstract sınıfıyla mapping işlemlerini gerçekleştiriyoruz ve SagaDbContext sınıfının Configurations property’sini override ederek uyguluyoruz.

Program.cs içerisinde gerekli konfigürasyonları yapalım.

builder.Services.AddMassTransit(configure =>
{
    configure.AddSagaStateMachine<OrderStateMachine, OrderStateInstance>()
        .EntityFrameworkRepository(config =>
        {
            config.AddDbContext<DbContext, OrderStateDbContext>((p, b) =>
            {
                b.UseSqlServer(builder.Configuration.GetConnectionString("OrderStateDb"));
            });
        });
});

Görüldüğü üzere mikroservisler arasında distributed transaction veri tutarlılığını Orchestration Pattern uygulanarak sağladık. Senaryoda geliştirilmiş olan projeye buradan ulaşabilirsiniz.

You may also like...

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir