Pulsar’s batch producer doesn’t actually send messages one by one in the background; it groups them into larger chunks for efficiency.
Let’s see it in action. Imagine you have a simple Pulsar producer that’s just spitting out messages as fast as it can. Without batching, each message would be a separate network request, a separate entry in Pulsar’s ledger, and a separate acknowledgment to handle. That’s a lot of overhead.
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
public class SimpleProducer {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://public/default/my-topic")
.create();
for (int i = 0; i < 1000; i++) {
String message = "Message " + i;
producer.send(message);
// Without batching, each send() is a separate network operation
}
producer.close();
client.close();
}
}
Now, let’s enable batching. The ProducerBuilder has options to control this. The key parameters are batchingMaxPublishDelay and batchingMaxMessages.
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import java.util.concurrent.TimeUnit;
public class BatchedProducer {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://public/default/my-topic")
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) // Max time to wait for a batch
.batchingMaxMessages(100) // Max messages per batch
.create();
for (int i = 0; i < 1000; i++) {
String message = "Message " + i;
producer.send(message);
// Messages are now buffered and sent in batches
}
producer.close();
client.close();
}
}
Here’s how it works internally: when you call producer.send(message), the message isn’t immediately sent to the broker. Instead, it’s added to an in-memory buffer managed by the producer. This buffer is then sent to the broker as a single, larger "batch" when one of two conditions is met:
batchingMaxMessagesreached: The buffer fills up with the configured maximum number of messages.batchingMaxPublishDelayreached: The configured maximum time has elapsed since the first message was added to the buffer, even if it’s not full.
This batching mechanism drastically reduces the overhead per message. Instead of thousands of individual network round trips, acknowledgments, and ledger entries, you have a much smaller number of larger operations. This translates to higher throughput and lower latency for applications that send many small messages.
Pulsar also supports compression within these batches. When a batch is ready to be sent, you can configure the producer to compress the entire batch as a single unit. This further reduces the amount of data transferred over the network. You configure this using compressionType and compressionMaxBytesPerRow on the ProducerBuilder.
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import java.util.concurrent.TimeUnit;
public class BatchedCompressedProducer {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://public/default/my-topic")
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.batchingMaxMessages(100)
.compressionType(CompressionType.LZ4) // Choose your compression codec
.compressionMaxBytesPerRow(1024 * 1024) // Optional: Max uncompressed size of a single message to compress
.create();
for (int i = 0; i < 1000; i++) {
String message = "Message " + i;
producer.send(message);
}
producer.close();
client.close();
}
}
The compressionType can be LZ4, ZLIB, ZSTD, or SNAPPY. compressionMaxBytesPerRow is a bit of a misnomer; it’s more about limiting the uncompressed size of an individual message that will be included in a compressed batch. If a single message is larger than this threshold, it might not be compressed or could be handled specially depending on the codec. For most use cases with typical message sizes, it’s fine to leave it at a reasonable value like 1MB.
When compression is enabled, the producer compresses the entire batch of messages before sending it to the broker. The broker then decompresses it upon receipt. This is highly effective for reducing network bandwidth, especially when dealing with repetitive data.
The most surprising thing about Pulsar’s batch producer is that it doesn’t just send messages in batches; it can also compress those batches before sending, effectively reducing network traffic and improving overall throughput. The batchingMaxMessages and batchingMaxPublishDelay settings work together to determine when a batch is flushed, allowing you to trade off latency for efficiency. If you set batchingMaxPublishDelay to 0, you disable the time-based flushing, and batches will only be sent when batchingMaxMessages is reached, which can lead to higher latency but potentially more efficient batching.
The next hurdle is understanding how consumers handle these compressed batches and what happens during retries.