Redpanda’s WASM transforms let you run arbitrary code directly within the broker, transforming data as it flows through Kafka topics.

Let’s see it in action. Imagine you have a stream of raw JSON events from your application, and you want to enrich them with a timestamp and filter out any events that don’t have a user_id.

First, you need a WASM module. Here’s a simple Rust function that does the job, written to be compiled to WASM:

use chrono::{SecondsFormat, Utc};
use serde_json::Value;

#[no_mangle]
pub extern "C" fn transform_record(
    payload_ptr: *mut u8,
    payload_len: usize,
    _key_ptr: *mut u8, // the key is passed in but not needed by this transform
    _key_len: usize,
) -> i64 {
    // Safety: we assume the pointer and length describe a valid buffer, in this
    // module's linear memory, holding the record's value. In a real-world
    // scenario, you'd have more robust error handling and memory management.
    let payload_slice = unsafe { std::slice::from_raw_parts_mut(payload_ptr, payload_len) };

    // Parse JSON (serde_json validates UTF-8 itself)
    let mut record: Value = match serde_json::from_slice(&payload_slice[..]) {
        Ok(v) => v,
        Err(_) => return -1, // Indicate error
    };

    // Only a JSON object can carry a user_id; anything else is filtered out
    let fields = match record.as_object_mut() {
        Some(map) => map,
        None => return -2, // Special code to indicate record should be dropped
    };

    // Filter out records without a user_id before doing any other work
    if !fields.contains_key("user_id") {
        return -2;
    }

    // Add timestamp if it doesn't exist
    if !fields.contains_key("timestamp") {
        let now = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
        fields.insert("timestamp".to_string(), Value::String(now));
    }

    // Serialize back to compact JSON
    let new_payload = match serde_json::to_vec(&record) {
        Ok(bytes) => bytes,
        Err(_) => return -1, // Indicate error
    };

    // Copy the new payload back into the provided buffer. It cannot grow past
    // the original length, so an enriched record only fits if the input had
    // slack (e.g. whitespace removed by compact serialization).
    if new_payload.len() > payload_len {
        return -1; // Buffer too small
    }
    payload_slice[..new_payload.len()].copy_from_slice(&new_payload);

    // Return the new length of the payload
    new_payload.len() as i64
}

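You’d compile this using cargo build --target wasm32-wasip1 --release (the target is named wasm32-wasi on older Rust toolchains) and then upload the resulting .wasm file to Redpanda. The crate needs crate-type = ["cdylib"] in Cargo.toml, plus serde_json and chrono as dependencies, so that the build produces a module exporting transform_record. Use a WASI target rather than wasm32-unknown-unknown: chrono::Utc::now() needs a system clock, which wasm32-unknown-unknown does not provide.

Next, you configure a transform in Redpanda. This is done by creating a RedpandaTransform custom resource in Kubernetes, or by using the rpk transform command-line tool. The invocation below follows this article’s calling convention; subcommands and flags vary between rpk versions (recent releases deploy transforms with rpk transform deploy), so confirm them with rpk transform --help. Using rpk:

rpk transform create my-wasm-transform \
  --wasm-file ./target/wasm32-wasip1/release/your_wasm_crate.wasm \
  --input-topic raw-events \
  --output-topic processed-events \
  --entrypoint transform_record \
  --format json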

Here:

  • my-wasm-transform is the name of your transform.
  • --wasm-file points to your compiled WASM module.
  • --input-topic is the topic Redpanda will read from.
  • --output-topic is where the transformed records will go.
  • --entrypoint specifies the function within your WASM module to execute.
  • --format json tells Redpanda to expect and produce JSON.

Now, when data is published to raw-events, Redpanda will:

  1. Read a record.
  2. Load your WASM module (if not already loaded).
  3. Call the transform_record function, passing mutable pointers to the record’s value and key, along with their lengths.
  4. The WASM code executes: it parses the JSON, drops records without a user_id, adds a timestamp where one is missing, and serializes the result back into the value buffer.
  5. If transform_record returns a non-negative i64, that’s the new length of the record’s value. Redpanda then writes the modified record to processed-events.
  6. If transform_record returns -2, Redpanda drops the record entirely.
  7. If transform_record returns -1 (or any negative value other than -2), an error is logged; whether the record is then dropped, retried, or routed elsewhere depends on the runtime and its configuration.
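The host-side handling in steps 5 to 7 can be simulated natively in Rust, without a WASM runtime. This is a minimal sketch under stated assumptions: a Vec<u8> stands in for the module’s linear memory, toy_transform is a hypothetical stand-in for transform_record that uses the same calling convention, and Outcome and run_transform are illustrative names, not Redpanda APIs. The sketch also treats a returned length larger than the buffer as an error, which is an assumption about sensible host behavior.

```rust
/// What the simulated host does with a record, based on the return code.
/// Hypothetical type for illustration; not part of any Redpanda API.
#[derive(Debug, PartialEq)]
enum Outcome {
    Write(Vec<u8>), // non-negative return: the new payload
    Drop,           // -2: drop the record
    Error(i64),     // any other return code: log an error
}

/// Hypothetical stand-in for transform_record, same calling convention.
/// Drops empty payloads; otherwise upper-cases ASCII letters in place.
extern "C" fn toy_transform(payload_ptr: *mut u8, payload_len: usize, _key_ptr: *mut u8, _key_len: usize) -> i64 {
    // Safety: the caller passes a valid, exclusively borrowed buffer of payload_len bytes.
    let payload = unsafe { std::slice::from_raw_parts_mut(payload_ptr, payload_len) };
    if payload.is_empty() {
        return -2;
    }
    payload.make_ascii_uppercase();
    payload_len as i64
}

/// Simulated host: copy the record into scratch buffers ("linear memory"),
/// call the transform, then interpret the return code as in steps 5 to 7.
fn run_transform(
    transform: extern "C" fn(*mut u8, usize, *mut u8, usize) -> i64,
    key: &[u8],
    value: &[u8],
) -> Outcome {
    let mut value_buf = value.to_vec();
    let mut key_buf = key.to_vec();
    let rc = transform(value_buf.as_mut_ptr(), value_buf.len(), key_buf.as_mut_ptr(), key_buf.len());
    match rc {
        n if n >= 0 && (n as usize) <= value_buf.len() => {
            value_buf.truncate(n as usize); // bytes past the new length are ignored
            Outcome::Write(value_buf)
        }
        -2 => Outcome::Drop,
        other => Outcome::Error(other), // includes a length larger than the buffer
    }
}

fn main() {
    println!("{:?}", run_transform(toy_transform, b"k", b"hello"));
    println!("{:?}", run_transform(toy_transform, b"k", b""));
}
```

Matching on the raw i64 keeps the sketch close to the convention described above; a real host would also need to handle traps and timeouts from the module, which this simulation ignores.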

The core problem this solves is moving data processing logic as close to the data as possible: into the broker that already stores it. For many common per-record ETL tasks, such as filtering, enrichment, and format conversion, that removes the need for a separate stream processing cluster (like Flink or Spark Streaming), along with the extra network hop, operational overhead, and infrastructure cost that cluster brings. Redpanda’s WASM runtime is based on Wasmtime, which runs each module in a secure, sandboxed environment.

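What most people don’t realize is that payload_ptr and key_ptr do not point into Redpanda’s own memory. A WebAssembly module can only address its own linear memory, so the host places the record’s key and value in that memory and passes their offsets. Your code then modifies the value buffer in place, and the return value is crucial: it tells the host how many bytes of that buffer make up the new payload. If you shrink the payload, the remaining bytes in the buffer are ignored. If the new payload would be longer than the original, it cannot be written, which is why transform_record returns -1 in that case. That is a real limitation for enrichment: adding a timestamp makes the JSON longer, so it only fits when the input had slack, such as whitespace that compact serialization removes. Modifying the buffer in place avoids an extra allocation inside the module, but because of the sandbox boundary the host still has to copy record data in and out.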
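The length rule comes down to simple byte arithmetic. The sketch below uses a hypothetical helper, write_in_place, that mirrors the end of transform_record: it copies a new payload over the start of the old buffer and returns the new length, or -1 if the result would not fit. It illustrates why compacting JSON succeeds while adding a field to already-compact JSON does not.

```rust
/// Hypothetical helper mirroring the tail of transform_record: copy
/// `new_payload` over the start of `buf` and return its length, or -1
/// if it would not fit (the buffer cannot grow).
fn write_in_place(buf: &mut [u8], new_payload: &[u8]) -> i64 {
    if new_payload.len() > buf.len() {
        return -1; // cannot grow past the original buffer
    }
    buf[..new_payload.len()].copy_from_slice(new_payload);
    new_payload.len() as i64
}

fn main() {
    // Shrinking: compacting spaced-out JSON (17 bytes -> 14 bytes) fits.
    let mut spaced = br#"{ "user_id": 42 }"#.to_vec();
    let n = write_in_place(&mut spaced, br#"{"user_id":42}"#);
    println!("shrunk to {n} bytes: {}", String::from_utf8_lossy(&spaced[..n as usize]));

    // Growing: adding a timestamp to already-compact JSON needs more room than the buffer has.
    let mut compact = br#"{"user_id":42}"#.to_vec();
    let enriched = br#"{"user_id":42,"timestamp":"2024-01-01T00:00:00.000Z"}"#;
    println!("grow attempt returned {}", write_in_place(&mut compact, enriched));
}
```

After the shrink, the last three bytes of the buffer still hold the old tail (2 }); the host never reads them because it only uses the first n bytes.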

The next challenge is managing WASM module lifecycle and versioning across a distributed Redpanda cluster.
