A Pulsar consumer can fall behind its topic’s producer, leaving a growing backlog of unacknowledged messages on its subscription. This is usually called "consumer lag". If the system responds by slowing the producer down, that is "backpressure."
Let’s see it in action. Imagine a producer writing messages to a topic named persistent://public/default/my-topic.
# Producer sending messages
bin/pulsar-client produce -m "message-$(date +%s)" -n 10000 persistent://public/default/my-topic
# Slow consumer: -n 0 keeps consuming until interrupted, -r 10 caps it at about 10 msg/s
bin/pulsar-client consume -s my-subscription-name -n 0 -r 10 persistent://public/default/my-topic
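If the consumer is slow, it won’t acknowledge messages quickly enough. That alone does not slow down the producer: Pulsar stores unacknowledged messages durably in BookKeeper, on disk rather than in broker memory, and keeps them as the subscription’s backlog. The producer is held back only if a backlog quota is exceeded and its policy is producer_request_hold or producer_exception. You can watch the lag directly as a growing msgBacklog value in the output of bin/pulsar-admin topics stats.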
The core problem is a mismatch between the rate at which messages are produced and the rate at which they can be processed and acknowledged by consumers. This can happen for many reasons:
1. Consumer Application Logic is Too Slow
This is the most common culprit. The code processing messages simply can’t keep up.
- Diagnosis:
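- Monitor the consumer’s processing time per message. If it is consistently longer than the average gap between incoming messages (that is, if the consumer handles fewer messages per second than the producer sends), you have a bottleneck.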
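- Check Pulsar admin stats for consumer lag:
bin/pulsar-admin topics stats persistent://public/default/my-topic
In the output, find my-subscription-name under subscriptions and look at msgBacklog (messages the subscription has not yet acknowledged) and unackedMessages (messages delivered to consumers but not yet acknowledged). High or steadily growing values indicate lag.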
- Fix:
- Optimize Consumer Code: Profile your consumer application to find and fix performance bottlenecks. This might involve more efficient data parsing, database operations, or network calls.
- Increase Consumer Instances: If your consumer logic is already optimized, scale out by running more instances of your consumer application on the same subscription. How Pulsar splits the work depends on the subscription type: a Shared (or Key_Shared) subscription spreads messages from the same partition across all connected consumers, a Failover subscription gives each partition of a partitioned topic to one active consumer, and an Exclusive subscription rejects any second consumer.
# Example: start multiple consumer instances (my-consumer.jar and its flags stand in for your own application)
java -jar my-consumer.jar --broker-url pulsar://localhost:6650 --topic persistent://public/default/my-topic --subscription my-subscription-name --consumer-id consumer-1 &
java -jar my-consumer.jar --broker-url pulsar://localhost:6650 --topic persistent://public/default/my-topic --subscription my-subscription-name --consumer-id consumer-2 &
- Why it works: Adding processing capacity, or making existing capacity go further, lets messages be acknowledged faster, which shrinks the backlog and relieves any backpressure on the producer.
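The rate mismatch behind lag comes down to simple arithmetic. The Java sketch below is a back-of-the-envelope model rather than anything in the Pulsar client: LagMath and its methods are hypothetical, and it assumes a steady arrival rate, a fixed processing time per message, and Shared-subscription consumers that split the work evenly. Under those assumptions, N consumers keep up only while N divided by the per-message processing time is at least the arrival rate.

```java
// Back-of-the-envelope model of consumer lag. LagMath is a hypothetical helper,
// not part of the Pulsar client. It assumes a steady arrival rate, a fixed
// processing time per message, and consumers on a Shared subscription that
// split the work evenly.
class LagMath {

    // Messages per second that `consumers` instances can process and acknowledge.
    static double serviceRate(int consumers, double processingSeconds) {
        return consumers / processingSeconds;
    }

    // Positive: the backlog grows by this many msg/s. Negative: it drains.
    static double backlogGrowthPerSecond(double arrivalRate, int consumers, double processingSeconds) {
        return arrivalRate - serviceRate(consumers, processingSeconds);
    }

    // Smallest consumer count whose combined rate keeps up with arrivals.
    static int consumersNeeded(double arrivalRate, double processingSeconds) {
        return (int) Math.ceil(arrivalRate * processingSeconds);
    }

    // Seconds to clear an existing backlog, or infinity if it never drains.
    static double secondsToDrain(long backlog, double arrivalRate, int consumers, double processingSeconds) {
        double drainRate = -backlogGrowthPerSecond(arrivalRate, consumers, processingSeconds);
        return drainRate > 0 ? backlog / drainRate : Double.POSITIVE_INFINITY;
    }

    public static void main(String[] args) {
        double arrivalRate = 500.0;      // messages per second from the producer
        double processingSeconds = 0.01; // 10 ms to process one message
        System.out.println("growth with 2 consumers: "
                + backlogGrowthPerSecond(arrivalRate, 2, processingSeconds) + " msg/s");
        System.out.println("consumers needed: " + consumersNeeded(arrivalRate, processingSeconds));
        System.out.println("seconds to drain 60000 with 8 consumers: "
                + secondsToDrain(60_000, arrivalRate, 8, processingSeconds));
    }
}
```

With the sample numbers (500 msg/s arriving, 10 ms per message), two consumers let the backlog grow by about 300 messages per second, five is the break-even point, and eight drain a 60,000-message backlog in about 200 seconds. Real processing times vary, so treat the result as a starting point rather than a guarantee.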
2. Insufficient Consumer Resources (CPU/Memory/Network)
Even with efficient code, a consumer might be starved of resources on its host machine.
- Diagnosis:
- Use system monitoring tools (top, htop, docker stats, Kubernetes metrics) to check CPU, memory, and network I/O on the consumer’s host.
- Look for high CPU utilization, memory swapping, or network saturation.
- Fix:
- Scale Up/Out Consumer Hosts: Provide more powerful machines for your consumers (scale up) or run more consumer instances on separate, adequately resourced machines (scale out).
# If using Kubernetes, adjust resource requests/limits for the consumer deployment
# Example in deployment.yaml (under the consumer container's spec):
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "1"
    memory: "2Gi"
- Why it works: Ensuring the consumer has enough CPU cycles, RAM, and network bandwidth allows it to execute its processing logic without being artificially slowed down by the operating system or hardware.
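To make "starved of resources" concrete, here is a rough CPU budget in Java. CpuBudget is a hypothetical helper, the 4 ms of CPU per message is an assumed figure, and the model assumes a purely CPU-bound consumer. Kubernetes measures CPU in millicores, and a CPU limit (unlike a request) caps usage, so an allowance of "500m" means roughly 500 ms of CPU time per second.

```java
// Rough CPU budget for a CPU-bound consumer. CpuBudget is a hypothetical helper
// that assumes each message costs a fixed amount of CPU time and that nothing
// else (I/O waits, GC pauses, lock contention) limits throughput. Kubernetes
// measures CPU in millicores, so 500m is about 500 ms of CPU time per second.
class CpuBudget {

    // Upper bound on messages per second under a CPU allowance.
    static double maxRate(double cpuMillicores, double cpuMillisPerMessage) {
        return cpuMillicores / cpuMillisPerMessage;
    }

    // Millicores required to keep up with a given arrival rate.
    static double millicoresNeeded(double messagesPerSecond, double cpuMillisPerMessage) {
        return messagesPerSecond * cpuMillisPerMessage;
    }

    public static void main(String[] args) {
        double cpuMillisPerMessage = 4.0; // assumed cost of processing one message
        System.out.println("500m allowance: " + maxRate(500, cpuMillisPerMessage) + " msg/s");
        System.out.println("limit of 1 core (1000m): " + maxRate(1000, cpuMillisPerMessage) + " msg/s");
        System.out.println("needed for 500 msg/s: " + millicoresNeeded(500, cpuMillisPerMessage) + "m");
    }
}
```

Under that assumption, a 500m allowance caps a consumer at 125 messages per second and the one-core limit from the YAML example at 250, while keeping up with 500 messages per second would need about 2000m, or two full cores, spread across one or more instances.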
3. Network Latency Between Consumer and Broker
High network latency or packet loss can significantly slow down acknowledgments.
- Diagnosis:
- Use ping and traceroute from the consumer’s host to the Pulsar broker.
- Monitor network metrics on the consumer host and any intermediate network devices.
- Fix:
- Co-locate Consumers and Brokers: Deploy consumers in the same network availability zone or region as the Pulsar brokers they connect to.
- Optimize Network Configuration: Ensure network paths are clear, firewalls are not introducing excessive delays, and consider using dedicated network links if possible.
- Why it works: Reducing the time it takes for acknowledgment messages to travel from the consumer to the broker and for new messages to reach the consumer directly impacts the end-to-end processing time.
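How much latency hurts depends on whether the consumer waits on the network between messages. The Java sketch below, with hypothetical names, compares two idealized cases: a stop-and-wait consumer that pays one full round trip per message (roughly what happens when nothing is prefetched, for example a consumer whose receiver queue size is set to 0) and a pipelined consumer whose prefetched queue never runs dry. Real consumers fall somewhere in between.

```java
// Two idealized throughput bounds for a single consumer. LatencyBound is a
// hypothetical helper; times are in milliseconds, rates in messages per second.
class LatencyBound {

    // Stop-and-wait: each message costs its processing time plus one round trip.
    static double stopAndWaitRate(double processingMillis, double roundTripMillis) {
        return 1000.0 / (processingMillis + roundTripMillis);
    }

    // Pipelined: prefetched messages hide the round trip, so only processing counts.
    // Assumes the queue is deep enough to cover at least one round trip of work.
    static double pipelinedRate(double processingMillis) {
        return 1000.0 / processingMillis;
    }

    public static void main(String[] args) {
        double processingMillis = 2.0;
        System.out.println("stop-and-wait, 1 ms RTT:  " + stopAndWaitRate(processingMillis, 1.0));
        System.out.println("stop-and-wait, 80 ms RTT: " + stopAndWaitRate(processingMillis, 80.0));
        System.out.println("pipelined:                " + pipelinedRate(processingMillis));
    }
}
```

With 2 ms of processing, a stop-and-wait consumer drops from about 333 to about 12 messages per second when the round trip grows from 1 ms (same zone) to 80 ms (an assumed cross-region figure), while the pipelined bound stays at 500. That gap is why co-locating consumers and brokers matters most when the consumer cannot work ahead.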
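4. Broker Configuration: maxUnackedMessagesPerConsumer
Pulsar brokers limit how many messages a single consumer on a Shared subscription can have delivered but not yet acknowledged (maxUnackedMessagesPerConsumer), and how many the subscription as a whole can have (maxUnackedMessagesPerSubscription). When a consumer reaches its limit, the broker stops dispatching to it until it acknowledges some messages. The producer is not throttled directly, but undelivered messages keep piling up in the backlog.
- Diagnosis:
- Check the broker configuration file (e.g., conf/broker.conf) for maxUnackedMessagesPerConsumer (default 50000) and maxUnackedMessagesPerSubscription (default 200000).
- In the bin/pulsar-admin topics stats output, look for blockedConsumerOnUnackedMsgs on each consumer and blockedSubscriptionOnUnackedMsgs on the subscription. A value of true means the limit has been reached.
- Fix:
- Increase maxUnackedMessagesPerConsumer: This lets a consumer hold more delivered-but-unacknowledged messages before the broker pauses dispatch to it. Raise maxUnackedMessagesPerSubscription alongside it, or the subscription-wide limit may still block delivery. Restart the broker after changing these.
# In conf/broker.conf (0 disables the check)
maxUnackedMessagesPerConsumer=100000
maxUnackedMessagesPerSubscription=400000
- Why it works: A consumer that acknowledges in bursts (for example, after a periodic database commit) or holds messages through a temporary slowdown keeps receiving work instead of being paused, so the backlog does not pile up behind it. It does not raise long-run throughput: if the consumer processes messages more slowly than they arrive, a higher limit only moves part of the backlog into consumer memory.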
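The effect of a per-consumer unacked limit is easiest to see with a consumer that acknowledges in bursts. The Java sketch below is a toy model loosely patterned on the broker’s maxUnackedMessagesPerConsumer setting, not the broker’s dispatcher: UnackedLimitModel and its methods are hypothetical, and the model ignores receiver queues, flow permits, and the subscription-wide limit.

```java
// Toy model of a per-consumer unacknowledged-message limit, in the spirit of
// maxUnackedMessagesPerConsumer. Not the broker's dispatcher: it ignores
// receiver queues, flow permits, and the subscription-level limit.
class UnackedLimitModel {
    private final int maxUnacked; // 0 disables the limit, mirroring broker.conf
    private int unacked = 0;

    UnackedLimitModel(int maxUnacked) {
        this.maxUnacked = maxUnacked;
    }

    // Broker side: deliver one message unless the consumer is at its limit.
    boolean tryDispatch() {
        if (maxUnacked > 0 && unacked >= maxUnacked) {
            return false; // consumer is blocked until it acknowledges
        }
        unacked++;
        return true;
    }

    // Consumer side: acknowledge everything it currently holds.
    void ackAll() {
        unacked = 0;
    }

    // Runs `ticks` steps. Each tick the broker offers `offeredPerTick` messages;
    // the consumer acknowledges in one burst every `ackEvery` ticks (for example,
    // after a periodic database commit). Returns total messages delivered.
    static int simulate(int maxUnacked, int ticks, int offeredPerTick, int ackEvery) {
        UnackedLimitModel consumer = new UnackedLimitModel(maxUnacked);
        int delivered = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < offeredPerTick && consumer.tryDispatch(); i++) {
                delivered++;
            }
            if ((t + 1) % ackEvery == 0) {
                consumer.ackAll();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        for (int limit : new int[] {500, 1000, 0}) {
            System.out.println("limit " + limit + ": delivered " + simulate(limit, 100, 100, 10));
        }
    }
}
```

With 100 messages offered per tick and acknowledgments every 10 ticks, the model delivers 5000 messages over 100 ticks at a limit of 500, because the consumer sits blocked for half of every cycle, and 10000 at a limit of 1000 or with the limit disabled (0). Raising the limit past the burst size buys nothing further.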
5. Broker Configuration: backlogQuota
While not directly a consumer lag setting, a backlog quota caps how large a topic’s backlog may grow. If slow consumers push the backlog past the quota, Pulsar applies the quota’s policy: producer_request_hold holds producer requests and producer_exception fails them with an exception (both are backpressure), while consumer_backlog_eviction discards the oldest unconsumed messages.
- Diagnosis:
- Check topic-level or namespace-level backlog quotas:
bin/pulsar-admin topics get-backlog-quotas persistent://public/default/my-topic
bin/pulsar-admin namespaces get-backlog-quotas public/default
- Broker logs will show messages if quotas are being hit.
- Fix:
- Increase Backlog Quota: If you intend to keep a larger backlog, increase the quota.
bin/pulsar-admin topics set-backlog-quota persistent://public/default/my-topic --limit 10G --policy producer_request_hold
# Or for a namespace
bin/pulsar-admin namespaces set-backlog-quota public/default --limit 50G --policy producer_request_hold
- Adjust the policy: With consumer_backlog_eviction, slow consumers silently lose the oldest messages once the quota is reached. If you need to retain everything, use producer_request_hold or producer_exception instead, which push back on producers, and make sure the bookies have enough disk space for the larger backlog.
- Why it works: This gives the topic more room to accumulate messages while slow consumers catch up, preventing premature message deletion or producer blocking due to storage limits.
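The difference between the quota policies is whether a full backlog loses data or pushes back on the producer. The following Java toy model illustrates that trade-off. BacklogQuotaModel is hypothetical: it counts the quota in messages instead of bytes and treats producer_request_hold and producer_exception alike as a refused publish.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of what happens when a topic's backlog exceeds its quota. The enum
// mirrors Pulsar's policy names (producer_request_hold, producer_exception,
// consumer_backlog_eviction), but the mechanics are simplified: the quota is a
// message count rather than bytes, and both producer-side policies are modeled
// as "publish refused" (hold would really park the request, exception would
// fail it back to the producer).
class BacklogQuotaModel {
    enum Policy { PRODUCER_REQUEST_HOLD, PRODUCER_EXCEPTION, CONSUMER_BACKLOG_EVICTION }

    private final int quota;
    private final Policy policy;
    private final Deque<Long> backlog = new ArrayDeque<>();
    int evicted = 0; // unconsumed messages dropped to make room
    int refused = 0; // publishes pushed back onto the producer

    BacklogQuotaModel(int quota, Policy policy) {
        this.quota = quota;
        this.policy = policy;
    }

    // Returns true if the message entered the backlog.
    boolean publish(long id) {
        if (backlog.size() >= quota) {
            if (policy == Policy.CONSUMER_BACKLOG_EVICTION) {
                backlog.pollFirst(); // the oldest unconsumed message is lost
                evicted++;
            } else {
                refused++;           // backpressure: the producer must wait or fail
                return false;
            }
        }
        backlog.addLast(id);
        return true;
    }

    // Consumer takes the oldest message; returns null when the backlog is empty.
    Long consume() {
        return backlog.pollFirst();
    }

    // Each tick the producer publishes produceRate messages, then the slower
    // consumer processes and acknowledges up to consumeRate of them.
    static BacklogQuotaModel simulate(int quota, Policy policy, int ticks, int produceRate, int consumeRate) {
        BacklogQuotaModel topic = new BacklogQuotaModel(quota, policy);
        long nextId = 0;
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < produceRate; i++) {
                topic.publish(nextId++);
            }
            for (int i = 0; i < consumeRate; i++) {
                if (topic.consume() == null) {
                    break; // nothing left to process this tick
                }
            }
        }
        return topic;
    }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            BacklogQuotaModel m = simulate(20, p, 10, 10, 6);
            System.out.println(p + ": evicted=" + m.evicted + ", refused=" + m.refused);
        }
    }
}
```

With a quota of 20, 10 messages produced and 6 consumed per tick, every policy hits the quota 26 times over 10 ticks. The only difference is whether those 26 messages are dropped from the backlog or pushed back onto the producer.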
6. Consumer Acknowledgment Strategy
The way consumers acknowledge messages impacts how quickly the broker considers them processed.
- Diagnosis:
- Review the consumer application’s acknowledgment logic. Is it acknowledging individual messages, batches, or only after a complex operation?
- Fix:
- Batch Acknowledgments: If your consumer logic allows, acknowledge messages in batches rather than individually. This reduces the network overhead and processing cost per message acknowledgment.
// Example using the Java client: a cumulative ack covers every message up to and including this one
Message<byte[]> msg = consumer.receive();
// ... process message ...
consumer.acknowledgeCumulative(msg.getMessageId());
- Acknowledge Appropriately: Ensure you are acknowledging messages in a way that makes sense for your processing. Shared and Key_Shared subscriptions do not support cumulative acknowledgment, so individual acknowledgments are required there. On Exclusive and Failover subscriptions, cumulative acknowledgments can be more efficient, as long as every earlier message was processed successfully.
- Why it works: Batching acknowledgments reduces the chattiness between the consumer and the broker, allowing the consumer to spend more time on processing and less on reporting back. Cumulative acknowledgments are particularly efficient as a single ack covers potentially many messages.
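Here is a toy Java model of the two acknowledgment styles on a single partition. AckModel is hypothetical: it uses plain long IDs instead of Pulsar MessageIds and counts one request per ack call, whereas the real Java client can also group acknowledgments over a short window (ConsumerBuilder.acknowledgmentGroupTime), which narrows the gap. The model also shows the catch with cumulative acks: acknowledging ID 49 clears everything before it, including any message that actually failed.

```java
import java.util.TreeSet;

// Toy model contrasting individual and cumulative acknowledgment on a single
// partition. AckModel is hypothetical: message IDs are plain longs, whereas real
// Pulsar MessageIds encode ledger and entry positions.
class AckModel {
    private final TreeSet<Long> pending = new TreeSet<>(); // delivered, not yet acked
    int ackRequests = 0; // acknowledgment calls the consumer made

    void deliver(long id) {
        pending.add(id);
    }

    // Individual ack: clears exactly one message.
    void acknowledge(long id) {
        ackRequests++;
        pending.remove(id);
    }

    // Cumulative ack: clears every pending message with an ID <= id.
    void acknowledgeCumulative(long id) {
        ackRequests++;
        pending.headSet(id, true).clear();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        AckModel individual = new AckModel();
        AckModel cumulative = new AckModel();
        for (long id = 0; id < 100; id++) {
            individual.deliver(id);
            cumulative.deliver(id);
        }
        for (long id = 0; id < 100; id++) {
            individual.acknowledge(id);           // one call per message
        }
        for (long id = 9; id < 100; id += 10) {
            cumulative.acknowledgeCumulative(id); // one call per 10 messages
        }
        System.out.println("individual: " + individual.ackRequests + " calls, "
                + individual.pendingCount() + " pending");
        System.out.println("cumulative: " + cumulative.ackRequests + " calls, "
                + cumulative.pendingCount() + " pending");
    }
}
```

Acknowledging 100 messages individually takes 100 calls; acknowledging cumulatively after every tenth message takes 10, and both leave nothing pending.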
If you’ve addressed all these points and are still seeing lag, investigate the specific subscription type you’re using, as Shared subscriptions have different backpressure dynamics than Exclusive or Failover.
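One more caution: if you raise the broker’s per-consumer unacknowledged-message limit aggressively, watch memory on both sides. Delivered-but-unacknowledged messages sit in the consumer’s receiver queue or application, and on Shared subscriptions the broker tracks each one as a pending acknowledgment.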