Redpanda's data transforms SDK for Go lets you write stream processing logic that runs directly inside the Redpanda brokers, so simple per-record transformations no longer need a separate processing cluster.
Let’s see it in action. Imagine you have a Kafka topic named sensor-readings containing JSON messages like this:
```json
{
  "sensor_id": "A123",
  "timestamp": 1678886400,
  "value": 25.5,
  "unit": "celsius"
}
```
You want to transform these readings, perhaps converting Celsius to Fahrenheit and filtering out readings that are too cold. Here’s a Go transform that does just that:
```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// SensorReading mirrors the JSON written to the input topic.
type SensorReading struct {
	SensorID  string  `json:"sensor_id"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	Unit      string  `json:"unit"`
}

// TransformedReading is the JSON written to the output topic.
type TransformedReading struct {
	SensorID  string  `json:"sensor_id"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	Unit      string  `json:"unit"`
	Processed bool    `json:"processed"`
}

func main() {
	// Register the callback. The input and output topics are not set in code;
	// they are chosen when the transform is deployed.
	transform.OnRecordWritten(convertReading)
}

// convertReading is called once for every record written to the input topic.
func convertReading(e transform.WriteEvent, w transform.RecordWriter) error {
	record := e.Record()

	var reading SensorReading
	if err := json.Unmarshal(record.Value, &reading); err != nil {
		return fmt.Errorf("failed to unmarshal record value: %w", err)
	}

	// Readings in other units pass through unchanged, and the cutoff below
	// is applied to their raw value.
	outputValue := reading.Value
	outputUnit := reading.Unit
	if reading.Unit == "celsius" {
		outputValue = reading.Value*9/5 + 32
		outputUnit = "fahrenheit"
	}

	// Drop readings below 0 degrees Fahrenheit by returning without writing.
	if outputValue < 0 {
		return nil
	}

	outputBytes, err := json.Marshal(TransformedReading{
		SensorID:  reading.SensorID,
		Timestamp: reading.Timestamp,
		Value:     outputValue,
		Unit:      outputUnit,
		Processed: true,
	})
	if err != nil {
		return fmt.Errorf("failed to marshal transformed record: %w", err)
	}

	// Write a new record to the output topic, preserving the original key and headers.
	return w.Write(transform.Record{
		Key:     record.Key,
		Value:   outputBytes,
		Headers: record.Headers,
	})
}
```

When this program is compiled to WebAssembly and deployed as a Redpanda transform with sensor-readings as its input topic, Redpanda invokes convertReading for every record written to that topic. The callback unmarshals the JSON, converts Celsius to Fahrenheit if necessary, drops readings below 0 degrees Fahrenheit, marshals the result back into JSON, and writes it to the transform's output topic. Neither topic appears in the code: you build the module with rpk transform build and name both topics at deploy time, for example with rpk transform deploy --input-topic=sensor-readings --output-topic=transformed-sensor-readings. Any regular Kafka consumer can then read the results from transformed-sensor-readings.
The core problem Redpanda data transforms solve is the operational overhead and latency of a separate stream processing cluster. Instead of running Kafka Streams, Flink, or Spark Streaming applications on their own infrastructure, you package your per-record logic as a WebAssembly module that Redpanda itself loads and runs. The transformation happens inside the broker process, next to the data, which removes the round trip to an external processor and leaves one less system to deploy and operate.
Internally, Redpanda executes transforms in an embedded WebAssembly (Wasm) runtime. Redpanda does not compile your Go source; you compile it to a Wasm module with TinyGo (rpk transform build takes care of this) and deploy the resulting .wasm file. The broker loads the module into a sandbox inside its own process, where the code cannot reach the network or the filesystem. Your program's main function registers a callback with transform.OnRecordWritten. The callback receives a transform.WriteEvent, whose Record method returns the incoming transform.Record (with Key, Value, Headers, and Timestamp fields that mirror a Kafka record), and a transform.RecordWriter for emitting output.
Configuration is split between code and deployment. The code only registers the callback; the input topic and output topic are set when you deploy, through rpk transform deploy flags or the project's transform.yaml file. Each transform reads from exactly one input topic. The callback is the heart of your logic, and Redpanda calls it once for every record written to the input topic. Inside it you typically unmarshal record.Value, perform your operations, and pass a new transform.Record to RecordWriter.Write. Writing nothing drops the record, calling Write several times turns one input record into several output records, and returning a non-nil error reports a failure to Redpanda instead of silently discarding the record.
The transform.Record struct is flexible. Besides replacing the Value, you can keep, rewrite, or clear the Key and add, change, or remove entries in Headers (a slice of transform.RecordHeader key/value pairs), which lets you rekey records or attach metadata for downstream consumers. The SDK handles the rest of the lifecycle: reading from the input topic, invoking your callback, and writing every record you pass to Write to the output topic.
A common misconception is that Redpanda publishes the results to a hidden, internally named topic that you then have to locate. It does not: records passed to RecordWriter.Write land in the output topic you named at deploy time (transformed-sensor-readings in this example). That is an ordinary topic, so a standard Kafka consumer, or another transform that uses it as its input topic, can read from it directly. Recent Redpanda versions also allow one transform to write to more than one output topic; consult the data transforms documentation for your version for the exact limits and options.
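Whichever topic the results end up in, the conversion itself can be checked before anything is deployed. The sketch below is a minimal, standard-library-only version of the transform's per-record logic, assuming you factor it out of the SDK callback into a plain function. The name processReading and its (output, keep, error) return shape are hypothetical choices for illustration, not part of the SDK. Because it has no SDK or Wasm dependency, it runs under the regular Go toolchain with go run or go test, and the SDK callback can simply delegate to it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SensorReading mirrors the JSON on the input topic.
type SensorReading struct {
	SensorID  string  `json:"sensor_id"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	Unit      string  `json:"unit"`
}

// TransformedReading is the JSON written to the output topic.
type TransformedReading struct {
	SensorID  string  `json:"sensor_id"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	Unit      string  `json:"unit"`
	Processed bool    `json:"processed"`
}

// processReading (hypothetical helper) holds the per-record logic with no SDK
// dependency. keep is false when the record should be dropped.
func processReading(value []byte) (out []byte, keep bool, err error) {
	var r SensorReading
	if err := json.Unmarshal(value, &r); err != nil {
		return nil, false, fmt.Errorf("failed to unmarshal record value: %w", err)
	}

	// Convert Celsius to Fahrenheit; other units pass through unchanged.
	v, unit := r.Value, r.Unit
	if unit == "celsius" {
		v = v*9/5 + 32
		unit = "fahrenheit"
	}

	// Drop readings below 0 degrees Fahrenheit.
	if v < 0 {
		return nil, false, nil
	}

	out, err = json.Marshal(TransformedReading{
		SensorID:  r.SensorID,
		Timestamp: r.Timestamp,
		Value:     v,
		Unit:      unit,
		Processed: true,
	})
	if err != nil {
		return nil, false, fmt.Errorf("failed to marshal transformed record: %w", err)
	}
	return out, true, nil
}

func main() {
	// The sample message from the start of the article.
	sample := []byte(`{"sensor_id":"A123","timestamp":1678886400,"value":25.5,"unit":"celsius"}`)
	out, keep, err := processReading(sample)
	fmt.Println(string(out), keep, err)

	// A reading cold enough to be filtered out.
	cold := []byte(`{"sensor_id":"B456","timestamp":1678886400,"value":-20,"unit":"celsius"}`)
	_, keep, err = processReading(cold)
	fmt.Println(keep, err)
}
```

For the sample message, the output carries a value of 77.9 with unit fahrenheit (25.5 × 9/5 + 32 = 77.9), while the -20 °C reading converts to -4 °F and is dropped. Keeping the SDK callback as a thin wrapper around a function like this also keeps the Wasm-specific code to a minimum.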
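After building a basic transform, a natural next step is routing records to different output topics. Keep in mind that data transforms are stateless and process one record at a time from a single input topic, so joins and aggregations across topics still call for a dedicated stream processor such as Flink or Kafka Streams.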