Sagas are a design pattern that allows you to manage data consistency across multiple microservices without using distributed transactions.

Let’s walk through implementing a saga in Python, specifically using a choreography-based approach where services react to each other’s events. Imagine we’re building an e-commerce order processing system. An order involves creating an order, processing payment, and updating inventory.

Here’s a simplified view of our services and their responsibilities:

  • Order Service: Creates the order.
  • Payment Service: Processes the payment for the order.
  • Inventory Service: Updates stock levels for ordered items.

We’ll use a message broker (like RabbitMQ or Kafka) for inter-service communication. For this example, we’ll simulate the broker with a small in-process class that delivers each published event to every subscriber of its topic on a background thread.

Step 1: Define Your Events

Events are crucial for choreography. Each service publishes events when a significant state change occurs.

# events.py

class OrderCreated:
    """Event: a new order was stored and the saga should start.

    Carries the amount the Payment Service is asked to charge.
    """

    def __init__(self, order_id, user_id, total_amount):
        self.order_id, self.user_id, self.total_amount = order_id, user_id, total_amount

class PaymentProcessed:
    """Event: the payment for an order succeeded.

    *payment_id* identifies the payment record that a later refund would reverse.
    """

    def __init__(self, order_id, payment_id):
        self.order_id, self.payment_id = order_id, payment_id

class PaymentFailed:
    """Event: the payment for an order was declined.

    *reason* is a human-readable explanation of the decline.
    """

    def __init__(self, order_id, reason):
        self.order_id, self.reason = order_id, reason

class InventoryUpdated:
    """Event: *quantity* units of *item_id* were taken from stock for the order."""

    def __init__(self, order_id, item_id, quantity):
        self.order_id, self.item_id, self.quantity = order_id, item_id, quantity

class InventoryUpdateFailed:
    """Event: stock for *item_id* could not be taken for the order.

    *reason* is a human-readable explanation of the failure.
    """

    def __init__(self, order_id, item_id, reason):
        self.order_id, self.item_id, self.reason = order_id, item_id, reason

# Compensation events
class PaymentRefunded:
    """Compensation event: the processed payment *payment_id* was refunded."""

    def __init__(self, order_id, payment_id):
        self.order_id, self.payment_id = order_id, payment_id

class OrderCancelled:
    """Compensation event: the order is cancelled and completed steps must be undone.

    Receivers undo their own work for the order, e.g. refunding a processed
    payment or releasing reserved stock.
    """

    def __init__(self, order_id):
        self.order_id = order_id

Step 2: Implement the Order Service

The Order Service initiates the saga.

# order_service.py
import time
import uuid

from events import (
    InventoryUpdated,
    InventoryUpdateFailed,
    OrderCancelled,
    OrderCreated,
    PaymentFailed,
    PaymentProcessed,
    PaymentRefunded,
)

class OrderService:
    """Owns orders and tracks each order's status as the saga progresses.

    Status transitions driven by this service:
      PENDING -> PAID -> COMPLETED               payment and inventory succeeded
      PENDING -> FAILED                          payment declined
      PENDING/PAID -> PAYMENT_PENDING_REFUND     inventory failed; refund requested
      PENDING/PAID/PAYMENT_PENDING_REFUND -> REFUNDED   refund confirmed
    """

    # The broker runs every handler on its own thread, so an inventory event can
    # reach this service before the PaymentProcessed event that caused it.
    # Inventory outcomes are therefore accepted while the order is still PENDING.
    _AWAITING_INVENTORY = ('PENDING', 'PAID')
    # Statuses from which a confirmed refund finalizes the order as REFUNDED
    # (again tolerant of events overtaking one another).
    _REFUNDABLE = ('PENDING', 'PAID', 'PAYMENT_PENDING_REFUND')

    def __init__(self, message_broker):
        self.message_broker = message_broker
        self.orders = {}  # order_id -> order record (in-memory store for simplicity)

    def _status(self, order_id):
        """Return the status of *order_id*, or None if the order is unknown."""
        order = self.orders.get(order_id)
        return order['status'] if order is not None else None

    def create_order(self, user_id, items):
        """Store a PENDING order and publish OrderCreated to start the saga.

        Each item must have a 'price'; an optional 'quantity' (default 1)
        multiplies it. Returns the new order id.
        """
        order_id = str(uuid.uuid4())
        total_amount = sum(item['price'] * item.get('quantity', 1) for item in items)
        self.orders[order_id] = {
            'user_id': user_id,
            'items': items,
            'status': 'PENDING',
            'total_amount': total_amount,
        }
        print(f"Order {order_id} created. Publishing OrderCreated event.")
        self.message_broker.publish('order_events', OrderCreated(order_id, user_id, total_amount))
        return order_id

    def handle_payment_processed(self, event: "PaymentProcessed"):
        """Mark a PENDING order as PAID; later statuses are left untouched."""
        order_id = event.order_id
        if self._status(order_id) == 'PENDING':
            self.orders[order_id]['status'] = 'PAID'
            print(f"Order {order_id}: Payment processed. Status updated to PAID.")
            # In a real system, you might publish an OrderPaid event here.
            # For this saga, we just track the state.

    def handle_inventory_updated(self, event: "InventoryUpdated"):
        """Complete the order once its stock has been taken."""
        order_id = event.order_id
        if self._status(order_id) in self._AWAITING_INVENTORY:
            self.orders[order_id]['status'] = 'COMPLETED'
            print(f"Order {order_id}: Inventory updated. Order COMPLETED.")

    def handle_payment_failed(self, event: "PaymentFailed"):
        """Fail a PENDING order whose payment was declined."""
        order_id = event.order_id
        if self._status(order_id) == 'PENDING':
            self.orders[order_id]['status'] = 'FAILED'
            print(f"Order {order_id}: Payment failed. Order status set to FAILED.")
            # No compensation needed here, as nothing succeeded before the payment.

    def handle_inventory_update_failed(self, event: "InventoryUpdateFailed"):
        """Compensate a paid order whose stock could not be taken.

        Publishes OrderCancelled, which the Payment Service answers by
        refunding a processed payment.
        """
        order_id = event.order_id
        if self._status(order_id) in self._AWAITING_INVENTORY:
            print(f"Order {order_id}: Inventory update failed. Initiating compensation.")
            self.orders[order_id]['status'] = 'PAYMENT_PENDING_REFUND'  # Intermediate state
            # Publish a compensation event for the Payment Service.
            self.message_broker.publish('order_events', OrderCancelled(order_id))  # Simplified: OrderCancelled implies refund

    def handle_payment_refunded(self, event: "PaymentRefunded"):
        """Finalize a compensated order as REFUNDED once the refund is confirmed."""
        order_id = event.order_id
        if self._status(order_id) in self._REFUNDABLE:
            self.orders[order_id]['status'] = 'REFUNDED'
            print(f"Order {order_id}: Payment {event.payment_id} refunded. Order status set to REFUNDED.")

Step 3: Implement the Payment Service

The Payment Service listens for OrderCreated and publishes PaymentProcessed or PaymentFailed. It also listens for OrderCancelled to perform compensation.

# payment_service.py
from events import OrderCreated, PaymentProcessed, PaymentFailed, PaymentRefunded, OrderCancelled
import uuid
import random
import time

class PaymentService:
    """Charges orders on OrderCreated and refunds them on OrderCancelled.

    Payment record statuses: PENDING, PROCESSED, FAILED, REFUNDED,
    CANCELLED_BEFORE_PROCESSING.
    """

    def __init__(self, message_broker):
        self.message_broker = message_broker
        self.payments = {}  # order_id -> {'payment_id': ..., 'status': ...}; in-memory store

    def handle_order_created(self, event: "OrderCreated"):
        """Charge the order, then publish PaymentProcessed or PaymentFailed.

        Skipped when the order already has a payment record: either the event
        was delivered twice, or an OrderCancelled was handled first.
        """
        order_id = event.order_id
        amount = event.total_amount
        if order_id in self.payments:
            print(f"Payment Service: Ignoring OrderCreated for {order_id}; payment is already {self.payments[order_id]['status']}.")
            return
        print(f"Payment Service: Received OrderCreated for {order_id}. Processing payment of {amount}.")

        payment_id = str(uuid.uuid4())
        self.payments[order_id] = {'payment_id': payment_id, 'status': 'PENDING'}

        # Simulate payment processing with a chance of failure
        time.sleep(0.5)  # Simulate network latency/processing time
        if self.payments[order_id]['status'] != 'PENDING':
            # An OrderCancelled was handled on another thread while we were
            # "processing"; finalizing now would charge a cancelled order.
            print(f"Payment Service: Order {order_id} was cancelled during processing. Abandoning payment {payment_id}.")
            return
        if random.random() > 0.1:  # 90% success rate
            self.payments[order_id]['status'] = 'PROCESSED'
            print(f"Payment {payment_id} processed for order {order_id}. Publishing PaymentProcessed.")
            self.message_broker.publish('payment_events', PaymentProcessed(order_id, payment_id))
        else:
            self.payments[order_id]['status'] = 'FAILED'
            print(f"Payment failed for order {order_id}. Publishing PaymentFailed.")
            self.message_broker.publish('payment_events', PaymentFailed(order_id, "Insufficient funds"))

    def handle_order_cancelled(self, event: "OrderCancelled"):
        """Undo this service's step for a cancelled order.

        A processed payment is refunded (publishing PaymentRefunded). A pending
        or not-yet-started payment is marked CANCELLED_BEFORE_PROCESSING so it
        is never charged. Repeated cancellations are no-ops.
        """
        order_id = event.order_id
        payment = self.payments.get(order_id)
        if payment is None:
            # The cancellation overtook OrderCreated: remember it so the later
            # OrderCreated is skipped instead of charging a cancelled order.
            print(f"Payment Service: Received OrderCancelled for {order_id} before any payment. Marking it cancelled.")
            self.payments[order_id] = {'payment_id': None, 'status': 'CANCELLED_BEFORE_PROCESSING'}
        elif payment['status'] == 'PROCESSED':
            payment_id = payment['payment_id']
            print(f"Payment Service: Received OrderCancelled for {order_id}. Refunding payment {payment_id}.")
            payment['status'] = 'REFUNDED'
            # Publish a compensation event (optional, depending on saga design)
            self.message_broker.publish('payment_events', PaymentRefunded(order_id, payment_id))
        elif payment['status'] == 'PENDING':
            print(f"Payment Service: Received OrderCancelled for {order_id}. Payment was not yet processed, no refund needed.")
            payment['status'] = 'CANCELLED_BEFORE_PROCESSING'

Step 4: Implement the Inventory Service

The Inventory Service listens for PaymentProcessed and publishes InventoryUpdated or InventoryUpdateFailed. It also listens for OrderCancelled so it can release stock it has already reserved for that order. In this simple model, cancellation is only triggered after the inventory update itself has failed, so there is usually no reservation to release.

# inventory_service.py
from events import PaymentProcessed, InventoryUpdated, InventoryUpdateFailed, OrderCancelled
import uuid
import random
import time

class InventoryService:
    """Takes stock for paid orders and releases it when an order is cancelled."""

    def __init__(self, message_broker, initial_stock=None):
        """Create the service.

        *initial_stock* maps item_id -> units in stock. It is copied, so the
        caller's mapping is never mutated. When omitted, the demo catalogue
        {'ITEM123': 100, 'ITEM456': 50} is used.
        """
        self.message_broker = message_broker
        if initial_stock is None:
            initial_stock = {'ITEM123': 100, 'ITEM456': 50}
        self.inventory = dict(initial_stock)  # item_id -> stock level
        self.order_item_reservations = {}  # order_id -> {'item_id': ..., 'quantity': ...}

    def handle_payment_processed(self, event: "PaymentProcessed"):
        """Take stock for a paid order, then publish InventoryUpdated or InventoryUpdateFailed."""
        # In a real system, we'd know the items from the OrderCreated event.
        # For simplicity, let's assume a fixed item for the order.
        order_id = event.order_id
        item_id = 'ITEM123'  # Example item
        quantity = 1  # Example quantity

        print(f"Inventory Service: Received PaymentProcessed for order {order_id}. Attempting to update inventory for {item_id}.")

        if item_id not in self.inventory or self.inventory[item_id] < quantity:
            print(f"Inventory update failed for order {order_id}: Insufficient stock for {item_id}.")
            # The Order Service reacts to InventoryUpdateFailed by publishing
            # OrderCancelled, which triggers the refund. Publishing a second
            # OrderCancelled here would duplicate the compensation.
            self.message_broker.publish('inventory_events', InventoryUpdateFailed(order_id, item_id, "Insufficient stock"))
            return

        self.inventory[item_id] -= quantity
        self.order_item_reservations[order_id] = {'item_id': item_id, 'quantity': quantity}
        print(f"Inventory updated for order {order_id}. New stock for {item_id}: {self.inventory[item_id]}. Publishing InventoryUpdated.")
        self.message_broker.publish('inventory_events', InventoryUpdated(order_id, item_id, quantity))

    def handle_order_cancelled(self, event: "OrderCancelled"):
        """Return any stock reserved for the cancelled order; a no-op otherwise."""
        order_id = event.order_id
        if order_id in self.order_item_reservations:
            reservation = self.order_item_reservations.pop(order_id)
            item_id = reservation['item_id']
            quantity = reservation['quantity']
            print(f"Inventory Service: Received OrderCancelled for {order_id}. Releasing inventory for {item_id} (quantity: {quantity}).")
            self.inventory[item_id] += quantity
            # In a real system, you might publish an InventoryReleased event.

Step 5: Simulate the Message Broker and Run the Saga

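This is a simplified simulation. In reality, you’d use libraries like pika for RabbitMQ or kafka-python for Kafka. Note that the broker delivers every message published on a topic to every subscriber of that topic. A handler must therefore subscribe to the topic its event is actually published on, and it must ignore the other event types that share that topic.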

# message_broker.py
import threading
from collections import defaultdict

class MessageBroker:
    """In-process stand-in for a pub/sub message broker.

    Every message published on a topic is handed to every callback subscribed
    to that topic, each callback running on a freshly started thread.
    """

    def __init__(self):
        self.listeners = defaultdict(list)  # topic -> callbacks, in subscription order
        self.lock = threading.Lock()  # guards self.listeners

    def subscribe(self, topic, callback):
        """Register *callback* to be called with each message published on *topic*."""
        self.lock.acquire()
        try:
            self.listeners[topic].append(callback)
        finally:
            self.lock.release()

    def publish(self, topic, message):
        """Deliver *message* asynchronously to every subscriber of *topic*."""
        kind = type(message).__name__
        print(f"\n--- Publishing to '{topic}': {kind} ---")
        with self.lock:
            subscribers = self.listeners[topic]
            for handler in subscribers:
                # Simulate asynchronous processing: one thread per delivery.
                worker = threading.Thread(target=handler, args=(message,))
                worker.start()

# --- Main Execution ---
if __name__ == "__main__":
    # Script-only imports: the services and events live in their own modules,
    # and this file (message_broker.py) does not import them at top level.
    import time

    from events import (
        InventoryUpdated,
        InventoryUpdateFailed,
        OrderCancelled,
        OrderCreated,
        PaymentFailed,
        PaymentProcessed,
        PaymentRefunded,
    )
    from inventory_service import InventoryService
    from order_service import OrderService
    from payment_service import PaymentService

    def only(event_type, handler):
        """Wrap *handler* so it is invoked only for events of *event_type*.

        The broker hands every message on a topic to every subscriber of that
        topic, so each handler must be shielded from the other event types.
        """
        def dispatch(event):
            if isinstance(event, event_type):
                handler(event)
        return dispatch

    broker = MessageBroker()

    order_service = OrderService(broker)
    payment_service = PaymentService(broker)
    inventory_service = InventoryService(broker)

    # Subscribe each handler on the topic where its event is actually published:
    # (topic, event type, handler)
    routes = [
        ('order_events', OrderCreated, payment_service.handle_order_created),
        ('order_events', OrderCancelled, payment_service.handle_order_cancelled),
        ('order_events', OrderCancelled, inventory_service.handle_order_cancelled),
        ('payment_events', PaymentProcessed, order_service.handle_payment_processed),
        ('payment_events', PaymentProcessed, inventory_service.handle_payment_processed),
        ('payment_events', PaymentFailed, order_service.handle_payment_failed),
        ('inventory_events', InventoryUpdated, order_service.handle_inventory_updated),
        ('inventory_events', InventoryUpdateFailed, order_service.handle_inventory_update_failed),
        # The Inventory Service may also publish OrderCancelled on its own topic.
        # The refund is guarded by payment status checks, so a duplicate
        # cancellation is harmless.
        ('inventory_events', OrderCancelled, payment_service.handle_order_cancelled),
    ]
    # Refund confirmations, for Order Service versions that handle them.
    on_refunded = getattr(order_service, 'handle_payment_refunded', None)
    if on_refunded is not None:
        routes.append(('payment_events', PaymentRefunded, on_refunded))

    for topic, event_type, handler in routes:
        broker.subscribe(topic, only(event_type, handler))

    # --- Simulate an Order ---
    print("--- Starting Saga Simulation ---")
    order_id = order_service.create_order("user123", [{'item_id': 'ITEM123', 'price': 50.00}])

    # Wait for events to propagate and services to process them
    time.sleep(3)  # Give threads time to finish

    print("\n--- Final Order Status ---")
    print(f"Order {order_id} status: {order_service.orders.get(order_id, {}).get('status', 'Not Found')}")
    print(f"Current inventory: {inventory_service.inventory}")

    print("\n--- Simulating another order that fails inventory ---")
    # Reduce stock to force inventory failure
    inventory_service.inventory['ITEM123'] = 0
    order_id_fail = order_service.create_order("user456", [{'item_id': 'ITEM123', 'price': 50.00}])
    time.sleep(3)
    print("\n--- Final Order Status (Failure Scenario) ---")
    print(f"Order {order_id_fail} status: {order_service.orders.get(order_id_fail, {}).get('status', 'Not Found')}")
    print(f"Current inventory: {inventory_service.inventory}")

In this choreography-based saga:

  • The Order Service starts the process by publishing an OrderCreated event.
  • The Payment Service listens for OrderCreated, attempts payment, and publishes PaymentProcessed or PaymentFailed.
  • The Inventory Service listens for PaymentProcessed, attempts to update stock, and publishes InventoryUpdated or InventoryUpdateFailed.
  • If the inventory update fails, the Inventory Service publishes InventoryUpdateFailed, and the Order Service reacts by publishing an OrderCancelled event.
  • The Payment Service listens for OrderCancelled and performs a refund if the payment was already processed.

The key is that each service reacts to events from others. If any step in the sequence fails (e.g., the inventory update), compensation events are published to undo earlier successful steps (e.g., a payment refund). This achieves eventual consistency without a central orchestrator or distributed locks. There are three possible outcomes. The order ends as COMPLETED when every step succeeds, and as FAILED when the payment is declined. When inventory fails after a successful payment, the payment is refunded, and the payment record ends as REFUNDED.

Want structured learning?

Take the full Saga-pattern course →