RabbitMQ consumers don’t just stop; they get actively cancelled, and the broker initiates this, not your application.
Let’s see a consumer in action. Imagine a simple Go program publishing messages to a hello queue and another Go program consuming them.
Publisher (publisher.go):
package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body := "Hello, world!"
	err = ch.PublishWithContext(ctx,
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}
Consumer (consumer.go):
package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer tag (empty: the library generates one)
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// A nil channel is never ready, so receiving from it blocks main forever.
	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			// Simulate work
			time.Sleep(1 * time.Second)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}
When you run the consumer and then the publisher, you’ll see "Received a message: Hello, world!". The ch.Consume call returns a channel (msgs) that delivers messages. The for d := range msgs loop is the heart of the consumer, processing each delivery.
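The problem arises when the consumer stops receiving deliveries without your application asking for it. This isn’t an error in your application logic, and it happens in two distinct ways. First, the broker can cancel the consumer explicitly: when the queue is deleted or becomes unavailable (for example, the cluster node hosting it fails), RabbitMQ sends the client a consumer cancellation notification, a basic.cancel frame. Second, the connection itself can go away: the network drops, the broker restarts, or heartbeats are missed. In that case the broker cannot notify anyone, and it is the client library that notices the connection is gone. Either way, the result inside your program is the same: the msgs channel is closed, and the for d := range msgs loop terminates.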
Without explicit handling, your consumer simply stops processing messages. It’s not an obvious error; it’s a quiet death: the goroutine returns, but main keeps blocking on <-forever and the process looks alive. In the Go client, amqp.Connection has a NotifyClose method that registers a Go channel on which the library reports connection closure. amqp.Channel has a NotifyClose method as well. A registered channel receives an *amqp.Error when the connection or AMQP channel closes because of an error; after a graceful Close it is closed without a value being sent.
To handle this gracefully, you need to monitor these notification channels. When a notification arrives, the connection is gone, whether the broker closed it or the network failed. Your application should then reconnect and re-establish the consumer.
Here’s how you’d modify the consumer to handle connection closures:
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

const retryDelay = 5 * time.Second

// waitOrStop sleeps for d and reports true, or reports false as soon as a
// shutdown signal arrives, so Ctrl+C still works between retries.
func waitOrStop(stop <-chan os.Signal, d time.Duration) bool {
	select {
	case <-stop:
		return false
	case <-time.After(d):
		return true
	}
}

func main() {
	// Channel to listen for OS signals (like Ctrl+C). Registered once, outside
	// the loop, so signals are not lost while reconnecting.
	stopChan := make(chan os.Signal, 1)
	signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

	// Use a reconnecting consumer pattern
	for {
		conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
		if err != nil {
			log.Printf("Failed to connect to RabbitMQ: %v. Retrying in %s...", err, retryDelay)
			if !waitOrStop(stopChan, retryDelay) {
				return
			}
			continue // Try to connect again
		}

		ch, err := conn.Channel()
		if err != nil {
			log.Printf("Failed to open a channel: %v. Retrying in %s...", err, retryDelay)
			conn.Close() // Close the broken connection
			if !waitOrStop(stopChan, retryDelay) {
				return
			}
			continue
		}

		q, err := ch.QueueDeclare(
			"hello", // name
			false,   // durable
			false,   // delete when unused
			false,   // exclusive
			false,   // no-wait
			nil,     // arguments
		)
		if err != nil {
			log.Printf("Failed to declare a queue: %v. Retrying in %s...", err, retryDelay)
			ch.Close()
			conn.Close()
			if !waitOrStop(stopChan, retryDelay) {
				return
			}
			continue
		}

		msgs, err := ch.Consume(
			q.Name, // queue
			"",     // consumer tag
			true,   // auto-ack
			false,  // exclusive
			false,  // no-local
			false,  // no-wait
			nil,    // args
		)
		if err != nil {
			log.Printf("Failed to register a consumer: %v. Retrying in %s...", err, retryDelay)
			ch.Close()
			conn.Close()
			if !waitOrStop(stopChan, retryDelay) {
				return
			}
			continue
		}

		// Channel to listen for connection closure notifications. Buffered so
		// the library never blocks while delivering the error.
		notifyClose := conn.NotifyClose(make(chan *amqp.Error, 1))
		// Channel to listen for consumer cancellations sent by the broker
		// (for example, when the queue is deleted).
		notifyCancel := ch.NotifyCancel(make(chan string, 1))

		log.Println("Consumer started. Waiting for messages...")

		// Goroutine to process messages
		go func() {
			for d := range msgs {
				log.Printf("Received a message: %s", d.Body)
				// Simulate work
				time.Sleep(1 * time.Second)
			}
			log.Println("Message channel closed.")
		}()

		// Wait for a signal, a connection close, or a consumer cancellation
		select {
		case <-stopChan:
			log.Println("Received OS shutdown signal. Closing connection...")
			ch.Close()
			conn.Close()
			return // Exit the loop and the program
		case err := <-notifyClose:
			log.Printf("Connection closed: %v. Attempting to reconnect...", err)
			ch.Close()
			conn.Close()
			// The outer loop will handle reconnection
		case tag, ok := <-notifyCancel:
			if ok {
				log.Printf("Consumer %q cancelled by broker. Attempting to reconnect...", tag)
			} else {
				// The notification channel was closed: the AMQP channel is gone.
				log.Println("AMQP channel closed. Attempting to reconnect...")
			}
			ch.Close()
			conn.Close()
		}
	}
}
The core of graceful handling is conn.NotifyClose. When the connection is closed abnormally (for example, because of a network failure or a broker restart), an *amqp.Error is sent to the registered channel. The select statement catches this, logs the error, and closes the current channel and connection; the outer for loop then immediately attempts to re-establish everything. If the broker is still down, amqp.Dial fails and the loop retries every 5 seconds. Because each pass re-declares the queue, the consumer also recovers when the non-durable hello queue disappears in a broker restart. The result is a resilient consumer that can withstand transient network problems and broker restarts.
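A fixed 5-second retry is simple, but a loop that reconnects right away can hammer a broker that is still restarting. One common alternative is capped exponential backoff. The following is a minimal sketch under stated assumptions, not part of the consumer above: `backoff` is a hypothetical helper, and the 1-second base and 30-second cap are arbitrary illustration values.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical helper: it returns the delay to wait before
// reconnect attempt number `attempt` (0-based). The delay starts at base,
// doubles on every further attempt, and never exceeds max.
func backoff(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max // stop early; this also avoids overflow for large attempts
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Print the delays a reconnect loop would use for its first attempts.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt, time.Second, 30*time.Second))
	}
}
```

A reconnect loop would keep an attempt counter, sleep for `backoff(attempt, ...)` after each failed setup step, and reset the counter to zero once the consumer is registered again.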
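NotifyClose on the amqp.Channel itself can also be used, but NotifyClose on the amqp.Connection is generally preferred, because it signals the underlying connection closure, which cascades to all channels. The reverse is not true: a channel-level error closes only that channel and leaves the connection open, so a connection-level listener alone will not see it.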
The most surprising thing about RabbitMQ consumer cancellations is that they are a protocol-level event initiated by the broker, not an application-level timeout or error. Your consumer is effectively being told, "I’m done with you."
The ch.Consume function returns a <-chan amqp.Delivery. When the broker cancels the consumer, or when the channel or connection closes, the library closes this delivery channel, and the for d := range msgs loop exits. The trick is knowing why it closed. If it closed because of a failure (a network error, a broker restart, a deleted queue), you want to reconnect. If it closed because your application decided to stop (for example, after an OS signal), you just exit.
conn.NotifyClose provides an explicit signal that the connection is gone, together with the *amqp.Error that describes why. Without it, you’d only see the msgs channel close and would have to infer the reason, which can lead to incorrect retry logic: you couldn’t tell a temporary network blip that calls for a reconnect from a deliberate shutdown. Note that a consumer cancellation on its own leaves the connection open, so it does not trigger conn.NotifyClose; ch.NotifyCancel reports that case.
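The question of why the consumer stopped can be made explicit in code. The following is a minimal sketch, not a definitive implementation: `StopReason` and `waitForStop` are hypothetical names, and the function works on plain Go channels so that it runs without a broker. It assumes the three inputs would be wired to signal.Notify, to ch.NotifyCancel (which delivers the consumer tag when the broker cancels a consumer), and to conn.NotifyClose. It uses generics, so it needs Go 1.18 or later.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// StopReason is a hypothetical label for why the consumer stopped.
type StopReason int

const (
	Shutdown          StopReason = iota // the process was asked to exit
	ConsumerCancelled                   // the broker cancelled the consumer
	ConnectionLost                      // the connection closed with an error
	ConnectionClosed                    // a notification channel was closed without an error
)

// waitForStop blocks until one source fires. It reports which one, plus a
// detail string: the consumer tag or the error text.
//
// Assumed wiring with amqp091-go (not shown here):
//
//	stop    = the channel passed to signal.Notify
//	cancels = ch.NotifyCancel(make(chan string, 1))
//	closes  = conn.NotifyClose(make(chan *amqp.Error, 1))
func waitForStop[E error](stop <-chan os.Signal, cancels <-chan string, closes <-chan E) (StopReason, string) {
	select {
	case <-stop:
		return Shutdown, ""
	case tag, ok := <-cancels:
		if !ok {
			// Closed without a tag: the AMQP channel itself went away.
			return ConnectionClosed, ""
		}
		return ConsumerCancelled, tag
	case err, ok := <-closes:
		if !ok {
			// Closed without an error: a graceful close.
			return ConnectionClosed, ""
		}
		return ConnectionLost, err.Error()
	}
}

func main() {
	// Simulate a broker-initiated cancellation of consumer "ctag-1".
	// A nil channel is never ready, so the unused sources are passed as nil.
	cancels := make(chan string, 1)
	cancels <- "ctag-1"
	reason, detail := waitForStop[error](nil, cancels, nil)
	fmt.Println(reason == ConsumerCancelled, detail)

	// Simulate a connection failure.
	closes := make(chan error, 1)
	closes <- errors.New("connection reset")
	reason, detail = waitForStop[error](nil, nil, closes)
	fmt.Println(reason == ConnectionLost, detail)
}
```

A reconnect loop could then exit on Shutdown and rebuild the connection, channel, and consumer for every other reason, logging the detail string so the cause is visible.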
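The next thing to consider is the prefetch count (ch.Qos), which limits how many unacknowledged messages RabbitMQ sends to a consumer at once. This keeps the consumer from being overwhelmed and limits how many messages RabbitMQ has to re-queue when a consumer disconnects. Prefetch only applies with manual acknowledgements. With auto-ack set to true, as in the examples here, a message counts as acknowledged as soon as it is sent, so a message that was in flight when the consumer died is lost rather than re-queued.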