Choreography-Based Saga Implementation
Saga Pattern dağıtık ortamda distributed transcation yönetilirken veri tutarlılığını hedefler. Implemente edilirken Choreography-Based Saga ve Orchestration-Based Saga olmak üzere iki yaklaşım vardır. Saga Pattern Nedir yazısında bu konu teorik olarak incelendi. Bu yazıda Choreography-Based Saga Implementation konusu üzerine pratik yapacağız.
Senaryo
Bir e-commerce sistemimiz olduğunu düşünelim. Bu basit sistem üzerinde sipariş, stok ve ödeme servisleriyle çalışacağız.
Yukarıda bulunan diyagramı incelediğimizde;
- Order.Api bir istekte bulunarak OrderService‘e CreateOrderCommand tipinde bir komut gönderiyor.
- OrderService aldığı bu komut ile Pending durumunda bir sipariş oluşturarak OrderCreatedEvent tipinde bir event publish ediyor.
- Bu event’e subscribe olmuş StockService, siparişte bulunan ürünlere bakarak stok kontrolü yapıyor. Yeterli stok bulunduğu taktirde stokları siparişteki ürün miktarı kadar azaltarak StockReservedEvent tipinde bir event publish ediyor. Aksi durumda StockNotReserved tipinde bir event yayınlıyor. Bu event’e subscribe olmuş OrderService tarafından oluşturulmuş siparişin durumu Rejected olarak güncelleniyor.
- Stok başarıyla düşürüldükten sonra sıra ödemenin alınmasına geliyor. PaymentService gerekli ödeme işlemlerini yaptıktan sonra başarılı olması durumunda PaymentConfirmedEvent tipinde bir event publish ediyor. Bunu dinleyen OrderService oluşturmuş olduğu siparişi Confirmed olarak işaretleyerek transaction’u tamamlıyor. Aksi durumda bu event’e subscribe olmuş StockService ve OrderService compensable transaction yaparak telafi ediyor.
Bu uygulamada bulunan tüm projeler MassTransit.AspNetCore ve MassTransit.RabbitMQ Nuget paketlerini kullanılmaktadır. Ayrıca proje içi konfigürasyonlara yazının uzamaması adına yer verilmemiştir, yazı sonunda proje paylaşılmıştır.
Order Api
Senaryomuza göre ilk giriş noktasıdır. Client tarafından tetiklenerek, controller aracılığıyla sipariş isteğini OrderService‘e iletecektir.
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly ISendEndpointProvider _sendEndpointProvider;
public OrderController(ISendEndpointProvider sendEndpointProvider)
{
_sendEndpointProvider = sendEndpointProvider;
}
[HttpPost]
public async Task<IActionResult> Post(CreateOrderRequest createOrderRequest)
{
var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{RabbitMQConstants.CreateOrderQueueName}"));
await sendEndpoint.Send<ICreateOrderCommand>(new
{
UserId = createOrderRequest.UserId,
Items = createOrderRequest.Items
});
return Accepted();
}
}
Görüldüğü üzere command, MassTransit aracılığıyla RabbitMQ altyapısıyla gönderiliyor.
OrderService
Sipariş isteklerini alarak Pending durumunda daha sonradan işlenmek üzere bir sipariş oluşturarak OrderCreatedEvent tipinde bir event publish eder.
public class CreateOrderCommandConsumer : IConsumer<ICreateOrderCommand>
{
private readonly IPublishEndpoint _publishEndpoint;
public CreateOrderCommandConsumer(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task Consume(ConsumeContext<ICreateOrderCommand> context)
{
var message = context.Message;
// Some validation ...
// Create order with Pending status
await _publishEndpoint.Publish<IOrderCreatedEvent>(new
{
UserId = message.UserId,
OrderId = 1,
Items = message.Items.Select(s => new
{
Id = s.ProductId,
Quantity = s.Quantity
}),
TotalAmount = message.Items.Sum(s => s.Quantity * s.Price)
});
}
}
Görüldüğü üzere gerekli business-logic işlendikten sonra sipariş oluşturuluyor ve event publish ediliyor.
StockService
Bir sipariş oluşturulduktan sonra siparişteki ürün adetleri kadar stoğun güncelleneceği servistir.
public class OrderCreatedEventConsumer : IConsumer<IOrderCreatedEvent>
{
private readonly IPublishEndpoint _publishEndpoint;
public OrderCreatedEventConsumer(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task Consume(ConsumeContext<IOrderCreatedEvent> context)
{
var message = context.Message;
var stockResult = true;
if (stockResult)
{
// Update stocks ...
await _publishEndpoint.Publish<IStockReservedEvent>(new
{
UserId = message.UserId,
OrderId = message.OrderId,
TotalAmount = message.TotalAmount,
Items = message.Items
});
}
else
{
await _publishEndpoint.Publish<IStockNotReservedEvent>(new
{
OrderId = message.OrderId,
Message = "Insufficient Stock"
});
}
}
}
Görüldüğü üzere gerekli stok kontrolü bir değişken ile simüle ediliyor. Yeterli stok olması durumunda stok düşümü yapılarak StockReservedEvent, aksi durumda StockNotReserved event’i publish edilmektedir.
public class StockNotReservedEventConsumer : IConsumer<IStockNotReservedEvent>
{
public Task Consume(ConsumeContext<IStockNotReservedEvent> context)
{
// Update order status to Rejected from Pending via {context.Message.OrderId}
return Task.CompletedTask;
}
}
Stoğun yetersiz olması durumunda OrderService oluşturduğu siparişi bu evente subscribe olarak işliyor. StockService tarafından üretilen mesajı IConsumer<TMessage> arayüzünü implemente ederek consume ediyor. Nihayetinde ConsumeContext<TMessage> üzerinden siparişin durumunu Rejected olarak işaretliyor.
PaymentService
Stoğun başarıyla ayrılması durumunda sıra ödemenin alınmasına geliyor.
public class StockReservedEventConsumer : IConsumer<IStockReservedEvent>
{
private readonly IPublishEndpoint _publishEndpoint;
public StockReservedEventConsumer(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task Consume(ConsumeContext<IStockReservedEvent> context)
{
var message = context.Message;
var paymentResult = false;
if (paymentResult)
{
await _publishEndpoint.Publish<IPaymentConfirmedEvent>(new
{
OrderId = message.OrderId
});
}
else
{
await _publishEndpoint.Publish<IPaymentRejectedEvent>(new
{
UserId = message.UserId,
OrderId = message.OrderId,
Items = message.Items,
Message = "Payment rejected."
});
}
}
}
Ödeme bir değişken üzerinden simüle ediliyor. Ödemenin başarılı olması durumunda PaymentConfirmedEvent, aksi durumda PaymentRejectedEvent tipinde event publish ediliyor.
Öncelikle ödemenin başarılı olduğu durumda siparişin tamamlandığı Consumer sınıfına göz atalım.
public class PaymentConfirmedEventConsumer : IConsumer<IPaymentConfirmedEvent>
{
public Task Consume(ConsumeContext<IPaymentConfirmedEvent> context)
{
// Update order status to Confirmed from Pending via {context.Message.OrderId}
return Task.CompletedTask;
}
}
OrderService tarafından dinlenen event, ödeme başarılı olduğunda bu mesajı consume ederek siparişin durumunu Confirmed olarak işaretliyor.
Ödemenin başarısız olduğu durumda bu sefer hem stoğun eski haline getirilmesi gerekiyor hem de sipariş durumunun değiştirilmesi gerekiyor.
public class PaymentRejectedEventConsumer : IConsumer<IPaymentRejectedEvent>
{
public Task Consume(ConsumeContext<IPaymentRejectedEvent> context)
{
// Update stocks with compensable transaction to back
return Task.CompletedTask;
}
}
Ödemenin başarısız olduğu durumda StockService tarafından ilgili mesaj consume edilerek gerekli stoklar eski haline getiriliyor.
public class PaymentRejectedEventConsumer : IConsumer<IPaymentRejectedEvent>
{
public Task Consume(ConsumeContext<IPaymentRejectedEvent> context)
{
// Update order status to Rejected from Pending via {context.Message.OrderId}
return Task.CompletedTask;
}
}
Ödemenin başarısız olduğu durumda OrderService tarafından yine mesaj consume edilerek sipariş durumu Rejected olarak değiştiriliyor.
Görüldüğü üzere mikroservisler arasında distributed transaction veri tutarlılığını Choreography Pattern uygulanarak sağladık. Her bir servisin hem subscriber hem de publisher rolünde olduğuna dikkat edelim. Bu yaklaşımda servis sayısı -bir önceki yazıda değinildiği üzere- arttıkça yönetilmesi zor bir hale gelecektir. Bu durumda Orchestration-Based Saga yaklaşımını kullanmak daha doğru olacaktır.
Senaryoda geliştirilmiş olan projeye buradan ulaşabilirsiniz.