RabbitMQ Streams can replay messages, but there is no dedicated "replay" feature behind it; replay falls out of efficient, ordered consumption of historical data from a persistent log.
Let’s see it in action. Imagine we have a stream, here named super-stream, that’s been chugging along for a while.
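    // Producer sending messages (RabbitMQ Stream Java client)
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.Producer;
    import java.nio.charset.StandardCharsets;

    Environment environment = Environment.builder()
        .uri("rabbitmq-stream://localhost:5552") // 5552 is the default stream port
        .build();
    Producer producer = environment.producerBuilder()
        .stream("super-stream") // the stream must already exist
        .build();
    for (int i = 0; i < 100; i++) {
        producer.send(
            producer.messageBuilder()
                .addData(("Hello " + i).getBytes(StandardCharsets.UTF_8))
                .build(),
            confirmationStatus -> { } // publish confirms are ignored in this example
        );
    }
    // Sends are asynchronous; real code should wait for confirms before closing.
    producer.close();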
Now, a consumer wants to pick up where it left off, but also re-process the last 10 messages.
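    // Consumer that resumes from a checkpoint and re-reads the last 10 messages
    import com.rabbitmq.stream.Consumer;
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.OffsetSpecification;
    import java.nio.charset.StandardCharsets;

    Environment environment = Environment.builder()
        .uri("rabbitmq-stream://localhost:5552")
        .build();
    long lastProcessedOffset = 99L; // hypothetical checkpoint saved by the application
    Consumer consumer = environment.consumerBuilder()
        .stream("super-stream")
        // This is where the "replay" happens: start 10 messages back
        // instead of right after the checkpoint.
        .offset(OffsetSpecification.offset(Math.max(0L, lastProcessedOffset - 9)))
        .messageHandler((context, message) -> {
            System.out.println("Received at offset " + context.offset() + ": "
                + new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
        })
        .build(); // the consumer starts receiving as soon as it is built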
The core idea behind RabbitMQ Streams’ "replay" capability is its persistent, ordered log structure. Unlike traditional queues, where a message is deleted once it is acknowledged, a stream keeps its messages until a retention policy removes them. Every message in a stream has a monotonically increasing offset; a super stream spreads data over several partition streams, each with its own offsets. When a consumer attaches, it specifies an offset. If that offset is in the past, the consumer simply starts reading from that point in the log, effectively replaying messages. Note that the Java client’s OffsetSpecification has no "start N messages before the end" variant: OffsetSpecification.last() starts at the last chunk of the stream, which can hold any number of messages. To rewind by an exact count, start from an absolute offset, such as a saved checkpoint minus N.
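To make the log-and-offset idea concrete, here is a minimal in-memory sketch. It is not the RabbitMQ client API: ToyStreamLog and its methods are hypothetical names used only to illustrate that reading does not remove anything, so reading from an earlier offset is all that "replay" means.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a stream: an append-only list where the list index is the offset.
// Illustration only; a real stream lives on the broker and is read over the network.
class ToyStreamLog {
    private final List<String> entries = new ArrayList<>();

    // Appends a message and returns the offset it was stored at.
    long append(String body) {
        entries.add(body);
        return entries.size() - 1;
    }

    // Returns every message from startOffset to the end of the log.
    // Reading never deletes entries, so the same call can be repeated.
    List<String> readFrom(long startOffset) {
        int from = (int) Math.max(0, Math.min(startOffset, entries.size()));
        return new ArrayList<>(entries.subList(from, entries.size()));
    }

    public static void main(String[] args) {
        ToyStreamLog log = new ToyStreamLog();
        for (int i = 0; i < 100; i++) {
            log.append("Hello " + i);
        }
        // Two separate reads from offset 90 return the same ten messages.
        System.out.println(log.readFrom(90));
        System.out.println(log.readFrom(90).size());
    }
}
```

The design point is that the consumer, not the log, owns the read position. That is why a second consumer, or the same consumer restarted with an older offset, sees the same data again.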
The key components you control are:
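- OffsetSpecification: This is your primary lever. You can define offsets in several ways:
  - OffsetSpecification.first(): Start from the absolute beginning of the stream.
  - OffsetSpecification.last(): Start from the last chunk of messages in the stream. A chunk is a batch of messages, so this is not a fixed message count.
  - OffsetSpecification.next(): Start at the end of the stream and receive only messages published after the consumer attaches.
  - OffsetSpecification.offset(long offset): Start at a specific, absolute offset within the stream’s log. You’d typically get this offset from a previous consumption checkpoint.
  - OffsetSpecification.timestamp(long epochMillis): Start at a point in time. Delivery is chunk-based, so the consumer may receive a few messages published slightly before the timestamp and should filter them out if that matters.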
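- Consumer name: RabbitMQ Streams has no Kafka-style consumer groups; the closest equivalent is a consumer name (set with name(...) on the Java client’s consumer builder), under which offsets are tracked on the broker. Consumers that use the same name on the same stream share the stored offset. This means if one consumer reprocesses messages, other consumers with that name won’t automatically reprocess them unless they also start from an offset that dictates it. This is crucial for managing state and avoiding duplicate processing across your application.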
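- Retention (pruning): A stream keeps its data until a retention policy removes it, so without one it keeps growing on disk. Configure retention by size or age (for example max-length-bytes or max-age) on the stream itself to manage disk space. Retention removes whole segments from the start of the log, and once a message has been removed it can no longer be replayed.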
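The offset and retention points above combine into a small piece of arithmetic: rewind from a checkpoint by N messages, but never to a point before the oldest offset still in the stream. The sketch below is a hypothetical helper (ReplayOffsets.rewind is not part of any client library) and assumes message offsets are contiguous; its result is the kind of value you would pass to OffsetSpecification.offset(long).

```java
// Hypothetical helper: computes the offset to start from so that the last
// `count` messages up to and including `checkpoint` are delivered again.
// Assumes offsets are contiguous, which is an assumption of this sketch.
class ReplayOffsets {
    static long rewind(long checkpoint, long count, long firstAvailableOffset) {
        if (count <= 0) {
            // Nothing to replay: resume right after the checkpoint.
            return checkpoint + 1;
        }
        long wanted = checkpoint - count + 1;
        // Retention may already have removed older messages, so clamp.
        return Math.max(wanted, firstAvailableOffset);
    }

    public static void main(String[] args) {
        System.out.println(rewind(99, 10, 0));  // prints 90
        System.out.println(rewind(99, 10, 95)); // prints 95
        System.out.println(rewind(4, 10, 0));   // prints 0
    }
}
```

The second call shows the retention case: ten messages were requested, but only offsets 95 and later survive, so only five can be replayed.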
The surprising thing is how seamlessly this replay fits into the consumption model. You don’t have separate "replay" queues or complex routing. You just tell a standard consumer to start reading from an earlier point in the stream’s log. The underlying storage and protocol handle delivering those historical messages as if they were new. The system doesn’t distinguish between "new" and "replayed" messages at the delivery level; it just delivers based on the requested offset. The consumer’s application logic is responsible for idempotency if it needs to handle replayed messages without side effects.
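Since the broker does not mark replayed messages, the consumer has to recognise them itself. One common approach, sketched here under the assumption of a single stream read in order, is to remember the highest offset already applied and skip anything at or below it. IdempotentHandler is a hypothetical class; in a real consumer the offset would come from the message handler’s context (context.offset() in the Java client).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical idempotent handler: applies each offset at most once.
// Assumes messages from one stream arrive in offset order.
class IdempotentHandler {
    private long highestApplied = -1;
    private final List<String> applied = new ArrayList<>();

    // Returns true if the message was applied, false if it was a duplicate.
    boolean handle(long offset, String body) {
        if (offset <= highestApplied) {
            return false; // already processed: a replayed message
        }
        applied.add(body); // stand-in for the real side effect
        highestApplied = offset;
        return true;
    }

    List<String> applied() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        for (long offset = 0; offset < 100; offset++) {
            handler.handle(offset, "Hello " + offset);
        }
        // Replay the last 10 messages: every one is skipped as a duplicate.
        for (long offset = 90; offset < 100; offset++) {
            handler.handle(offset, "Hello " + offset);
        }
        System.out.println(handler.applied().size()); // prints 100
    }
}
```

In a real application the highest applied offset would need to be stored durably, ideally in the same transaction as the side effect, so that a crash between the two cannot cause a message to be applied twice or skipped.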
The next concept to explore is how to manage consumer offsets reliably for fault tolerance and precise checkpointing.