The most surprising thing about using MassTransit state machines for sagas is how much of your business logic you can push out of your core services and into the saga itself, making your services remarkably thin and focused.
Let’s see a state machine in action. Imagine an order processing system. We want to track an order from Submitted all the way to Shipped or Cancelled, handling payment and inventory.
/// <summary>
/// Saga state machine that follows a single order from submission through payment
/// authorization and inventory reservation to either shipment or cancellation.
/// </summary>
/// <remarks>
/// Every message is correlated to its saga instance by <c>OrderId</c>. Reaching
/// <see cref="Shipped"/> or <see cref="Cancelled"/> finalizes the instance, and
/// finalized instances are marked as completed.
/// </remarks>
public class OrderSaga : MassTransitStateMachine<OrderState>
{
    // States an order can be in after it has been submitted.
    public State Submitted { get; private set; }
    public State PaymentAuthorized { get; private set; }
    public State InventoryReserved { get; private set; }
    public State Shipped { get; private set; }
    public State Cancelled { get; private set; }

    // Messages that drive the saga forward.
    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Event<AuthorizePayment> AuthorizePayment { get; private set; }
    public Event<ReserveInventory> ReserveInventory { get; private set; }
    public Event<ShipOrder> ShipOrder { get; private set; }
    public Event<CancelOrder> CancelOrder { get; private set; }

    // Notifications the saga publishes itself. They remain declared as events for
    // backward compatibility, but the saga does not react to them (see DuringAny below).
    public Event<OrderSubmitted> OrderSubmitted { get; private set; }
    public Event<PaymentAuthorized> PaymentAuthorizedEvent { get; private set; }
    public Event<InventoryReserved> InventoryReservedEvent { get; private set; }
    public Event<OrderShipped> OrderShippedEvent { get; private set; }
    public Event<OrderCancelled> OrderCancelledEvent { get; private set; }

    /// <summary>
    /// Declares state storage, message correlation, and the transitions of the order lifecycle.
    /// </summary>
    public OrderSaga()
    {
        // CurrentState is an int, so the states must be listed explicitly: the list order
        // defines the stored value of each state (after the built-in states), which means
        // new states should only ever be appended.
        InstanceState(x => x.CurrentState, Submitted, PaymentAuthorized, InventoryReserved, Shipped, Cancelled);

        // None of the contracts carries a CorrelationId, so each event states how it finds
        // its saga instance: the order id doubles as the saga's correlation id.
        Event(() => SubmitOrder, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => AuthorizePayment, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => ReserveInventory, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => ShipOrder, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => CancelOrder, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => OrderSubmitted, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => PaymentAuthorizedEvent, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => InventoryReservedEvent, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => OrderShippedEvent, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => OrderCancelledEvent, x => x.CorrelateById(m => m.Message.OrderId));

        // Initial state: SubmitOrder creates the instance and records the order details.
        Initially(
            When(SubmitOrder)
                .Then(context =>
                {
                    context.Instance.OrderId = context.Data.OrderId;
                    context.Instance.OrderDate = context.Data.OrderDate;
                    context.Instance.CustomerId = context.Data.CustomerId;
                    Console.WriteLine($"Order submitted: {context.Data.OrderId}");
                })
                // The contracts are positional records, so they are built through their
                // constructors; an object initializer alone has no constructor to call.
                .Publish(context => new OrderSubmitted(context.Data.OrderId, context.Data.CustomerId))
                .TransitionTo(Submitted));

        // State: Submitted
        During(Submitted,
            When(AuthorizePayment)
                .Then(context => Console.WriteLine($"Authorizing payment for order: {context.Data.OrderId}"))
                .Publish(context => new PaymentAuthorized(context.Data.OrderId, context.Data.Amount))
                .TransitionTo(PaymentAuthorized),
            When(CancelOrder)
                .Then(context => Console.WriteLine($"Cancelling order: {context.Data.OrderId}"))
                .Publish(context => new OrderCancelled(context.Data.OrderId))
                .TransitionTo(Cancelled)
                .Finalize());

        // State: PaymentAuthorized
        During(PaymentAuthorized,
            When(ReserveInventory)
                .Then(context => Console.WriteLine($"Reserving inventory for order: {context.Data.OrderId}"))
                .Publish(context => new InventoryReserved(context.Data.OrderId, context.Data.Items))
                .TransitionTo(InventoryReserved),
            When(CancelOrder)
                .Then(context => Console.WriteLine($"Cancelling order after payment auth: {context.Data.OrderId}"))
                .Publish(context => new OrderCancelled(context.Data.OrderId))
                .TransitionTo(Cancelled)
                .Finalize());

        // State: InventoryReserved
        During(InventoryReserved,
            When(ShipOrder)
                .Then(context => Console.WriteLine($"Shipping order: {context.Data.OrderId}"))
                .Publish(context => new OrderShipped(context.Data.OrderId, context.Data.ShippingAddress))
                .TransitionTo(Shipped)
                .Finalize(),
            When(CancelOrder)
                .Then(context => Console.WriteLine($"Cancelling order after inventory reserved: {context.Data.OrderId}"))
                .Publish(context => new OrderCancelled(context.Data.OrderId))
                .TransitionTo(Cancelled)
                .Finalize());

        // The saga's own notifications are delivered back to it because they are declared
        // as events; ignore them so they are not treated as unhandled events.
        DuringAny(
            Ignore(OrderSubmitted),
            Ignore(PaymentAuthorizedEvent),
            Ignore(InventoryReservedEvent),
            Ignore(OrderShippedEvent),
            Ignore(OrderCancelledEvent));

        // Shipped and Cancelled are terminal: both transitions above finalize the instance,
        // and a finalized instance is completed. A message that arrives afterwards (for
        // example CancelOrder after shipping) no longer matches an instance and, because it
        // is not an initial event, does not create a new one.
        SetCompletedWhenFinalized();
    }
}
// Message contracts (simplified). Each message OrderSaga handles is listed next to
// the notification the saga publishes in response.

/// <summary>Handled by OrderSaga in its initial state; carries the details of a new order.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="OrderDate">Date recorded for the order.</param>
/// <param name="CustomerId">Identifier of the customer placing the order.</param>
public record SubmitOrder(
    Guid OrderId,
    DateTime OrderDate,
    Guid CustomerId);

/// <summary>Published by OrderSaga after it handles <see cref="SubmitOrder"/>.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="CustomerId">Identifier of the customer who placed the order.</param>
public record OrderSubmitted(
    Guid OrderId,
    Guid CustomerId);

/// <summary>Handled by OrderSaga while the order is in the Submitted state.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="Amount">Amount to authorize.</param>
public record AuthorizePayment(
    Guid OrderId,
    decimal Amount);

/// <summary>Published by OrderSaga after it handles <see cref="AuthorizePayment"/>.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="Amount">Amount copied from the <see cref="AuthorizePayment"/> message.</param>
public record PaymentAuthorized(
    Guid OrderId,
    decimal Amount);

/// <summary>Handled by OrderSaga while the order is in the PaymentAuthorized state.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="Items">Items to reserve.</param>
public record ReserveInventory(
    Guid OrderId,
    List<string> Items);

/// <summary>Published by OrderSaga after it handles <see cref="ReserveInventory"/>.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="Items">Items copied from the <see cref="ReserveInventory"/> message.</param>
public record InventoryReserved(
    Guid OrderId,
    List<string> Items);

/// <summary>Handled by OrderSaga while the order is in the InventoryReserved state.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="ShippingAddress">Address the order is shipped to.</param>
public record ShipOrder(
    Guid OrderId,
    string ShippingAddress);

/// <summary>Published by OrderSaga after it handles <see cref="ShipOrder"/>.</summary>
/// <param name="OrderId">Identifier of the order.</param>
/// <param name="ShippingAddress">Address copied from the <see cref="ShipOrder"/> message.</param>
public record OrderShipped(
    Guid OrderId,
    string ShippingAddress);

/// <summary>Handled by OrderSaga in the Submitted, PaymentAuthorized and InventoryReserved states.</summary>
/// <param name="OrderId">Identifier of the order.</param>
public record CancelOrder(
    Guid OrderId);

/// <summary>Published by OrderSaga after it handles <see cref="CancelOrder"/>.</summary>
/// <param name="OrderId">Identifier of the order.</param>
public record OrderCancelled(
    Guid OrderId);
/// <summary>
/// Persistence model for a single <see cref="OrderSaga"/> instance: the saga's
/// bookkeeping fields plus the order details captured when the order is submitted.
/// </summary>
public class OrderState : SagaStateMachineInstance
{
    // ---- Saga bookkeeping ----

    /// <summary>Identifies the saga instance; incoming messages are linked to an instance through it.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>
    /// Current state of the saga, stored as an integer. <see cref="OrderSaga"/> selects
    /// this property as its state storage through <c>InstanceState</c>.
    /// </summary>
    public int CurrentState { get; set; }

    /// <summary>
    /// Version number of the stored instance.
    /// NOTE(review): nothing in this file reads or writes it; presumably intended for
    /// optimistic concurrency in the saga repository. Confirm against the persistence setup.
    /// </summary>
    public int Version { get; set; }

    // ---- Order details copied from the SubmitOrder message ----

    /// <summary>Identifier of the order this instance tracks.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Identifier of the customer who placed the order.</summary>
    public Guid CustomerId { get; set; }

    /// <summary>Order date taken from the SubmitOrder message.</summary>
    public DateTime OrderDate { get; set; }
}
In this example, the OrderSaga defines the entire lifecycle of an order. It listens for specific events (SubmitOrder, AuthorizePayment, ReserveInventory, ShipOrder, CancelOrder) and, based on the current state of the saga instance, transitions to a new state and/or publishes new events.
The OrderState class is the persistence model for the saga. MassTransit uses this to store the current state of each individual order’s saga instance. The CorrelationId is crucial; it’s how MassTransit links incoming messages to the correct saga instance. Typically, this would be the OrderId.
Here’s how you’d wire this up in your Program.cs (or equivalent):
// Assuming you have a MassTransit bus configured
// Creates a RabbitMQ-backed bus, registers OrderSaga with OrderState as its instance type,
// points saga persistence at SQL Server, and starts the bus.
// NOTE(review): this snippet is not self-contained, and the saga-registration calls used
// below (AddSagaStateMachine on the bus configurator, Instance/SetCorrelationId, State,
// UseSqlServerPersistence) should be verified against the MassTransit version in use.
var bus = MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
{
// NOTE(review): `context` is not declared anywhere in this snippet; confirm where it comes from.
cfg.ConfigureEndpoints(context);
cfg.AddSagaStateMachine<OrderSaga, OrderState>(x =>
{
x.Instance(instance =>
{
// NOTE(review): the inner lambda parameter reuses the name `instance` from the enclosing
// lambda, which only compiles on language versions that allow shadowing in nested lambdas.
instance.SetCorrelationId(instance => instance.OrderId); // Link state instance to message correlation
});
// NOTE(review): each line below constructs a separate, throwaway OrderSaga just to read one
// State property; confirm the states really need to be registered here at all.
x.State(() => new OrderSaga().Submitted); // Register states
x.State(() => new OrderSaga().PaymentAuthorized);
x.State(() => new OrderSaga().InventoryReserved);
x.State(() => new OrderSaga().Shipped);
x.State(() => new OrderSaga().Cancelled);
// Configure persistence for the saga state.
// For example, using SQL Server:
// NOTE(review): an in-memory outbox presumably only holds outgoing messages until the handler
// finishes; confirm that it gives the transactional guarantee claimed on the next line.
x.UseInMemoryOutbox(); // Essential for ensuring event publishing happens within the transaction
x.UseSqlServerPersistence(configure =>
{
// The lambda is marked async but contains no await. "Your_Connection_String" is a
// placeholder to be replaced with a real connection string loaded from configuration.
configure.ConnectionFactory(async () => new SqlConnection("Your_Connection_String"));
configure.SchemaName = "sagas"; // Optional schema
configure.TableName = "OrderState"; // Optional table name
});
});
});
// Start the bus; nothing above begins receiving messages until this completes.
await bus.StartAsync();
The SetCorrelationId(instance => instance.OrderId) is the magic link. When a SubmitOrder message arrives, MassTransit looks for an OrderState instance whose OrderId matches the OrderId in the message. If it finds one, it loads that instance’s state and processes the event. If it does not, and the event is an initial event for the saga (like SubmitOrder), it creates a new OrderState instance.
The UseSqlServerPersistence (or UseMongoDbPersistence, UseInMemoryPersistence, etc.) tells MassTransit where to store the OrderState. This is critical for the saga to maintain its state across message deliveries and server restarts. UseInMemoryOutbox is vital because it holds back any events published by the saga until the message has been handled and the saga’s state has been saved, so a failed state update does not leave already-published events behind. Keep in mind that the outbox lives in memory: it coordinates publishing with the state update, but it is not itself a database transaction.
The power here is that your individual services don’t need to know about the order of operations. A PaymentService might just publish PaymentAuthorized, and an InventoryService might publish InventoryReserved. The saga orchestrates these, ensuring they happen in the correct sequence.
The one thing most people don’t realize is that when a state transition happens, MassTransit performs a few operations atomically: it loads the saga state, updates the CurrentState property, executes any Then actions, and then publishes any outgoing events. This entire process is wrapped in a database transaction if persistence is configured, guaranteeing that either all these operations succeed, or none of them do, preventing the system from getting into an inconsistent state.
If you’re using the SQL Server persistence and have correctly configured your OrderState table and the saga definition, the next thing you might encounter is ensuring your message consumers are correctly routing the saga events back to the saga.