Orchestration-Based Saga Implementation

Saga Pattern Distributed Transactions

Saga Pattern dağıtık ortamda distributed transcation yönetilirken veri tutarlılığını hedefler. Implemente edilirken Choreography-Based Saga ve Orchestration-Based Saga olmak üzere iki yaklaşım vardır. Saga Pattern Nedir yazısında bu konu teorik olarak incelendi. Bu yazıda Orchestration-Based Saga Implementation konusu üzerine pratik yapacağız.

MassTransit, üzerine instance storage, event correlation gibi özellikler ekleyerek, bir State Machine kütüphanesi olan Automatonymous‘u kullanır. Bu bize state ve event’lar dahil olmak üzere bir state machine tanımlamamızı sağlıyor. Senaryo gereği yapacağımız örneğe geçmeden önce bir kaç terim hakkında bilgi edinmeliyiz.

State Machine

State, event ve davranışları tanımlayan merkezi birimdir. Oluşturulacak state machine MassTransitStateMachine<TInstance> sınıfını derive etmelidir. Bir state machine bir kere oluşturulur ve instance’larına tetiklenen event doğrultusunda ilgili davranışı uygular.

State Instance

State Machine için gerekli verileri tutan sınıftır. Her consume edilen Initial event doğrultusunda yeni bir instance oluşturulur. (Aynı CorrelationId’ye sahip instance bulunamazsa) Oluşturulacak State Instance SagaStateMachineInstance arayüzünü implemente etmelidir.

Gelen isteklerin birbirinden ayrılarak correlate edilebilmesi için CorrelationId kullanılır. Burada, o anki state CurrentState property’sinde tutulacaktır. Bu state bilgisi State, string veya int tipinde olabilir.

InstanceState methoduyla mevcut state’in hangi property’de tutulacağı belirtilir. Tip olarak State kullanılırsa otomatik olarak yapılandırılacağından dolayı bu method’un çağırılmasına gerek yoktur.

Yukarıda Accepted ve Submitted olmak üzere iki state tanımlanmıştır. Bir instance aynı zamanda bir state’e sahip olabilir. Her yeni instance varsayılan olarak Initial durumundadır. Ayrıca Final durumu da her state machine için tanımlıdır, instance’ın son duruma geldiğini belirtir.

Ayrıca OrderAccepted tipinde bir event tanımlandığını görüyoruz. Bu event’in bir instance ile correlate edilebilmesi için tekil olan OrderId üzerinden ilişkilendirileceği belirtiliyor. Aynı şekilde bu expression bildirimini kullanmak yerine oluşturulan event CorrelatedBy<TKey> arayüzünü implemente edebilir.

Eğer burada event unique bir değer aracılığıyla instance ile correlate olamıyorsa SelectId methodu kullanılmalıdır. NewId Nuget paketiyle generate edilen unique değer instance CorrelationId‘sine atanıyor.

Initially methoduyla state Initial halindeyken OrderSubmitted event davranışı belirleniyor. Bir OrderSubmitted event’i consume edildiğinde ve OrderId ile eşleşen CorrelationId‘ye sahip bir instance bulunamadığında Initial state içerisinde yeni bir instance oluşturulur.

Then methoduyla gelen mesaj üzerindeki property instance üzerindeki property’e set ediliyor. Gelen mesaj üzerindeki property’lere Data, instance property’lerine Instance üzerinden erişiyoruz.

Publish methoduyla OrderAcceptedEvent tipinde bir event yayınlanıyor. TransitionTo methoduyla instance Submitted state’ine geçiriliyor.

Sonrasındaysa During methoduyla instance Submitted halindeyken OrderAccepted eventi consume edildiğinde Send methoduyla UpdateOrderCommand tipinde bir komut gönderiliyor ve instance Accepted durumuna geçiriliyor.

DuringAny methoduyla herhangi bir state’teyken OrderCompleted eventi instance tarafından consume edildiğinde Finalize methoduyla sonlandırılıyor. Bir instance sonlandığında Final durumuna geçer ve varsayılan olarak Saga Repository‘sinden silinmez. SetCompletedWhenFinalized methoduyla Final durumlu state Saga Repository‘sinden kaldırılmak üzere tamamlandı olarak işaretleniyor.

Senaryo

Choreography-Based Saga Implementation yazısında olduğu gibi yine bir e-commerce sistemi üzerinden ilerleyeceğiz. Bu sistem sipariş, stok ve ödeme servislerinden oluşacaktır.

Saga Orchestration Implementation

Yukarıda bulunan diyagramı incelediğimizde;

  1. Order.Api bir istekte bulunarak OrderService‘e CreateOrderCommand tipinde bir komut gönderiyor. OrderService bu mesajı consume ederek PENDING durumunda bir sipariş oluşturuyor. Sonrasında Orchestrator‘a OrderSubmitted tipinde bir komut gönderiyor.
  2. Orchestrator stok işlemlerini gerçekleştirebilmek için StockService‘e ReserveStockCommand tipinde bir komut gönderiyor.
  3. Stok işlemlerinin başarılı olması durumunda StockReservedEvent publish ediliyor. Aksi durumdaysa StockNotReservedEvent publish ediliyor. Bu durumda OrderService‘e OrderFailedCommand tipinde bir komut gönderiliyor. Bu mesaj consume edilerek sipariş durumu REJECTED olarak güncelleniyor.
  4. Orchestrator ödeme işlemlerini gerçekleştirmek üzere PaymentService‘e RequestPaymentCommand tipinde bir komut gönderiyor.
  5. Ödeme işlemleri başarıyla gerçekleştiğinde PaymentConfirmedEvent publish ediliyor. Bu işlemden sonra OrderService‘e OrderCompletedCommand tipinde bir mesaj gönderilerek sipariş durumu CONFIRMED olarak güncelleniyor. Aksi durumdaysa PaymentRejectedEvent yayınlanıyor. Sonrasındaysa bu eventi StockService ve OrderService consume ederek compensable transaction işlemlerini gerçekleştiriyor.

Bu uygulamada bulunan tüm projeler MassTransit.AspNetCore ve MassTransit.RabbitMQ Nuget paketlerini kullanmaktadır. MassTransit konfigürasyonları ve mesaj tanımlamalarına yazının uzamaması adına yer verilmemiştir, proje yazının sonunda paylaşılmıştır.

RabbitMQ ile ilgili bilgileri tutmak için Shared projesine aşağıdaki sınıfı ekliyoruz.

Order Api

Senaryomuza göre ilk tetiklenecek kısımdır. Client tarafından tetiklenerek OrderService’e sipariş isteğini iletecektir.

Görüldüğü üzere gelen sipariş bilgisi doğrudan OrderService‘e ICreateOrderCommand komutuyla oluşturulmak üzere gönderiliyor.

OrderService

Api aracılığıyla gelen istekler doğrultusunda daha sonra işlenmek üzere Pending durumunda bir sipariş oluşturur. Sonrasında IOrderSubmitted tipinde bir event oluşturur.

Sipariş oluşturulduktan sonra artık Orchestrator‘e IOrderSubmittedEvent aracılığıyla mesaj gönderiliyor.

SagaService

Öncelikle State Machine tarafından kullanılacak Instance sınıfımızı oluşturalım.

Instance’ların correlate edileceği CorrelationId ve anlık durum bilgisinin tutulacağı CurrentState yanında state’i tutulacak sipariş ile ilgili bilgilere yer veriliyor.

Sonrasında State Machine tanımlamasını yapalım, yukarıda ön bilgi verildiğinden dolayı tamamı paylaşılacaktır.

İlk olarak State Machine tarafından kullanılacak state’ler tanımlanıyor. Ardından InstanceState methoduyla durumun hangi property üzerinde tutulacağı belirtiliyor.

Sonrasındaysa Event methodlarıyla instance’ların OrderId üzerinden correlate edileceği belirtiliyor. Sıralaması önemli olarak ilk gelecek mesajın OrderSubmittedEvent olacağı belirtiliyor.

Initially methoduyla instance state’i Initial durumundayken hangi işlemlerin yapılacağı tanımlanıyor. Buna göre When methoduyla gelen event kontrolü yapılarak OrderSubmittedEvent olup olmadığına bakılıyor. Eşleşme sağlanıyorsa Then methoduyla gelen mesaj bilgileri Data propery’si üzerinden Instance property’sine atanıyor. Ardından TransitionTo methoduyla state OrderSubmitted durumuna geçiriliyor. Sonrasında Send methoduyla StockService‘ine stok işlemlerini gerçekleştirebilmesi için ReserveStockCommand mesajı gönderiliyor.

During methoduyla instance state’inin OrderSubmitted ve When methoduyla gelen event’in StockReservedEvent olması durumunda yapılacak işlemler belirtiliyor. Buna göre state StockReserved‘a geçiriliyor ve Send methoduyla PaymentService‘e ödeme işlemlerini gerçekleştirebilmesi için RequestPaymentCommand tipinde bir mesaj gönderiliyor. Stok işlemlerinin başarısız olduğu durumdaysa (StockNotReservedEvent) state StockNotReserved durumuna geçirilerek Send methoduyla OrderFailedCommand tipinde bir mesaj gönderiliyor.

Bir sonraki satırdaysa instance state’i StockReserved durumunda ve gelen event’in PaymentConfirmedEvent olduğu zaman state PaymentConfirmed durumuna geçiriliyor. Ardından Send methoduya OrderService‘e OrderCompletedCommand tipinde bir mesaj gönderiliyor. Eğer gelen event PaymentRejectedEvent ise state PaymentRejected durumuna geçiriliyor. Böylece Send methodlarıyla OrderService‘e OrderFailedCommand tipinde, StockService‘e CompensateStockCommand komutları gönderiliyor.

Finalize methoduyla ilgili state’i Final durumuna getiriyoruz. SetCompletedWhenFinalized methoduyla Final durumundaki instance’ların Repository’den temizlenmesini belirtiyoruz.

Program.cs dosyasında SagaStateMachine konfigürasyonlarını yapıyoruz.

InMemoryRepository methoduna aşağıda Saga Persistence başlığında değinilmiştir.

StockService

Bir sipariş oluşturulduğunda sipariş ürün adetleri kadar stoğun güncelleneceği servistir.

Stok kontrolü bir değişken aracılığıyla simüle ediliyor. Yeterli stok olması durumunda IStockReservedEvent tipinde, aksi durumda IStockNotReservedEvent tipinde bir event publish ediliyor.

PaymentService

Stoğun başarıyla işlemlerini gerçekleştirmesi sonrasında ödeme işlemlerinin yapılacağı servistir.

Ödeme işlemleri bir değişken aracılığıyla simüle ediliyor. Başarılı olduğu durumda IPaymentConfirmedEvent, aksi durumda IPaymentRejectedEvent tipinde bir event publish ediliyor.

Ödemenin başarısız olduğu durumda StockService, Orchestrator üzerinden compensable transaction gerçekleştirerek eski haline döndürmelidir.

Aynı şekilde OrderService‘te ödemenin başarısız olduğu durumda sipariş durumunu güncellemelidir. Ayrıca bu consumer StockService‘inden IStockNotReservedEvent tipinde bir event publish edildiğinde de kullanılmaktadır.

Ödeme başarılıysa OrderService sipariş durumunu güncellemelidir.

Bonus: Saga Persistence

Saga, durum bilgisine sahip event-based message consumer’ları olduğundan event’ler arası state bilgilerini korumak önem arz etmektedir. Persistent bir state olmadan Saga her eventi yeni bir event olarak algılayacak ve sonrasında gelecek eventler birbirine bağlı olmayacaktır. Bu nedenle state’i depolamalıyız. MassTransit bize state bilgisini depolamak için farklı yollar sunar. InMemory, EF Core, MongoDb, Redis ve daha fazlası… Biz Entity Framework Core kullanacağız.

Bunun için öncelikle MassTransit.EntityFrameworkCore Nuget paketini yükleyelim ve DbContext sınıfımızı oluşturalım.

DbContext sınıfımız SagaDbContext abstract sınıfından türemelidir.

SagaClassMap<TSaga> abstract sınıfıyla mapping işlemlerini gerçekleştiriyoruz ve SagaDbContext sınıfının Configurations property’sini override ederek uyguluyoruz.

Program.cs içerisinde gerekli konfigürasyonları yapalım.

Görüldüğü üzere mikroservisler arasında distributed transaction veri tutarlılığını Orchestration Pattern uygulanarak sağladık. Senaryoda geliştirilmiş olan projeye buradan ulaşabilirsiniz.

You may also like...

Bir cevap yazın

E-posta hesabınız yayımlanmayacak.