The most surprising thing about RabbitMQ exchanges is how little they decide on their own: the exchange type only fixes the matching rule, while the actual routes come from the bindings and their binding keys.
Let’s see this in action. Imagine we have a producer sending messages and multiple consumers wanting to receive them. We need a way to direct these messages from the producer to the right consumers. This is where exchanges and their types come in.
Fanout Exchange
A fanout exchange is the simplest. It broadcasts every message it receives to all the queues bound to it, regardless of any routing key. Think of it like a loudspeaker system – everyone connected hears the announcement.
Scenario: You have a real-time notification system where every connected client needs to receive every update.
Producer:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
message = "New user registered!"
channel.basic_publish(exchange='notifications',
                      routing_key='',  # Routing key is ignored for fanout
                      body=message)
print(f" [x] Sent '{message}'")
connection.close()
Consumer 1:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
# queue='' asks the broker to generate a unique name; exclusive=True deletes the queue when this connection closes
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='notifications', queue=queue_name)
print(' [*] Waiting for notifications. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Consumer 2: (Identical to Consumer 1, but will get its own exclusive queue)
When the producer sends a message to the notifications fanout exchange, both consumers will receive it. The routing_key in basic_publish is ignored.
Direct Exchange
A direct exchange routes messages to queues whose binding key exactly matches the message’s routing key. This is like a direct phone call – you dial a specific number, and only the person at that number answers.
Scenario: You have a system where different types of logs (e.g., "info", "warning", "error") need to go to specific log handlers.
Producer:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='direct')
severity = 'info' # Could be 'warning' or 'error'
message = f"This is a log message with severity '{severity}'."
channel.basic_publish(exchange='logs',
                      routing_key=severity,  # Routing key is crucial here
                      body=message)
print(f" [x] Sent '{severity}': '{message}'")
connection.close()
Consumer (for 'info' logs):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='direct')
# queue='' lets the broker generate a unique queue name
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = ['info']  # This consumer only wants 'info' logs
for severity in severities:
    # One binding per severity; the binding key must equal the routing key exactly
    channel.queue_bind(exchange='logs',
                       queue=queue_name,
                       routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(f" [x] Received '{method.routing_key}': {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Consumer (for 'error' logs): (Same as above, but severities = ['error'])
If the producer sends a message with routing_key='info', only the consumer whose queue is bound with the key 'info' will receive it. A message with routing_key='error' goes to the consumer bound with 'error'. If a message’s routing key doesn’t match any binding key, the broker discards it, unless the exchange has an alternate exchange configured (unroutable messages are forwarded there) or the message was published with mandatory=True (it is returned to the publisher).
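One way to keep unroutable messages, sketched under assumptions: RabbitMQ lets an exchange name an alternate exchange through the alternate-exchange argument. The helper below takes an already-open pika channel. The names declare_with_fallback, unrouted, and unrouted_logs are made up for this illustration. An exchange that already exists without this argument cannot simply be redeclared with it, so on a broker that already has logs you would likely need to delete and recreate the exchange, or set the alternate exchange through a policy instead.

```python
def declare_with_fallback(channel, exchange='logs', fallback='unrouted',
                          fallback_queue='unrouted_logs'):
    """Declare a direct exchange whose unroutable messages go to a fallback queue.

    `channel` is expected to be an open pika channel. All names here are
    illustrative assumptions, not part of the examples above.
    """
    # Fanout fallback: catches every message regardless of routing key.
    channel.exchange_declare(exchange=fallback, exchange_type='fanout')
    channel.queue_declare(queue=fallback_queue, durable=True)
    channel.queue_bind(exchange=fallback, queue=fallback_queue)

    # 'alternate-exchange' is the RabbitMQ argument that names the fallback.
    channel.exchange_declare(
        exchange=exchange,
        exchange_type='direct',
        arguments={'alternate-exchange': fallback},
    )
```

Called as declare_with_fallback(channel) before publishing, a message with routing_key='debug' and no matching binding should land in unrouted_logs instead of being discarded.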
Topic Exchange
A topic exchange routes messages by matching the message’s routing key against a binding pattern. The routing key is a string of words separated by dots (e.g., kern.critical, user.login.new). The binding key is a pattern in the same dotted form that can use * (matches exactly one word) and # (matches zero or more words). So user.* matches user.activity but not user.login.new, while user.# matches both. This makes topic the most flexible of the three types, allowing for complex routing logic.
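To make the wildcard rules concrete, here is a small pure-Python model of topic matching. It is a sketch of the rules as described above, not the broker’s own implementation, and the function name topic_matches is invented for this example.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key satisfies the topic pattern binding_key."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p: int, w: int) -> bool:
        # Pattern exhausted: succeed only if every word was consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == '#':
            # '#' may absorb zero or more words, so try every split point.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        # '*' stands for exactly one word; anything else must match literally.
        if pattern[p] == '*' or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


print(topic_matches('user.*', 'user.activity'))       # True
print(topic_matches('user.*', 'user.login.new'))      # False: '*' is one word only
print(topic_matches('user.#', 'user.login.new'))      # True
print(topic_matches('#.critical', 'kern.critical'))   # True
print(topic_matches('#.critical', 'user.login.new'))  # False
```

The recursion is deliberately simple rather than fast. It is meant for checking a binding pattern by hand before you deploy it.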
Scenario: You want to route messages based on both a topic and a severity, allowing for flexible subscription patterns.
Producer:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# Example routing keys
routing_key_1 = 'kern.critical'
routing_key_2 = 'user.login.new'
routing_key_3 = 'user.activity'
message_1 = f"Kernel panic: {routing_key_1}"
message_2 = f"New user logged in: {routing_key_2}"
message_3 = f"User performed an action: {routing_key_3}"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key_1, body=message_1)
print(f" [x] Sent '{routing_key_1}': '{message_1}'")
channel.basic_publish(exchange='topic_logs', routing_key=routing_key_2, body=message_2)
print(f" [x] Sent '{routing_key_2}': '{message_2}'")
channel.basic_publish(exchange='topic_logs', routing_key=routing_key_3, body=message_3)
print(f" [x] Sent '{routing_key_3}': '{message_3}'")
connection.close()
Consumer (receives all user-related logs):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# queue='' lets the broker generate a unique queue name
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Binding key: 'user.#' matches 'user.activity', 'user.login.new', etc.
# ('user.*' would match only keys with exactly one word after 'user.')
binding_key = 'user.#'
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=binding_key)
print(' [*] Waiting for user logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(f" [x] Received '{method.routing_key}': {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Consumer (receives all critical logs, from anywhere):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# queue='' lets the broker generate a unique queue name
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Binding key: '#.critical' will match 'kern.critical', 'user.critical', etc.
binding_key = '#.critical'
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=binding_key)
print(' [*] Waiting for critical logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(f" [x] Received '{method.routing_key}': {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
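When routing_key='kern.critical' is published, the second consumer, bound with '#.critical', will receive it. When 'user.activity' or 'user.login.new' is published, the user-log consumer receives it only if its pattern covers that many words: 'user.#' matches both keys, whereas 'user.*' matches 'user.activity' but not 'user.login.new', because * stands for exactly one word. Neither of those two keys ends in critical, so the second consumer sees neither. A message with routing_key='user.critical', on the other hand, would match both a user pattern and '#.critical' and be delivered to both consumers.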
The core concept is that bindings decide delivery: the exchange compares each incoming message’s routing key with the binding keys of the queues bound to it, and the exchange type sets the comparison rule. A fanout exchange skips the comparison and delivers to every queue bound to it. A direct exchange requires an exact match. A topic exchange uses pattern matching.
What most people don’t realize is that a single queue can be bound to multiple exchanges, or to the same exchange multiple times with different binding keys, allowing for very granular message consumption. You can also bind multiple queues to the same exchange with the same or different binding keys.
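The binding rules above can be modelled in a few lines of plain Python. This is a toy, in-memory sketch and not a RabbitMQ client: the ToyBroker class, the queue names audit and pager, and the message bodies are all invented for illustration, and topic matching is left out to keep it short.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory model of fanout and direct routing. Not a RabbitMQ client."""

    def __init__(self):
        self.exchange_types = {}           # exchange name -> 'fanout' | 'direct'
        self.bindings = defaultdict(list)  # exchange name -> [(queue, binding_key)]
        self.queues = defaultdict(list)    # queue name -> delivered bodies

    def exchange_declare(self, exchange, exchange_type):
        self.exchange_types[exchange] = exchange_type

    def queue_bind(self, exchange, queue, routing_key=''):
        self.bindings[exchange].append((queue, routing_key))

    def publish(self, exchange, routing_key, body):
        kind = self.exchange_types[exchange]
        # A set, so a queue gets one copy even if several bindings match.
        targets = {
            queue
            for queue, binding_key in self.bindings[exchange]
            if kind == 'fanout' or binding_key == routing_key
        }
        for queue in targets:
            self.queues[queue].append(body)
        return targets


broker = ToyBroker()
broker.exchange_declare('notifications', 'fanout')
broker.exchange_declare('logs', 'direct')

# One queue, three bindings: two keys on 'logs' plus the fanout exchange.
broker.queue_bind('logs', 'audit', 'warning')
broker.queue_bind('logs', 'audit', 'error')
broker.queue_bind('notifications', 'audit')
# A second queue sharing the 'error' key with 'audit'.
broker.queue_bind('logs', 'pager', 'error')

broker.publish('logs', 'error', 'disk full')          # reaches audit and pager
broker.publish('logs', 'info', 'started')             # no binding matches
broker.publish('notifications', 'ignored', 'deploy')  # fanout: reaches audit
```

After these three publishes, the audit queue holds 'disk full' and 'deploy', pager holds only 'disk full', and the 'info' message reached no queue at all.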
The next step is understanding how to manage message acknowledgments and dead-lettering to build resilient message processing pipelines.