Triển khai Saga Pattern trong microservices với NodeJS và Choreography-Based Saga
Triển khai Saga Pattern trong microservices với NodeJS và Choreography-Based Saga
Saga Pattern trong ứng dụng microservices
Mình sẽ sử dụng lại ví dụ Booking Service Online
trong phần trước đó
Ở đây mình sẽ tạo ra các isolated service, đồng thời thiết kế để chúng giao tiếp với nhau thông qua một Message Queue
. Ở đây mình chọn RabbitMQ làm Message Queue
.
1. Triển khai BookingService
// booking-service.ts
import express from 'express'
import amqp from 'amqplib'
const app = express()
const PORT = 3001
app.use(express.json())
let channel: amqp.Channel
const paymentQueue = 'payment_queue'
app.post('/booking', async (req, res) => {
const { userId, eventId, numberOfSeats } = req.body
// Pre-step 1: Validate booking request
// Pre-step 2: Save booking request to application database
const booking = {
userId,
eventId,
numberOfSeats,
bookingReservedSuccessfully: true,
}
/**
* Step 1: Send Booking Request to PaymentService
*/
if (booking.bookingReservedSuccessfully) {
await sendMessageToQueue(paymentQueue, { booking })
}
res.json({ message: 'Booking request sent successfully' })
})
async function connectQueue(queue: string) {
const connection = await amqp.connect('amqp://localhost')
channel = await connection.createChannel()
await channel.assertQueue(queue)
}
async function sendMessageToQueue(queue: string, message: unknown) {
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}
app.listen(PORT, async () => {
console.log(`BookingService is running on http://localhost:${PORT}`)
await connectQueue(paymentQueue)
})
BookingService
xử lý các yêu cầu HTTP POST
để tạo booking
mới. BookingService
cố gắng đặt trước một chỗ và nếu thành công, nó sẽ gửi một message đến PaymentService
thông qua một queue trong RabbitMQ có tên là payment_queue
.
2. Triển khai PaymentService
// booking-service.ts
import amqp from 'amqplib'
let channel: amqp.Channel
const paymentQueue = 'payment_queue'
const seatUpdatingQueue = 'seat_update_queue'
const compensationQueue = 'compensation_queue'
async function processPayment() {
const connection = await amqp.connect('amqp://localhost')
channel = await connection.createChannel()
await channel.assertQueue(paymentQueue)
await channel.assertQueue(seatUpdatingQueue)
await channel.assertQueue(compensationQueue)
channel.consume(paymentQueue, async (msg) => {
const { booking } = JSON.parse(msg.content.toString())
// Pre-step: Process payment
const paymentProcessedSuccessfully = true
/**
* Step 2:
* Send seats update request to SeatUpdatingService
* Or Reverse the booking by sending a compensation request to CompensationService
*/
if (paymentProcessedSuccessfully) {
await sendMessageToQueue(seatUpdatingQueue, { booking })
} else {
await sendMessageToQueue(compensationQueue, {
booking,
event: 'PaymentServiceFailed',
})
}
// Acknowledge the message
channel.ack(msg as amqp.Message)
})
}
async function sendMessageToQueue(queue: string, message: unknown) {
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}
processPayment()
PaymentService
lắng nghe các message trên payment_queue
. Nó chịu trách nhiệm xử lý payment, đồng thời nó cũng sẽ gửi một message đến seat_update_queue
nếu thanh toán thành công hoặc compensation_queue
nếu thanh toán thất bại.
3. Triển khai SeatUpdatingService
// seat-update-service.ts
import amqp from 'amqplib'
let channel: amqp.Channel
const seatUpdatingQueue = 'seat_update_queue'
const notificationQueue = 'notification_queue'
const compensationQueue = 'compensation_queue'
async function processSeatUpdate() {
const connection = await amqp.connect('amqp://localhost')
channel = await connection.createChannel()
await channel.assertQueue(seatUpdatingQueue)
channel.consume(seatUpdatingQueue, async (msg) => {
const { booking } = JSON.parse(msg?.content.toString() || '{}')
// Add your seat updating logic here
const seatsUpdatedSuccessfully = true
/**
* Step 3:
* Send seats update request to NotificationService
* Or Reverse the booking by sending a compensation request to CompensationService
*/
if (seatsUpdatedSuccessfully) {
await sendMessageToQueue(notificationQueue, { booking, isSuccess: true })
} else {
await sendMessageToQueue(compensationQueue, {
booking,
event: 'SeatUpdatingServiceFailed',
})
}
// Acknowledge the message
channel.ack(msg as amqp.Message)
})
}
async function sendMessageToQueue(queue: string, message: unknown) {
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}
processSeatUpdate()
SeatUpdatingService
lắng nghe các message trên seat_update_queue
. Nó chịu trách nhiệm xử lý cập nhật chỗ, đồng thời nó cũng sẽ gửi một message đến notification_queue
nếu thực hiện thành công hoặc compensation_queue
nếu thực hiện thất bại.
4. Triển khai CompensationService
// seat-update-service.ts
import amqp from 'amqplib'
let channel: amqp.Channel
const compensationQueue = 'compensation_queue'
const notificationQueue = 'notification_queue'
async function processCompensation() {
const connection = await amqp.connect('amqp://localhost')
channel = await connection.createChannel()
await channel.assertQueue(compensationQueue)
channel.consume(compensationQueue, async (msg) => {
const { booking, event } = JSON.parse(msg?.content.toString() || '{}')
console.log(`Compensating for user ${booking.userId} and event ${event}`)
/**
* Step 4: Do compensation
*/
switch (event) {
case 'PaymentServiceFailed':
console.log(
`Compensating PaymentServiceFailed for user ${booking.userId}`
)
// Add your compensation logic here
break
case 'SeatUpdatingServiceFailed':
console.log(
`Compensating SeatUpdatingServiceFailed for user ${booking.userId}`
)
// Add your compensation logic here
break
}
// Send notification to user
await sendMessageToQueue(notificationQueue, { booking, isSuccess: false })
// Acknowledge the message
channel.ack(msg as amqp.Message)
})
}
async function sendMessageToQueue(queue: string, message: unknown) {
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}
processCompensation()
CompensationService
lắng nghe các message trên compensation_queue
. Khi CompensationService
nhận được một message
cho biết có sự cố khi xử lý booking, nó sẽ thực hiện một compensation transaction
để hủy booking và đưa dữ liệu về trạng thái ban đầu, giúp hệ thống nhất quán về dữ liệu.
5. Triển khai NotificationService
// seat-update-service.ts
import amqp from 'amqplib'
let channel: amqp.Channel
const notificationQueue = 'notification_queue'
async function processSendNotification() {
const connection = await amqp.connect('amqp://localhost')
channel = await connection.createChannel()
await channel.assertQueue(notificationQueue)
channel.consume(notificationQueue, async (msg) => {
const { booking, isSuccess } = JSON.parse(msg?.content.toString() || '{}')
/**
* Step 4: Send notification to user
*/
if (isSuccess) {
console.log(`Booking SUCCESS sent to user ${booking.userId}`)
} else {
console.log(`Booking FAIL send to user ${booking.userId}`)
}
// Acknowledge the message
channel.ack(msg as amqp.Message)
})
}
processSendNotification()
NotificationService
lắng nghe các message trên notification_queue
. Khi nó nhận được một message, nó sẽ gửi một thông báo (có thể là email, số điện thoại,...) tuỳ theo yêu cầu kỹ thuật.
Như vậy, bạn đã triển khai thành công các microservices với mô hình Choreography-Based Saga. Mỗi service sẽ thực hiện nhiệm vụ của mình và giao tiếp với các service khác thông qua event
.