Sagas are a design pattern that allows you to manage data consistency across multiple microservices without using distributed transactions.
Let’s walk through implementing a saga in Python, specifically using a choreography-based approach where services react to each other’s events. Imagine we’re building an e-commerce order processing system. An order involves creating an order, processing payment, and updating inventory.
Here’s a simplified view of our services and their responsibilities:
- Order Service: Creates the order.
- Payment Service: Processes the payment for the order.
- Inventory Service: Updates stock levels for ordered items.
We’ll use a message broker (like RabbitMQ or Kafka) for inter-service communication. For this example, we’ll simulate message passing with Python dictionaries.
Step 1: Define Your Events
Events are crucial for choreography. Each service publishes events when a significant state change occurs.
# events.py
class OrderCreated:
def __init__(self, order_id, user_id, total_amount):
self.order_id = order_id
self.user_id = user_id
self.total_amount = total_amount
class PaymentProcessed:
def __init__(self, order_id, payment_id):
self.order_id = order_id
self.payment_id = payment_id
class PaymentFailed:
def __init__(self, order_id, reason):
self.order_id = order_id
self.reason = reason
class InventoryUpdated:
def __init__(self, order_id, item_id, quantity):
self.order_id = order_id
self.item_id = item_id
self.quantity = quantity
class InventoryUpdateFailed:
def __init__(self, order_id, item_id, reason):
self.order_id = order_id
self.item_id = item_id
self.reason = reason
# Compensation events
class PaymentRefunded:
def __init__(self, order_id, payment_id):
self.order_id = order_id
self.payment_id = payment_id
class OrderCancelled:
def __init__(self, order_id):
self.order_id = order_id
Step 2: Implement the Order Service
The Order Service initiates the saga.
# order_service.py
from events import OrderCreated, OrderCancelled
import uuid
import time
class OrderService:
def __init__(self, message_broker):
self.message_broker = message_broker
self.orders = {} # In-memory store for simplicity
def create_order(self, user_id, items):
order_id = str(uuid.uuid4())
total_amount = sum(item['price'] for item in items)
self.orders[order_id] = {
'user_id': user_id,
'items': items,
'status': 'PENDING',
'total_amount': total_amount
}
print(f"Order {order_id} created. Publishing OrderCreated event.")
self.message_broker.publish('order_events', OrderCreated(order_id, user_id, total_amount))
return order_id
def handle_payment_processed(self, event: PaymentProcessed):
order_id = event.order_id
if order_id in self.orders and self.orders[order_id]['status'] == 'PENDING':
self.orders[order_id]['status'] = 'PAID'
print(f"Order {order_id}: Payment processed. Status updated to PAID.")
# In a real system, you might publish an OrderPaid event here
# For this saga, we just track the state.
def handle_inventory_updated(self, event: InventoryUpdated):
order_id = event.order_id
if order_id in self.orders and self.orders[order_id]['status'] == 'PAID':
self.orders[order_id]['status'] = 'COMPLETED'
print(f"Order {order_id}: Inventory updated. Order COMPLETED.")
def handle_payment_failed(self, event: PaymentFailed):
order_id = event.order_id
if order_id in self.orders and self.orders[order_id]['status'] == 'PENDING':
self.orders[order_id]['status'] = 'FAILED'
print(f"Order {order_id}: Payment failed. Order status set to FAILED.")
# No compensation needed here, as it failed before payment
def handle_inventory_update_failed(self, event: InventoryUpdateFailed):
order_id = event.order_id
if order_id in self.orders and self.orders[order_id]['status'] == 'PAID':
print(f"Order {order_id}: Inventory update failed. Initiating compensation.")
self.orders[order_id]['status'] = 'PAYMENT_PENDING_REFUND' # Intermediate state
# Publish a compensation event for Payment Service
self.message_broker.publish('order_events', OrderCancelled(order_id)) # Simplified: OrderCancelled implies refund
Step 3: Implement the Payment Service
The Payment Service listens for OrderCreated and publishes PaymentProcessed or PaymentFailed. It also listens for OrderCancelled to perform compensation.
# payment_service.py
from events import OrderCreated, PaymentProcessed, PaymentFailed, PaymentRefunded, OrderCancelled
import uuid
import random
import time
class PaymentService:
def __init__(self, message_broker):
self.message_broker = message_broker
self.payments = {} # In-memory store
def handle_order_created(self, event: OrderCreated):
order_id = event.order_id
amount = event.total_amount
print(f"Payment Service: Received OrderCreated for {order_id}. Processing payment of {amount}.")
payment_id = str(uuid.uuid4())
self.payments[order_id] = {'payment_id': payment_id, 'status': 'PENDING'}
# Simulate payment processing with a chance of failure
time.sleep(0.5) # Simulate network latency/processing time
if random.random() > 0.1: # 90% success rate
self.payments[order_id]['status'] = 'PROCESSED'
print(f"Payment {payment_id} processed for order {order_id}. Publishing PaymentProcessed.")
self.message_broker.publish('payment_events', PaymentProcessed(order_id, payment_id))
else:
self.payments[order_id]['status'] = 'FAILED'
print(f"Payment failed for order {order_id}. Publishing PaymentFailed.")
self.message_broker.publish('payment_events', PaymentFailed(order_id, "Insufficient funds"))
def handle_order_cancelled(self, event: OrderCancelled):
order_id = event.order_id
if order_id in self.payments and self.payments[order_id]['status'] == 'PROCESSED':
payment_id = self.payments[order_id]['payment_id']
print(f"Payment Service: Received OrderCancelled for {order_id}. Refunding payment {payment_id}.")
self.payments[order_id]['status'] = 'REFUNDED'
# Publish a compensation event (optional, depending on saga design)
self.message_broker.publish('payment_events', PaymentRefunded(order_id, payment_id))
elif order_id in self.payments and self.payments[order_id]['status'] == 'PENDING':
print(f"Payment Service: Received OrderCancelled for {order_id}. Payment was not yet processed, no refund needed.")
self.payments[order_id]['status'] = 'CANCELLED_BEFORE_PROCESSING'
Step 4: Implement the Inventory Service
The Inventory Service listens for PaymentProcessed and publishes InventoryUpdated or InventoryUpdateFailed. It also listens for OrderCancelled to potentially reverse inventory updates (though in this simple model, cancellation implies payment failed before inventory was touched).
# inventory_service.py
from events import PaymentProcessed, InventoryUpdated, InventoryUpdateFailed, OrderCancelled
import uuid
import random
import time
class InventoryService:
def __init__(self, message_broker):
self.message_broker = message_broker
self.inventory = {'ITEM123': 100, 'ITEM456': 50} # Stock levels
self.order_item_reservations = {} # Track items reserved for orders
def handle_payment_processed(self, event: PaymentProcessed):
# In a real system, we'd know the items from the OrderCreated event
# For simplicity, let's assume a fixed item for the order
order_id = event.order_id
item_id = 'ITEM123' # Example item
quantity = 1 # Example quantity
print(f"Inventory Service: Received PaymentProcessed for order {order_id}. Attempting to update inventory for {item_id}.")
if item_id not in self.inventory or self.inventory[item_id] < quantity:
print(f"Inventory update failed for order {order_id}: Insufficient stock for {item_id}.")
self.message_broker.publish('inventory_events', InventoryUpdateFailed(order_id, item_id, "Insufficient stock"))
# Publish OrderCancelled to trigger payment refund
self.message_broker.publish('inventory_events', OrderCancelled(order_id))
return
self.inventory[item_id] -= quantity
self.order_item_reservations[order_id] = {'item_id': item_id, 'quantity': quantity}
print(f"Inventory updated for order {order_id}. New stock for {item_id}: {self.inventory[item_id]}. Publishing InventoryUpdated.")
self.message_broker.publish('inventory_events', InventoryUpdated(order_id, item_id, quantity))
def handle_order_cancelled(self, event: OrderCancelled):
order_id = event.order_id
if order_id in self.order_item_reservations:
reservation = self.order_item_reservations[order_id]
item_id = reservation['item_id']
quantity = reservation['quantity']
print(f"Inventory Service: Received OrderCancelled for {order_id}. Releasing inventory for {item_id} (quantity: {quantity}).")
self.inventory[item_id] += quantity
del self.order_item_reservations[order_id]
# In a real system, you might publish an InventoryReleased event
Step 5: Simulate the Message Broker and Run the Saga
This is a simplified simulation. In reality, you’d use libraries like pika for RabbitMQ or kafka-python for Kafka.
# message_broker.py
import threading
from collections import defaultdict
class MessageBroker:
def __init__(self):
self.listeners = defaultdict(list)
self.lock = threading.Lock()
def subscribe(self, topic, callback):
with self.lock:
self.listeners[topic].append(callback)
def publish(self, topic, message):
print(f"\n--- Publishing to '{topic}': {type(message).__name__} ---")
with self.lock:
for callback in self.listeners[topic]:
# Simulate asynchronous processing by using a thread
thread = threading.Thread(target=callback, args=(message,))
thread.start()
# --- Main Execution ---
if __name__ == "__main__":
broker = MessageBroker()
order_service = OrderService(broker)
payment_service = PaymentService(broker)
inventory_service = InventoryService(broker)
# Subscribe services to relevant topics
broker.subscribe('order_events', order_service.handle_payment_processed)
broker.subscribe('order_events', order_service.handle_inventory_updated)
broker.subscribe('order_events', order_service.handle_payment_failed)
broker.subscribe('order_events', order_service.handle_inventory_update_failed)
broker.subscribe('payment_events', payment_service.handle_order_created)
broker.subscribe('payment_events', payment_service.handle_order_cancelled)
broker.subscribe('inventory_events', inventory_service.handle_payment_processed)
broker.subscribe('inventory_events', inventory_service.handle_order_cancelled)
# --- Simulate an Order ---
print("--- Starting Saga Simulation ---")
order_id = order_service.create_order("user123", [{'item_id': 'ITEM123', 'price': 50.00}])
# Wait for events to propagate and services to process them
time.sleep(3) # Give threads time to finish
print("\n--- Final Order Status ---")
print(f"Order {order_id} status: {order_service.orders.get(order_id, {}).get('status', 'Not Found')}")
print(f"Current inventory: {inventory_service.inventory}")
print("\n--- Simulating another order that fails inventory ---")
# Reduce stock to force inventory failure
inventory_service.inventory['ITEM123'] = 0
order_id_fail = order_service.create_order("user456", [{'item_id': 'ITEM123', 'price': 50.00}])
time.sleep(3)
print("\n--- Final Order Status (Failure Scenario) ---")
print(f"Order {order_id_fail} status: {order_service.orders.get(order_id_fail, {}).get('status', 'Not Found')}")
print(f"Current inventory: {inventory_service.inventory}")
In this choreography-based saga:
- The Order Service starts the process by publishing an
OrderCreatedevent. - The Payment Service listens for
OrderCreated, attempts payment, and publishesPaymentProcessedorPaymentFailed. - The Inventory Service listens for
PaymentProcessed, attempts to update stock, and publishesInventoryUpdatedorInventoryUpdateFailed. - If Inventory fails, the Inventory Service publishes an
OrderCancelledevent. - The Payment Service listens for
OrderCancelledand performs a refund if the payment was already processed.
The key is that each service reacts to events from others. If any step in the sequence fails (e.g., inventory update), compensation events are published to undo previous successful steps (e.g., payment refund). This ensures eventual consistency without a central orchestrator or distributed locks. The final state might be COMPLETED (all steps succeeded), or FAILED / REFUNDED (a step failed, and compensation ran).