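Pulsar IO connectors don’t just move data; they manage how data enters and leaves Pulsar. That covers polling or subscribing, converting records, and tracking what has already been handled. This bookkeeping is easy to overlook, but it is what lets a streaming pipeline survive restarts without starting over.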
Let’s look at a Pulsar IO Source connector, specifically the File source, which reads data from local files and publishes it to Pulsar topics. Imagine you have a directory of log files that you want to stream into Pulsar for real-time processing.
# A basic File Source connector configuration
tenant: "my-tenant"
namespace: "my-namespace"
name: "file-source-connector"
topicName: "persistent://my-tenant/my-namespace/logs" # The Pulsar topic to publish to
archive: "streamnative-connectors/pulsar-io-file.nar" # Path to the connector archive
configs:
  inputDirectory: "/path/to/your/log/files" # Directory containing the log files
  fileFilter: ".*\\.log$" # Regex a file name must match to be picked up
  # Optional: how often (in milliseconds) the connector checks for new files
  # pollingInterval: 5000
  # Optional: keep files after processing instead of deleting them
  # keepFile: true
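When this connector runs, it periodically scans inputDirectory for files whose names match the configured regex. It reads each new file line by line and publishes every line as a separate record to the configured topic. By default, the File source deletes a file once it has been processed (the keepFile option retains it instead), which is how it avoids reading the same data twice. Its record of in-progress and recently processed files is held in memory. Do not assume it can resume at an exact position inside a partially read file after a restart: a file interrupted mid-read may be read again from the beginning, producing duplicates. Check the documentation for your connector version before relying on any particular restart behavior.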
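The polling behavior described above can be sketched in a few lines of standard-library Python. This is not the connector’s code. `scan_once` is a hypothetical helper, the `processed` set stands in for the connector’s in-memory bookkeeping, and collecting records in a list replaces publishing to Pulsar. The sketch assumes the whole file name must match the regex.

```python
import os
import re
import tempfile


def scan_once(input_dir, file_filter, processed):
    """One hypothetical polling pass: return (file name, line) records for
    every matching file not seen before, then mark those files as processed.

    `processed` is an in-memory set, so it does not survive a restart.
    """
    pattern = re.compile(file_filter)
    records = []
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        if name in processed or not os.path.isfile(path):
            continue
        if not pattern.fullmatch(name):  # the whole name must match the regex
            continue
        with open(path, encoding="utf-8") as f:
            # One record per line, mirroring line-by-line publishing.
            records.extend((name, line.rstrip("\n")) for line in f)
        processed.add(name)
    return records


with tempfile.TemporaryDirectory() as input_dir:
    for file_name, text in [("app.log", "started\nready\n"), ("notes.txt", "ignored\n")]:
        with open(os.path.join(input_dir, file_name), "w", encoding="utf-8") as f:
            f.write(text)

    seen = set()
    first_poll = scan_once(input_dir, r".*\.log$", seen)
    second_poll = scan_once(input_dir, r".*\.log$", seen)

print(first_poll)   # [('app.log', 'started'), ('app.log', 'ready')]
print(second_poll)  # []
```

The second poll returns nothing because `app.log` is already marked as processed and `notes.txt` never matches the filter. If the `seen` set were lost, as in-memory state is on a restart, the next pass would read `app.log` again.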
Now, let’s consider a Pulsar IO Sink connector, like the Redis sink, which consumes data from a Pulsar topic and writes it to a Redis instance. This is useful for scenarios where you want to store streaming data in a fast key-value store for quick lookups or caching.
# A basic Redis Sink connector configuration
tenant: "my-tenant"
namespace: "my-namespace"
name: "redis-sink-connector"
inputs:
  - "persistent://my-tenant/my-namespace/processed-data" # The Pulsar topic to consume from
archive: "streamnative-connectors/pulsar-io-redis.nar" # Path to the connector archive
configs:
  redisHosts: "localhost:6379" # Comma-separated host:port list of Redis servers
  redisDatabase: 0 # Redis database index to write to
  clientMode: "Standalone" # Use "Cluster" for a Redis Cluster deployment
  # Optional: batching. A batch is flushed when it reaches batchSize records
  # or after batchTimeMs milliseconds, whichever comes first.
  # batchSize: 100
  # batchTimeMs: 1000
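This Redis sink connector subscribes to the processed-data topic. It does not parse the payload. The key the producer set on each Pulsar message becomes the Redis key, and the raw message bytes become the Redis value, so a JSON payload is stored as-is. Records are buffered and written to Redis in batches. Because the Redis key comes from the message key, producers writing to this topic need to set one. If you need a key derived from a field inside a JSON payload, compute it upstream (for example, in a Pulsar Function) before the data reaches the sink. The result is a Redis cache populated in near real time with results from your Pulsar processing.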
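A sink that takes the Redis key from each record’s key and writes records in size- or time-bounded batches can be modeled without Redis at all. In this sketch, a plain dict stands in for Redis and `BatchingKeyValueSink` is a hypothetical class, not the connector’s implementation. The clock is injectable so the timing is deterministic. A real sink would also flush from a background timer; this one only checks the batch age when a new record arrives.

```python
import time


class BatchingKeyValueSink:
    """Hypothetical key-value sink: buffers (key, value) records and writes
    them together once batch_size records are buffered or batch_time_ms has
    passed since the first buffered record. `store` is a dict standing in for Redis."""

    def __init__(self, store, batch_size=100, batch_time_ms=1000, clock=time.monotonic):
        self.store = store
        self.batch_size = batch_size
        self.batch_time_ms = batch_time_ms
        self.clock = clock  # returns seconds
        self.buffer = []
        self.first_at = None
        self.flushes = 0

    def write(self, key, value):
        # Assumed behavior: the Redis key is the message key, so a record
        # without one cannot be written.
        if key is None:
            raise ValueError("record has no key")
        if not self.buffer:
            self.first_at = self.clock()
        self.buffer.append((key, value))
        elapsed_ms = (self.clock() - self.first_at) * 1000
        if len(self.buffer) >= self.batch_size or elapsed_ms >= self.batch_time_ms:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        # One multi-key write; if a key repeats within a batch, the last value wins.
        self.store.update(dict(self.buffer))
        self.buffer.clear()
        self.first_at = None
        self.flushes += 1


now = [0.0]  # fake clock, in seconds
redis = {}
sink = BatchingKeyValueSink(redis, batch_size=3, batch_time_ms=1000, clock=lambda: now[0])

sink.write("user:1", b'{"score": 10}')
sink.write("user:2", b'{"score": 7}')
print(redis)  # {} (both records still buffered)
sink.write("user:1", b'{"score": 12}')  # third record reaches batch_size
print(redis)  # {'user:1': b'{"score": 12}', 'user:2': b'{"score": 7}'}

sink.write("user:3", b"{}")
now[0] = 1.5  # 1500 ms after user:3 was buffered
sink.write("user:4", b"{}")  # batch age now exceeds batch_time_ms
print(sink.flushes)  # 2
```

Batching trades a little latency for far fewer round trips to Redis. The size and time limits bound how stale the cache can get.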
The core problem Pulsar IO connectors solve is the boilerplate of integrating Pulsar with external systems. Instead of writing custom producer/consumer code for every database, message queue, or cloud storage service, you leverage pre-built connectors. This dramatically reduces development time and effort.
Internally, connectors run on the Pulsar Functions runtime. A Functions worker executes each connector instance as a thread, a separate process, or a Kubernetes pod, depending on how the cluster is configured. The framework handles deployment, scaling (through the connector’s parallelism setting), and restarting failed instances. When you deploy a connector, you’re essentially deploying a specialized application that interacts with Pulsar’s core messaging capabilities. Source connectors act as producers, writing data into Pulsar topics, while sink connectors act as consumers, reading from Pulsar topics through a subscription. For sinks, the broker stores the subscription’s cursor (Pulsar’s counterpart to a consumer offset), so a restarted instance picks up from the first unacknowledged message. For sources, tracking the read position in the external system is the connector’s own responsibility.
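In Pulsar, a sink’s resume point is its subscription cursor, which the broker stores. The toy model below illustrates the idea; `Subscription` is a hypothetical class, not Pulsar’s client API. It keeps a mark-delete position, meaning every message before it has been acknowledged, plus the set of individually acknowledged messages after it. A reconnecting consumer is sent only what is still unacknowledged.

```python
class Subscription:
    """Toy durable subscription on one topic (a list of messages).

    mark_delete: every position below it has been acknowledged.
    acked: acknowledged positions at or above mark_delete.
    In Pulsar the broker keeps this state, so it outlives a crashed consumer.
    """

    def __init__(self, topic):
        self.topic = topic
        self.mark_delete = 0
        self.acked = set()

    def ack(self, position):
        self.acked.add(position)
        # Slide the cursor over any contiguous run of acknowledged positions.
        while self.mark_delete in self.acked:
            self.acked.remove(self.mark_delete)
            self.mark_delete += 1

    def unacked(self):
        """Positions a (re)connecting consumer will be sent."""
        return [p for p in range(self.mark_delete, len(self.topic)) if p not in self.acked]


topic = ["m0", "m1", "m2", "m3"]
sub = Subscription(topic)
written = []

for pos in sub.unacked():
    if pos == 2:
        break  # simulate the sink instance crashing before it handles m2
    written.append(topic[pos])
    sub.ack(pos)

# A restarted instance resumes from the cursor rather than from the beginning.
print(sub.mark_delete, sub.unacked())  # 2 [2, 3]
```

Tracking individual acknowledgments above the mark-delete position means out-of-order acks are not lost. If m3 were acknowledged before m2, only m2 would be redelivered.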
The magic of connectors lies in their ability to abstract away the complexities of both Pulsar and the external system. A source connector needs to know how to poll its source system for new data, how to turn that data into Pulsar messages (including keying and partitioning), and how to produce those messages to Pulsar efficiently. A sink connector must consume messages from Pulsar, transform them for the target system, and execute the appropriate write operations, handling errors and retries along the way. The IO framework provides the underlying mechanisms for this, including managing connections, handling acknowledgments, and enforcing the processing guarantee you select with the processingGuarantees setting: at-most-once, at-least-once, or effectively-once.
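The difference between at-most-once and at-least-once comes down to whether a message is acknowledged before or after it is written to the target system. The sketch below is a hypothetical single-threaded loop, not Pulsar’s runtime. The guarantee names mirror the processingGuarantees values, `crash_at` simulates a failure at one position, and the returned list stands for what the subscription would redeliver. Effectively-once additionally requires deduplicating redelivered messages and is not modeled here.

```python
def run_sink(messages, write, guarantee, crash_at=None):
    """Toy sink loop. Returns positions still unacknowledged, i.e. what a
    durable subscription would redeliver after a restart."""
    unacked = list(range(len(messages)))
    for pos, msg in enumerate(messages):
        if guarantee == "ATMOST_ONCE":
            unacked.remove(pos)  # acknowledge first...
            if pos == crash_at:
                return unacked   # ...crash before writing: the message is lost
            write(msg)
        elif guarantee == "ATLEAST_ONCE":
            write(msg)           # write first...
            if pos == crash_at:
                return unacked   # ...crash before acking: the message is redelivered
            unacked.remove(pos)
        else:
            raise ValueError(f"unsupported guarantee: {guarantee}")
    return unacked


messages = ["a", "b", "c"]
results = {}
for guarantee in ("ATMOST_ONCE", "ATLEAST_ONCE"):
    out = []
    pending = run_sink(messages, out.append, guarantee, crash_at=1)
    # Restart: process whatever the subscription still holds as unacknowledged.
    run_sink([messages[p] for p in pending], out.append, guarantee)
    results[guarantee] = out

print(results)  # {'ATMOST_ONCE': ['a', 'c'], 'ATLEAST_ONCE': ['a', 'b', 'b', 'c']}
```

The same crash loses "b" under at-most-once and writes it twice under at-least-once. This is why at-least-once sinks work best against idempotent targets, such as a Redis SET on a stable key.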
What most people miss is that the archive value is not just a file path; it’s an entry point into Pulsar’s plugin loading mechanism. Connectors are typically packaged as NAR (NiFi Archive) files, which bundle the connector’s implementation with its dependencies. Built-in connectors live in the connectors directory of your Pulsar installation and can be referenced as builtin://<connector-name>, while a custom-built connector is referenced by a path or URL. The streamnative-connectors/pulsar-io-file.nar example is a relative path. It assumes you’ve downloaded the NAR into that subdirectory, and it resolves against the working directory of the process that reads it, such as pulsar-admin when you submit the connector. In a distributed setup, pay attention to where the archive is resolved. A local path passed to pulsar-admin is uploaded to the cluster by the CLI. A builtin:// or file:// reference, by contrast, is resolved on the worker itself, so the file must be present on every Functions worker node that may run the connector.
The next step after configuring basic connectors is to explore advanced features like schema handling, dead-letter queues for failed messages, and custom transformations within connectors.