Bu yazıda dağıtık sistemlerin mesaj tabanlı iletişimine Spring Cloud Stream ve RabbitMQ kullanarak bir örnek oluşturmaya çalışacağım. Öncelikle konunun biraz teorik tarafına değinmek istiyorum.
Uygulamalar arasında iletişim için senkron ya da asenkron yöntemler kullanabiliriz. Dağıtık uygulamalarda özellikle dikkat ettiğimiz konu uygulamalar arası iletişimde sıkı bağ oluşturmamak olur.
Senkron yöntemler için HTTP request/response modelini örnek verebiliriz. HTTP requestler her ne kadar talebin karşılanması noktasında uygulama tarafından NonBlocking thread modeli ile asenkron gerçekleşebilse de temelde senkron bir haberleşme şeklidir. Çünkü talep yapıldığı anda karşı servis cevap veremeyecek durumda ise talebi yapan uygulama sürecini devam ettiremez. Bu sebepten dolayı bu tarz iletişimden mümkün olduğunca kaçınmak sistemin güçlü bir karaktere sahip olmasını sağlar.
Bu yaklaşıma alternatif olan asenkron haberleşme dediğimizde ise aklımıza ilk olarak mesaj bazlı iletişim gelebilir. Mesaj bazlı gerçekleşen iletişim bize servisler arası esnek bağ sağlar. Mesajın muhatabı olan uygulama o anda uygun olmasa bile, uygun olduğunda talebi alıp cevaplayacaktır.
Bu esneklik ile birlikte uygulama stack’ine farklı componentleri de dahil etmek gerekliliği ortaya çıkar. Örneğin kümeye mesaj trafiğini yöneten, buffer’layan, hataları yöneten RabbitMQ gibi bir araç dahil etmek gerekir. Sisteme dahil edilen her yeni component her ne kadar problemleri çözmek için dahil edilse de bununla birlikte o componentin yönetim/yaşatma maliyeti oluşur. Uygulamanın karakteristiğine göre çözülmeye çalışılan iş problemini en az component ile çözmek daha yalın ve yaşatılabilir bir sisteme sahip olmamızı sağlar.
Bu yaklaşım her ne kadar kritik iş problemleri çözmek için bize avantaj sağlasa da karşımıza yönetilmesi/dikkat edilmesi gereken aşağıdak gibi durumları ortaya çıkarır;
- Uygulamanın sağlamlığını gözetmek için bu aracın darboğaz oluşturmaması çok önemli. Bu noktada RabbitMQ gibi bir araç kullanılıyorsa bunun cluster haline getirilmesi gereklidir. Componentin yedekli olması sistemin daha güçlü olmasını sağlayacaktır.
- Mesaj gönderimi sırasında yapılan işlemler bir database transaction’ı içerisinde gerçekleşiyorsa mesajın bu transaction bütünlüğü içerisinde gönderilmesi gereklidir. İşlemler gerçekleşirken bir mesaj gönderimi yapılmış olabilir, fakat DB seviyesinde commit işlemi sırasında bir hata olursa veriler DB’ye yazılamaz. Fakat öncesinde mesaj gönderilmiş olur.
- Mesajların alınması/tüketilmesi sırasında herhangi bir hatadan dolayı mesaj işlenemeyebilir. Hatta birkaç deneme sonrası dahi işlenemeyebilir. Bu durumda bu teslim edilemeyen mesajın yönetilmesi gereklidir.
- Mesajların belirli bir sıraya göre işlenmesi gerekebilir.
- Aynı mesaj bir şekilde iki defa gönderilmiş olabilir.
Görüldüğü gibi uygulamanın başarısını, kalitesini ve sağlamlığını etkileyen önemli konular karşımıza çıkmaktadır.
Spring Cloud Stream bu noktada bize şu şekilde fayda sağlar; kendi tanımında ifade edildiği gibi yüksek ölçeklenebilir microservices uygulamalar geliştirmeyi sağlayan bir framework’tür.
En büyük avantajı binder olarak ifade ettikleri RabbitMQ, Kafka gibi bir çok mesaj yönetim sistemini abstract hale getirmesidir. Bu şu anlama gelmektedir; uygulamamızı geliştirirken RabbitMQ ya da Kafka gibi bir sistemin api’ları ile değil Spring Cloud Stream’in API’ları üzerine inşa ederiz. Bu sayede uygulamamız bugün RabbitMQ yarın ise Apache Kafka ile çalışabilir.
Şimdi bu yaklaşım ile bir sistemi nasıl geliştirebileceğimize bakalım. Ben örnekte yukarıda bahsettiğim üzere Spring Cloud Stream ve RabbitMQ kullanarak yapmacağım.
Örnek Uygulama
Uygulamaya çalışacağım senaryoda iki servis mevcut; PaymentService ve StockService. Payment Service kendisine RestAPI ile gelen talebi işler, ödemeyi alır ve işlem bittikten sonra Stock Servisine sevkiyatın başlaması için talebi gönderir.
Dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
StockService – application.yml
spring: cloud: stream: bindings: stockOperationInputChannel: destination: stock-service.operation group: stock-service-group binder: local_rabbit rabbit: bindings: stockOperationInputChannel: consumer: autoBindDlq: true republishToDlq: true binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /
Konfigürasyon stream altında üç bölüme ayrılmakta; bindings, rabbit, binders.
Bindings; spring içerisinde mesajlar için queue/topic/exchange yapıları temsil eden channel tanımlarını yapmayı sağlar. Onun da altındaki destination, mesaj servisi üzerindeki kuyruk tanımını belirmemizi sağlar.
group ise mesaj consumer tarafında mesajların load balance şekilde işlenmesini sağlar. Standart olarak RabbitMQ üzerinde bir queue’yi dinleyen client/consumer’lar queue’ye gelen tüm mesajları her birisi alır. Bu durum genelde pek istenmez. Çünkü consumer tarafında birden fazla instance olduğu durumda aynı mesajı tüm consumer’lar işler. Bu durum aynı işin tekrarlı yapılmasına sebep olur. Spring Cloud Stream bu problemi group tanımı ile çözmektedir. Aynı grupta olan consumer’lar mesajları sıralı şekilde işleyerek load balancing ve işlem tekilliği sağlanmış olur.
binder ise konfigürasyondaki binders bölümünde tanımlanan hangi servis(RabbitMQ, Apache Kafka) üzerinde çalışacağını ifade eder.
rabbit bölümünde ise Spring Cloud Stream ile sağlanan abstraction dışında sadece RabbitMQ’ya özel paramatrelerin tanımlandığı bölümdür. Burada ilgili channel ile customization yapılabilir. Bizim burada tanımladığımız parametereler yukarıda bahsettiğim “mesajların tüketilememesi/alınamaması” probleminin çözümü için kullanılan ve “Dead Letter Queue — DLQ” olarak adandırılan queue tanımının yapılmasını sağlamaktır. Burada yapılan, herhangi bir mesaj default olarak üç defa deneme sonucunda teslim edilemezse/tüketilemezse DLQ olarak belirtilen queue’ye yönlendirilir. Bu sayede problemli mesajların kaybolması önlenmiş olur. Consumer tarafında bu problemli mesajları dinleyen bir listener ile yakalayıp loglanabilir, düzeltilip tekrar publish edilebilir vb. şekilde yönetilebilir.
Senaryonun işletilmesi
1 – Ödeme talebinin alınması(Payment Service)
2 – Ödeme alma işleminin yapılması ve stok servisine mesajın gönderilmesi(Payment Service)
Yukarıda bahsettiğimiz gibi burada transactional bir işlem gerçekleşmektedir. Eğer transaction başarılı bir şekilde tamamlanana kadar stok işlemleri için gerekli olan mesajın stok servisine gönderilmemesi gerekmektedir. Bu problemin çözümü için Spring’in bize sağladığı ApplicationEventPublisher kullanıyoruz. ApplicationEventPublisher uygulama context’i içerisinde event yayınlamayı sağlar. Bu işlemi yaparken bize sağladığı diğer avantaj ise işlemin transaction sürecinin bir parçası haline getirebilmesidir.
Yukarıda görüldüğü gibi iki annotasyon bize event listen etmeyi sağlamaktadır. TransactionalEventListener ise mesajın yakalandığı anda bir transaction devam ediyor ise mesajın işlenmesinin transaction sonrası ya da sağlanan diğer aşamalarda yapılabilmesine olanak sağlar. Biz burada AFTER_COMMIT aşamasında yapıyoruz. Çünkü transaction’ın başarılı bir şekilde tamamlanması bizim için önem arzetmektedir.
Peki transaction başarılı bir şekilde tamamlandı ve mesaj artık stok service’e gönderilecek. Fakat o anda RabbitMQ erişilebilir durumda değil. Bu durumda ise mesaj gönderilemeyecek ve işlem bütünlüğü bozulmuş olacak. Bu problemin çözümünü ise Outbox Pattern ile sağlayabiliriz. Ben bu örnekte uygulamadım fakat kısaca çözüm şu; mesajlar bir database tablosuna kaydedilir ve sonrasında bir job aracılığı ile gönderilemeyen mesajlar kontrollü bir şekilde RabbitMQ’ya teslime edilir. Bu sayede mesaj kaybı önlenmiş olup işlem bütünlüğü sağlanır.
3 – StockService’e gelen mesajların işlenmesi;
Konfigürasyonda belirttiğimiz bu tanımların artık uygulama içerisinden erişilip nasıl kullanılacağına bakabiliriz.
Bunun için uygulama içerisinde iki tanım yapılması gerekmektedir, birincisi Channel tanımıdır;

İkincisi ise mesajları almayı sağlayan listener tanımıdır.

Diğer servislerden gelen mesajlar alınıp süreç devam ettirilir. Diğer yandan DEAD_LETTER_QUEUE listener’i ile alınamayan mesajlar loglanır. Dead letter oluşması için kod içerisinde fiyat -1 gelirse hata fırlatmasını sağladım. Bu sayede dead letter oluşmuş olacaktır.
Son olarak Channel tanımının uygulama içerisinde aktif olabilmesi için aşağıdaki tanımın yapılması(EnableBinding) gerekmektedir.


Microservices gibi dağıtık mimarilerde nasıl daha kolay ve yönetilebilir şekilde message driven haberleşme yaklaşımını uygulamaya bir bakış açısı oluşturmaya çalıştım.
Örnek uygulama kodlarına buradan erişebilirsiniz.