Redpanda Connect Pipelines, powered by Benthos, lets you build robust ETL workflows directly within Redpanda. The surprising truth is that Benthos isn’t just a data transformation tool; it’s a declarative stream processing engine that treats your data pipelines as first-class citizens, allowing for complex logic and resilience without external orchestration.
Let’s see it in action. Imagine you have a Kafka topic named raw_sales containing JSON objects like this:
{"product_id": "A123", "quantity": 2, "timestamp": "2023-10-27T10:00:00Z"}
{"product_id": "B456", "quantity": 1, "timestamp": "2023-10-27T10:01:30Z"}
You want to transform this into a new topic, processed_sales, with enriched data, perhaps adding a sale_value field calculated as quantity * price. You’d define a Benthos configuration (a "pipeline") like this:
# benthos.yaml
input:
  kafka:
    addresses:
      - kafka:9092
    topics:
      - raw_sales
    consumer_group: benthos_etl_group
    # Offsets for this group are committed automatically once the output
    # acknowledges each message; no extra flag is required.
pipeline:
  processors:
    # A single Bloblang mapping parses, enriches, and stamps each message.
    - mapping: |
        # Messages are parsed as JSON on demand via "this"
        root = this
        # Mocking a price lookup for demonstration
        let price = if this.product_id == "A123" { 10.50 } else { 25.00 }
        root.sale_value = this.quantity * $price
        root.processed_timestamp = now().ts_format("2006-01-02T15:04:05Z07:00", "UTC")
output:
  kafka:
    addresses:
      - kafka:9092
    topic: processed_sales
When you run this Benthos pipeline, it continuously consumes raw_sales, applies the transformations, and writes the results to processed_sales. That topic would then contain messages like these (processed_timestamp reflects when each message was handled):
{"product_id": "A123", "quantity": 2, "timestamp": "2023-10-27T10:00:00Z", "sale_value": 21.00, "processed_timestamp": "2023-10-27T10:15:00Z"}
{"product_id": "B456", "quantity": 1, "timestamp": "2023-10-27T10:01:30Z", "sale_value": 25.00, "processed_timestamp": "2023-10-27T10:15:00Z"}
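To check this behavior without a running cluster, Benthos (and Redpanda Connect) can run unit tests declared in a top-level tests section of the config, executed with "rpk connect test ./benthos.yaml" or "benthos test ./benthos.yaml". The sketch below feeds one sample message through the pipeline's processors and asserts on the computed fields with a Bloblang condition, since processed_timestamp changes on every run. It assumes the processors set sale_value as described; the test name is illustrative.

```yaml
# Appended to benthos.yaml as a top-level section
tests:
  - name: enriches A123 with sale_value   # illustrative name
    target_processors: /pipeline/processors
    input_batch:
      - content: '{"product_id": "A123", "quantity": 2, "timestamp": "2023-10-27T10:00:00Z"}'
    output_batches:
      # One expected batch holding one message. Only stable fields are checked,
      # because processed_timestamp differs on every run.
      - - bloblang: 'this.product_id == "A123" && this.sale_value == 21'
```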
The core problem Benthos solves is the complexity of building reliable, scalable data ingestion and transformation pipelines. Traditionally, this involves separate services for ingestion, transformation (e.g., Spark, Flink), and output, each needing its own deployment, monitoring, and orchestration. Benthos consolidates this into a single, declarative configuration file that runs as a service.
Internally, Benthos operates on a message-centric model: each message is read by an input, passed through a chain of processors, and delivered by an output. Inputs read data, processors transform it (parsing, enriching, filtering, etc.), and outputs send it to its destination. The pipeline section of the configuration is a list of processors executed sequentially for each message. A message is acknowledged back to its input (for Kafka, its offset becomes eligible for commit) only after the output confirms delivery, which gives at-least-once delivery; batching and retry behavior are configured on the inputs and outputs themselves.
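As a sketch of output-side batching, the kafka output accepts a batching policy that flushes a batch as soon as either threshold is reached, and max_in_flight caps how many batches can await acknowledgement at once. The values below are illustrative assumptions, not tuned recommendations.

```yaml
output:
  kafka:
    addresses:
      - kafka:9092
    topic: processed_sales
    # Up to 64 batches may be awaiting acknowledgement before writes wait
    max_in_flight: 64
    batching:
      count: 100   # flush after 100 messages...
      period: 1s   # ...or after 1 second, whichever comes first
```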
The levers you control live primarily in the Benthos configuration. The input section defines where data comes from (Kafka, S3, HTTP, etc.) and how it's consumed. The pipeline section, drawing on a large catalog of processors (such as mapping, mutation, branch, switch, cache, rate_limit, http, and aws_lambda), defines the transformation logic. The output section dictates where the processed data goes. You can also declare shared resources such as caches and rate limits, and configure metrics and tracing in their own top-level sections.
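For example, filtering and throttling can be combined with a mapping that drops messages via deleted() and a rate_limit processor that points at a shared resource. This sketch assumes messages with a non-positive quantity should be discarded and that 500 messages per second is an acceptable cap; the resource label sales_limit is hypothetical.

```yaml
pipeline:
  processors:
    # Drop messages with a non-positive quantity; pass the rest through unchanged
    - mapping: |
        root = if this.quantity <= 0 { deleted() } else { this }
    # Wait until the shared rate limit allows another message through
    - rate_limit:
        resource: sales_limit

rate_limit_resources:
  - label: sales_limit   # hypothetical label
    local:
      count: 500         # at most 500 messages...
      interval: 1s       # ...per second
```

Because the rate limit is a resource rather than inline config, several pipelines or processors can share the same budget.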
A key aspect that many overlook is Benthos's built-in support for metrics and distributed tracing. By enabling an exporter such as prometheus (or json_api) in the top-level metrics section, and an OpenTelemetry exporter (open_telemetry_collector) in the tracer section, you can gain deep visibility into your pipeline's performance and identify bottlenecks. This is crucial for understanding message latency through each processor and for debugging complex flows.
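A minimal observability block might look like the sketch below. It assumes metrics are scraped in Prometheus format from the built-in HTTP server (path /metrics, default port 4195) and that an OpenTelemetry collector is reachable at the hypothetical host otel-collector on the standard gRPC port 4317. The field names under open_telemetry_collector have shifted between releases, so treat them as an assumption to verify against the tracer docs for your version.

```yaml
# Built-in HTTP server; the metrics endpoint is served from here
http:
  address: 0.0.0.0:4195

# Expose component metrics (including per-processor latency) in Prometheus format
metrics:
  prometheus: {}

# Export spans to an OpenTelemetry collector (hostname is hypothetical)
tracer:
  open_telemetry_collector:
    grpc:
      - address: otel-collector:4317
```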
The next step is to explore how to implement error handling and dead-letter queues for messages that fail processing.