The Pulsar broker failed to assign partitions to your consumer because the consumer’s subscription state was unexpectedly removed from the broker’s internal tracking.

This usually happens for one of two reasons. The consumer was idle for too long and the broker cleaned up its subscription, or a network partition led the broker to believe the consumer had died.

Here are the most common reasons and how to fix them:

1. Consumer Idle Timeout: Pulsar brokers can delete inactive subscriptions automatically. This is controlled by subscriptionExpirationTimeMinutes in broker.conf, or by a per-namespace subscription expiration policy. The default is 0, which disables expiry. When it is set, a subscription with no consumption activity for that many minutes is removed along with its cursor.

  • Diagnosis: Check broker.conf and your namespace policies for subscriptionExpirationTimeMinutes. On the client side, check whether your consumer is actively processing messages or shows long pauses.
  • Fix: Raise subscriptionExpirationTimeMinutes (or the namespace-level value) above the longest period your consumers can legitimately be idle, or set it to 0 to disable expiry. On the consumer, also set ackTimeout (disabled by default) to a value greater than your longest processing time between acknowledgements. This stops slow messages from being redelivered mid-processing. For example, to set it to 10 minutes:
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .ackTimeout(600, TimeUnit.SECONDS) // 10 minutes
        .subscribe();
    
  • Why it works: A longer expiry window stops the broker from discarding a subscription whose consumers are only temporarily idle. A generous ackTimeout keeps legitimate processing delays from triggering redeliveries.
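For point 1, it is easier to choose an ackTimeout value from measurements than from guesswork. The following is a minimal plain-Java sketch with no Pulsar dependency. The class name AckTimeoutSizing and the 1.5x headroom factor are illustrative assumptions, not Pulsar defaults or recommendations.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the Pulsar client: suggests an ackTimeout
// (in whole seconds) from per-message processing times you measured yourself.
final class AckTimeoutSizing {

    private AckTimeoutSizing() {}

    // Longest observed processing time multiplied by a headroom factor,
    // rounded up to whole seconds.
    static long suggestAckTimeoutSeconds(long[] processingMillis, double headroom) {
        if (processingMillis.length == 0) {
            throw new IllegalArgumentException("need at least one processing-time sample");
        }
        if (headroom < 1.0) {
            throw new IllegalArgumentException("headroom must be at least 1.0");
        }
        long longestMillis = Arrays.stream(processingMillis).max().getAsLong();
        return (long) Math.ceil(longestMillis * headroom / 1000.0);
    }

    public static void main(String[] args) {
        // Example samples in milliseconds (illustrative values only).
        long[] samples = {1_200, 45_000, 300_000, 380_000};
        System.out.println("suggested ackTimeout: "
                + suggestAckTimeoutSeconds(samples, 1.5) + " s"); // 570 s
    }
}
```

You would pass the result to .ackTimeout(seconds, TimeUnit.SECONDS). Sizing from the maximum rather than a percentile is deliberately conservative: an outlier raises the timeout instead of causing a redelivery.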

2. Network Instability / Broker Restart: Transient network issues can cause the broker to lose contact with the consumer. Both sides send keep-alive pings on the connection: keepAliveIntervalSeconds in broker.conf and keepAliveInterval on the client, both 30 seconds by default. If the pings go unanswered, the connection is treated as dead and the consumer is disconnected. A broker restart can also cause a temporary loss of state.

  • Diagnosis: Examine broker logs for messages related to disconnected clients or subscription cleanup. Check network connectivity between your Pulsar brokers and your consumer instances.
  • Fix: Ensure your network is stable. keepAliveInterval and connectionTimeout are client-wide settings, so configure them on the ClientBuilder (PulsarClient.builder()), not on the ConsumerBuilder. For example:
    PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650") // Replace with your broker or proxy URL
        .keepAliveInterval(15, TimeUnit.SECONDS) // Ping the broker every 15s (default: 30s)
        .connectionTimeout(20, TimeUnit.SECONDS) // Wait up to 20s to establish a connection
        .build();

    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscribe();
    
    Also, make sure the broker’s keepAliveIntervalSeconds (in broker.conf) is reasonably set. The default is 30 seconds.
  • Why it works: More frequent keep-alive pings detect a dead connection sooner, so the client reconnects and re-subscribes promptly. A longer connection timeout gives slow links time to finish connecting.
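For the connectivity check in point 2, a useful first step is to confirm that the consumer host can open a TCP connection to the broker at all. This minimal sketch uses only java.net. BrokerReachability is a hypothetical name, and port 6650 is assumed because it is Pulsar’s default plain-text binary-protocol port. A successful probe shows only that something is listening, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability probe (hypothetical helper, not a Pulsar API).
// It only checks that host:port accepts connections; it does not speak the
// Pulsar binary protocol.
final class BrokerReachability {

    private BrokerReachability() {}

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused connections, timeouts, and unresolvable hosts all land here.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        // Assumed default Pulsar binary-protocol port; adjust to your setup.
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6650;
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 2_000));
    }
}
```

Run it on the consumer host with your broker’s host and port as arguments. If it fails there but succeeds elsewhere, the problem is the network path, not Pulsar.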

3. Large Message Processing / Batching Delays: If your consumer processes messages in large batches, or takes a very long time to process a single message, it might not acknowledge messages within the ackTimeout window. Long gaps in consumption can also run into subscription expiry if it is enabled.

  • Diagnosis: Monitor your consumer’s message processing rate and acknowledgement latency. If the processing time per message or per batch exceeds ackTimeout, this is likely the cause.
  • Fix:
    • Client-side: Increase ackTimeout on the consumer (as in point 1).
    • Client-side: Acknowledge each message as soon as it has been processed, rather than deferring acknowledgements until a whole batch is finished. The Pulsar Java client acknowledges only when your code calls acknowledge().
    • Server-side: If subscription expiry is enabled, make sure subscriptionExpirationTimeMinutes comfortably exceeds your longest processing pause.
    • Application Logic: Optimize message processing to be faster, or split inherently long work into smaller units that can each be acknowledged as they complete.
  • Why it works: Acknowledging promptly keeps every message inside the ackTimeout window, so it is not redelivered while still in progress. Steady consumption also keeps the subscription from looking idle.
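A little arithmetic makes the batching point in item 3 concrete. The sketch below is a simplified model, not Pulsar’s internal timer logic. It assumes a message’s ack-timeout clock starts when your code receives it. It then compares the longest unacknowledged interval under two strategies: acknowledging each message right after processing, and receiving a whole batch at once but acknowledging only at the end. AckStrategyModel is a hypothetical name.

```java
import java.util.Arrays;

// Simplified model (assumption): the ack-timeout clock for a message starts
// when your code receives it. Plain Java; no Pulsar classes involved.
final class AckStrategyModel {

    private AckStrategyModel() {}

    // Receive one message, process it, acknowledge it, repeat:
    // each message waits only for its own processing.
    static long longestWaitAckEach(long[] processingMillis) {
        return Arrays.stream(processingMillis).max().orElse(0L);
    }

    // Receive a whole batch, process it all, then acknowledge:
    // the first message in the batch waits for the entire batch.
    static long longestWaitAckAtBatchEnd(long[] processingMillis) {
        return Arrays.stream(processingMillis).sum();
    }

    static boolean exceedsAckTimeout(long waitMillis, long ackTimeoutMillis) {
        return waitMillis > ackTimeoutMillis;
    }

    public static void main(String[] args) {
        long[] batch = {20_000, 25_000, 30_000}; // illustrative per-message times
        long ackTimeoutMillis = 60_000;
        long each = longestWaitAckEach(batch);
        long atEnd = longestWaitAckAtBatchEnd(batch);
        System.out.println("ack each:   " + each + " ms, exceeds=" + exceedsAckTimeout(each, ackTimeoutMillis));
        System.out.println("ack at end: " + atEnd + " ms, exceeds=" + exceedsAckTimeout(atEnd, ackTimeoutMillis));
    }
}
```

With these illustrative numbers and a 60-second ackTimeout, per-message acknowledgement peaks at 30 seconds. Batch-end acknowledgement reaches 75 seconds, so under this model the first message would be redelivered while the batch is still being worked on.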

4. Subscription State Corruption (Rare): In very rare cases, the internal state of the subscription on the broker can become corrupted, leading to it being incorrectly marked for cleanup or removal.

  • Diagnosis: This is hard to diagnose directly. Look for unusual patterns in broker logs related to subscription management or inconsistent behavior across multiple consumers on the same subscription.
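  • Fix: The most reliable fix is to delete and recreate the subscription, which forces Pulsar to re-establish the subscription state from scratch. Deleting it also discards the subscription’s cursor. Acknowledgement progress is lost, and consumers start again from their configured initial position.
    # Example using the Pulsar Admin CLI. Stop the consumers first: the broker
    # refuses to delete a subscription that still has connected consumers.
    pulsar-admin topics unsubscribe persistent://my-tenant/my-namespace/my-topic -s my-subscription
    # Then restart your consumer(s) to have them re-create the subscription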
    
  • Why it works: It resets the subscription’s internal state, discarding any potentially corrupted data.
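Before deleting a subscription in point 4, confirm exactly which topic the command will touch. Pulsar expands a bare topic name to persistent://public/default/<name>, and a tenant/namespace/topic name to persistent://tenant/namespace/topic. The hypothetical helper below mirrors only those two rules. It is not Pulsar’s own TopicName class, and it rejects any other form rather than guessing.

```java
// Hypothetical helper (not a Pulsar API): expands the two common short
// topic-name forms into the fully qualified form used by admin commands.
final class TopicNames {

    private TopicNames() {}

    static String qualify(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("topic name is empty");
        }
        if (name.startsWith("persistent://") || name.startsWith("non-persistent://")) {
            return name; // already fully qualified
        }
        String[] parts = name.split("/");
        if (parts.length == 1) {
            // A bare topic name lives in the public/default namespace.
            return "persistent://public/default/" + name;
        }
        if (parts.length == 3) {
            // tenant/namespace/topic
            return "persistent://" + name;
        }
        throw new IllegalArgumentException("unsupported topic name form: " + name);
    }

    public static void main(String[] args) {
        System.out.println(qualify("my-topic"));
        System.out.println(qualify("my-tenant/my-namespace/my-topic"));
    }
}
```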

5. Insufficient Broker Resources: If brokers are under heavy load (CPU, memory, network), they might struggle to keep up with managing consumer states and heartbeats, leading to timeouts and assignment errors.

  • Diagnosis: Monitor the CPU, memory, and network utilization of your Pulsar brokers. Look for high latency in broker request processing.
  • Fix: Scale up your broker resources (more CPU, RAM) or add more broker instances to your cluster. Also review the broker’s concurrency limits in broker.conf, such as maxConcurrentLookupRequest and maxConcurrentTopicLoadRequest.
  • Why it works: Ensuring brokers have adequate resources allows them to perform their state management duties reliably.
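For the monitoring step in point 5, note that Pulsar brokers expose Prometheus-format metrics over HTTP at /metrics on the web service port (8080 by default). The hypothetical helper below sums every sample of one metric from that text format. Fetching the text (for example with java.net.http.HttpClient) is left out to keep the sketch self-contained. The metric names in the example are made up, and the names worth watching depend on your Pulsar version.

```java
import java.util.OptionalDouble;

// Hypothetical helper: sums every sample of one metric in Prometheus text
// exposition format (e.g. the body of a broker's /metrics response).
// Deliberately minimal: handles "name value" and "name{labels} value" lines
// and skips anything it cannot parse.
final class PromMetrics {

    private PromMetrics() {}

    static OptionalDouble sum(String exposition, String metricName) {
        double total = 0.0;
        boolean found = false;
        for (String rawLine : exposition.split("\n")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // blank line or # HELP / # TYPE comment
            }
            // The metric name ends at the first '{' or whitespace character.
            int nameEnd = 0;
            while (nameEnd < line.length()
                    && line.charAt(nameEnd) != '{'
                    && !Character.isWhitespace(line.charAt(nameEnd))) {
                nameEnd++;
            }
            if (!line.substring(0, nameEnd).equals(metricName)) {
                continue; // a different metric
            }
            String rest = line.substring(nameEnd);
            if (rest.startsWith("{")) {
                int close = rest.lastIndexOf('}');
                if (close < 0) {
                    continue; // unterminated label set
                }
                rest = rest.substring(close + 1);
            }
            rest = rest.trim();
            if (rest.isEmpty()) {
                continue; // no value present
            }
            // First token is the sample value; an optional timestamp may follow.
            String valueToken = rest.split("\\s+")[0];
            try {
                total += Double.parseDouble(valueToken);
                found = true;
            } catch (NumberFormatException e) {
                // Skip values this sketch does not understand (e.g. "+Inf").
            }
        }
        return found ? OptionalDouble.of(total) : OptionalDouble.empty();
    }

    public static void main(String[] args) {
        // Illustrative input in the exposition format; not real broker output.
        String sample = String.join("\n",
                "# TYPE demo_requests_total counter",
                "demo_requests_total{topic=\"a\"} 3",
                "demo_requests_total{topic=\"b\"} 4.5",
                "demo_other_metric 10");
        System.out.println(sum(sample, "demo_requests_total")); // OptionalDouble[7.5]
    }
}
```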

6. Incorrect Starting-Position Configuration: You may explicitly control where a subscription starts reading (e.g., for specific replay scenarios). If you set this incorrectly, or seek to a message ID that doesn’t exist in the topic, it can cause issues during subscription creation and assignment.

  • Diagnosis: Check whether your code sets subscriptionInitialPosition when creating the subscription, or calls seek(...) afterwards. Verify any message ID you seek to against the topic’s retained messages.
  • Fix: Either leave the initial position unset (a new subscription then starts from Latest) or set it explicitly: SubscriptionInitialPosition.Earliest to start from the beginning, SubscriptionInitialPosition.Latest to start from the end. To move an existing subscription to a specific point, seek to a valid MessageId, including MessageId.earliest or MessageId.latest.
    // Example for starting from the earliest
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
    
  • Why it works: Correctly initializing the subscription’s starting point prevents internal state mismatches.
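The part of point 6 that most often surprises people is that the initial position applies only when the subscription is first created. An existing subscription resumes from its stored cursor, whatever the consumer passes. The model below makes that rule explicit. It is a simplified sketch with two stated assumptions: positions are plain longs rather than MessageIds, and InitialPosition is a hypothetical stand-in for the client’s SubscriptionInitialPosition.

```java
// Simplified model, not Pulsar code: decides where a consumer starts reading.
// Positions are plain longs standing in for Pulsar message IDs, and the enum
// is a stand-in for the client's SubscriptionInitialPosition.
final class StartPositionModel {

    enum InitialPosition { EARLIEST, LATEST }

    private StartPositionModel() {}

    static long startPosition(boolean subscriptionExists, long storedCursor,
                              InitialPosition initial,
                              long oldestRetained, long nextToBePublished) {
        if (subscriptionExists) {
            // The initial position is only used when the subscription is
            // created; an existing subscription resumes from its cursor.
            return storedCursor;
        }
        return initial == InitialPosition.EARLIEST ? oldestRetained : nextToBePublished;
    }

    public static void main(String[] args) {
        // Topic retains positions 100..249; the next published message gets 250.
        System.out.println(startPosition(false, -1, InitialPosition.EARLIEST, 100, 250)); // 100
        System.out.println(startPosition(false, -1, InitialPosition.LATEST, 100, 250));   // 250
        System.out.println(startPosition(true, 180, InitialPosition.EARLIEST, 100, 250)); // 180
    }
}
```

This rule also explains why changing subscriptionInitialPosition in code has no visible effect on a subscription that already exists. To change where such a subscription reads from, you need to call seek or delete and recreate the subscription.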

After addressing these, the next error you’ll likely encounter if the underlying issue persists is related to connection refused or a different kind of "consumer is not found" error, indicating the broker is still not seeing the consumer as a valid participant.
