The most surprising thing about Pulsar Functions is that they are designed to be stateless, yet they can process events that are inherently stateful.

Let’s see this in action. Imagine we have a Pulsar topic input-topic with incoming JSON messages like this:

{"user_id": "user123", "event_type": "login", "timestamp": 1678886400}
{"user_id": "user456", "event_type": "click", "timestamp": 1678886410}
{"user_id": "user123", "event_type": "logout", "timestamp": 1678886420}
{"user_id": "user123", "event_type": "login", "timestamp": 1678886500}

We want to calculate the session duration for each user. A "session" is defined as the time between a login event and the next logout event for the same user. This clearly requires keeping track of the last login timestamp for each user_id.
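Without Pulsar, the session rule is a single pass that remembers each user’s open login. The sketch below applies it to the four sample messages in plain Python. session_durations is a hypothetical helper for illustration, not part of any Pulsar API.

```python
import json

SAMPLE = [
    '{"user_id": "user123", "event_type": "login", "timestamp": 1678886400}',
    '{"user_id": "user456", "event_type": "click", "timestamp": 1678886410}',
    '{"user_id": "user123", "event_type": "logout", "timestamp": 1678886420}',
    '{"user_id": "user123", "event_type": "login", "timestamp": 1678886500}',
]


def session_durations(lines):
    """Pair each logout with the most recent login for the same user.

    Hypothetical helper for illustration; returns (user_id, seconds) tuples.
    """
    open_logins = {}  # user_id -> timestamp of the last unmatched login
    results = []
    for line in lines:
        event = json.loads(line)
        user, kind, ts = event["user_id"], event["event_type"], event["timestamp"]
        if kind == "login":
            open_logins[user] = ts
        elif kind == "logout" and user in open_logins:
            results.append((user, ts - open_logins.pop(user)))
        # Other event types (e.g. "click") do not affect sessions.
    return results


print(session_durations(SAMPLE))  # [('user123', 20)]
```

The second login for user123 stays open because no logout follows it yet. That open login is exactly the per-user state the function has to carry between messages.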

Here’s a Python Pulsar Function that does exactly this, keeping the login timestamps in an in-memory dictionary:

import json

from pulsar import Function


class SessionDurationFunction(Function):
    def __init__(self):
        # This dictionary is our "state": user_id -> last login timestamp.
        # It lives only in this instance's memory.
        self.user_sessions = {}

    def process(self, input, context):
        logger = context.get_logger()
        # With the default SerDe the input may already be a str; decode only if it is bytes.
        data = input.decode('utf-8') if isinstance(input, bytes) else input
        try:
            record = json.loads(data)
            user_id = record.get('user_id')
            event_type = record.get('event_type')
            timestamp = record.get('timestamp')

            if not user_id or not event_type or timestamp is None:
                logger.warning(f"Skipping malformed record: {data}")
                return

            if event_type == 'login':
                self.user_sessions[user_id] = timestamp
                logger.info(f"User {user_id} logged in at {timestamp}")
            elif event_type == 'logout':
                if user_id in self.user_sessions:
                    login_timestamp = self.user_sessions.pop(user_id)  # Clear state after logout
                    session_duration = timestamp - login_timestamp
                    logger.info(f"User {user_id} session duration: {session_duration} seconds")
                    # Optionally emit the result: a value returned from process() is published
                    # to the function's output topic when one is configured with --output.
                    # return f"user:{user_id},duration:{session_duration}"
                else:
                    logger.warning(f"User {user_id} logged out without a prior login.")
            else:
                # Other event types (e.g. "click") do not affect sessions.
                pass

        except json.JSONDecodeError:
            logger.error(f"Failed to decode JSON: {data}")
        except Exception as e:
            logger.error(f"An error occurred: {e}")

To deploy this function, you’d use pulsar-admin (note that --ram takes a value in bytes; 134217728 is 128 MiB):

pulsar-admin functions create \
  --py session_duration_function.py \
  --classname session_duration_function.SessionDurationFunction \
  --inputs persistent://public/default/input-topic \
  --name session-duration-calculator \
  --tenant public \
  --namespace default \
  --parallelism 1 \
  --ram 134217728

When the function processes the example input, you’d see log lines like these (the click event from user456 falls through to the else branch, so it produces no output):

INFO [session-duration-calculator] User user123 logged in at 1678886400
INFO [session-duration-calculator] User user123 session duration: 20 seconds
INFO [session-duration-calculator] User user123 logged in at 1678886500

The self.user_sessions dictionary, created in __init__, is where the function stores its state. It maps each user_id to that user’s last login timestamp. When a logout event arrives, the function looks up the user_id in this dictionary, calculates the duration, and then removes the entry. This removal is crucial for resetting the session state: it ensures the next logout can only be paired with a fresh login.
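To see why the removal matters, the stdlib-only sketch below replays a duplicated logout (say, a client that sends it twice) against the same dictionary logic, once with the cleanup and once without. The durations helper and its clear_on_logout flag are hypothetical names used only for this illustration.

```python
def durations(events, clear_on_logout=True):
    """Replay (user_id, event_type, timestamp) tuples against an in-memory dict.

    Hypothetical helper: returns one entry per logout, None when no login is on record.
    """
    sessions = {}  # user_id -> last login timestamp
    out = []
    for user, kind, ts in events:
        if kind == "login":
            sessions[user] = ts
        elif kind == "logout":
            login = sessions.get(user)
            out.append(None if login is None else ts - login)
            if clear_on_logout:
                sessions.pop(user, None)  # reset the session, like the del in the function
    return out


events = [("u1", "login", 100), ("u1", "logout", 120), ("u1", "logout", 200)]
print(durations(events))                         # [20, None]
print(durations(events, clear_on_logout=False))  # [20, 100]
```

Without the cleanup, the stray second logout is paired with a login that was already consumed and reports a bogus 100-second session.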

The core problem Pulsar Functions solve is enabling event-driven processing without requiring you to manage a separate state store for simple aggregations or transformations. The "stateless" aspect refers to the fact that the Pulsar Functions runtime itself doesn’t persist an instance’s in-memory fields, such as this dictionary, across function restarts or scaling events; with --parallelism above 1, each instance even holds its own separate dictionary. If the function process crashes and restarts, the self.user_sessions dictionary will be empty.
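A restart can be imitated locally by discarding the object and constructing a new one, which is roughly what happens to an instance’s fields when its process restarts. InMemorySessions is a hypothetical, Pulsar-free stand-in that keeps only the login/logout rule from the function above.

```python
class InMemorySessions:
    """Hypothetical stand-in for one function instance keeping state in self."""

    def __init__(self):
        self.user_sessions = {}  # lost whenever a new instance is constructed

    def process(self, user_id, event_type, timestamp):
        """Return the session duration on a matched logout, otherwise None."""
        if event_type == "login":
            self.user_sessions[user_id] = timestamp
            return None
        if event_type == "logout" and user_id in self.user_sessions:
            return timestamp - self.user_sessions.pop(user_id)
        return None


instance = InMemorySessions()
instance.process("user123", "login", 1678886400)

# Simulate a crash and restart: a fresh object starts with an empty dictionary.
instance = InMemorySessions()
print(instance.process("user123", "logout", 1678886420))  # None: the login was lost
```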

This is where the magic of Pulsar’s message acknowledgment and subscription semantics comes into play. Each function instance reads its inputs through a Pulsar consumer on a subscription. Under the default ATLEAST_ONCE processing guarantee, a message is acknowledged only after the function processes it and returns successfully. If the function crashes before returning successfully, the message is not acknowledged and will be redelivered to another instance of the function (or to the same one once it restarts).
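The ack-after-success behaviour can be modelled in a few lines. This is a toy, in-process queue, not the Pulsar client: deliver_at_least_once and flaky_handler are hypothetical, and the attempt cap only keeps the loop finite.

```python
from collections import deque


def deliver_at_least_once(messages, handler, max_attempts=5):
    """Toy model of ack-after-success delivery; not the Pulsar client API.

    Returns a dict mapping each message to the number of delivery attempts.
    """
    queue = deque(messages)
    attempts = {}
    while queue:
        msg = queue.popleft()
        attempts[msg] = attempts.get(msg, 0) + 1
        try:
            handler(msg)  # returning normally counts as an acknowledgment
        except Exception:
            # Not acknowledged: queue it again so it is redelivered later.
            # The cap only keeps this toy loop from running forever.
            if attempts[msg] < max_attempts:
                queue.append(msg)
    return attempts


crashed_once = set()


def flaky_handler(msg):
    # Fail the first time "m2" is seen, as if the instance crashed mid-processing.
    if msg == "m2" and msg not in crashed_once:
        crashed_once.add(msg)
        raise RuntimeError("instance crashed")


print(deliver_at_least_once(["m1", "m2", "m3"], flaky_handler))
# {'m1': 1, 'm2': 2, 'm3': 1}
```

In this model, a handler that catches and logs its own exceptions (as both functions in this article do with except Exception) always returns normally, so its messages count as acknowledged even when processing failed.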

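However, for stateful operations like session tracking, simply relying on redelivery isn’t enough. Redelivery only replays messages that were never acknowledged: a login event that was processed and acknowledged before a restart is not delivered again, and the timestamp it stored in self.user_sessions is gone. To handle this, Pulsar Functions offer built-in state management: a key-value state API whose data Pulsar persists in Apache BookKeeper’s table service (stream storage). Rather than being declared per function, state storage is enabled for the functions worker as a whole, through the stateStorageServiceUrl setting in the worker configuration.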

Let’s rewrite the function to use Pulsar’s managed state:

import json

from pulsar import Function


class SessionDurationFunctionManagedState(Function):
    def __init__(self):
        # No instance fields: per-user state is managed by Pulsar, not local variables
        pass

    def process(self, input, context):
        logger = context.get_logger()
        # With the default SerDe the input may already be a str; decode only if it is bytes.
        data = input.decode('utf-8') if isinstance(input, bytes) else input
        try:
            record = json.loads(data)
            user_id = record.get('user_id')
            event_type = record.get('event_type')
            timestamp = record.get('timestamp')

            if not user_id or not event_type or timestamp is None:
                logger.warning(f"Skipping malformed record: {data}")
                return

            # Access state through the context's put_state()/get_state() methods
            if event_type == 'login':
                context.put_state(user_id, str(timestamp))  # Store the timestamp as a string
                logger.info(f"User {user_id} logged in at {timestamp}")
            elif event_type == 'logout':
                login_timestamp_str = context.get_state(user_id)
                if login_timestamp_str:  # None or empty means there is no open session
                    login_timestamp = int(login_timestamp_str)
                    session_duration = timestamp - login_timestamp
                    logger.info(f"User {user_id} session duration: {session_duration} seconds")
                    # Close the session: an empty value is treated like a missing one above.
                    context.put_state(user_id, "")
                    # return f"user:{user_id},duration:{session_duration}"  # published to --output
                else:
                    logger.warning(f"User {user_id} logged out without a prior login.")
            else:
                pass

        except json.JSONDecodeError:
            logger.error(f"Failed to decode JSON: {data}")
        except Exception as e:
            logger.error(f"An error occurred: {e}")

Deploy it the same way. There is no per-function flag for state; the state calls work as long as state storage is enabled on the functions worker:

pulsar-admin functions create \
  --py session_duration_function_managed.py \
  --classname session_duration_function_managed.SessionDurationFunctionManagedState \
  --inputs persistent://public/default/input-topic \
  --name session-duration-calculator-managed \
  --tenant public \
  --namespace default \
  --parallelism 1 \
  --ram 134217728

Here, the state calls on context read and write a key-value store that Pulsar manages for the function and persists in Apache BookKeeper. This state is durable and survives function restarts. Every instance of the function shares it, so it also survives changes in parallelism, although with more than one instance the order in which a user’s login and logout are processed is no longer guaranteed. The "stateless" nature of the function code itself means you write it as if it were stateless, and Pulsar handles the state persistence and recovery for you transparently.
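Because the state lives outside the instance, the managed-state logic can be unit-tested by backing the context’s state calls with a plain dictionary. FakeContext below is a hypothetical test double that imitates only get_logger(), put_state() and get_state() (the state method names of the Pulsar Python SDK’s context); it is not part of the SDK. handle_event is a hypothetical helper holding the login/logout core of process() without the JSON parsing.

```python
import logging


class FakeContext:
    """Hypothetical test double: a dict stands in for Pulsar's state store."""

    def __init__(self, store):
        self._store = store  # shared dict that outlives any single "instance"
        self._logger = logging.getLogger("session-duration-test")

    def get_logger(self):
        return self._logger

    def put_state(self, key, value):
        self._store[key] = value

    def get_state(self, key):
        return self._store.get(key)  # None when the key was never written


def handle_event(context, user_id, event_type, timestamp):
    """Login/logout core of process(): returns a duration in seconds, or None."""
    if event_type == "login":
        context.put_state(user_id, str(timestamp))
    elif event_type == "logout":
        login_ts = context.get_state(user_id)
        if login_ts:
            context.put_state(user_id, "")  # mark the session as closed
            return timestamp - int(login_ts)
        context.get_logger().warning("User %s logged out without a prior login.", user_id)
    return None


store = {}
handle_event(FakeContext(store), "user123", "login", 1678886400)
# A "restarted" instance gets a new context object but the same backing store.
print(handle_event(FakeContext(store), "user123", "logout", 1678886420))  # 20
```

Creating a second FakeContext over the same dictionary plays the role of a restarted instance: the login written by the first one is still there.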

The secret sauce is that this state lives outside the function instance. The runtime connects every instance to the same BookKeeper-backed table, and BookKeeper replicates each write, so an instance that fails can be replaced by another that reads exactly the values already stored. What is not atomic is the pairing of a state write with the acknowledgment of the message that caused it: if an instance fails between the two, the message is redelivered and its update is applied again. The handlers here tolerate that, since a replayed login rewrites the same timestamp and a replayed logout finds no open session and only logs a warning.
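If an instance fails after writing state but before its input message is acknowledged, that message is delivered again and its state change is reapplied. The stdlib-only sketch below delivers every event of one session twice against a dict that stands in for the state store; apply is a hypothetical helper mirroring the function’s login/logout rules.

```python
def apply(store, user_id, event_type, timestamp):
    """Same login/logout rules as the function; returns a duration or None."""
    if event_type == "login":
        store[user_id] = str(timestamp)  # rewriting the same value is harmless
    elif event_type == "logout":
        login_ts = store.get(user_id)
        if login_ts:
            store[user_id] = ""  # session closed
            return timestamp - int(login_ts)
    return None


store = {}
events = [("user123", "login", 1678886400), ("user123", "logout", 1678886420)]
results = []
for event in events:
    # Deliver every event twice, as if each ack was lost after the state write.
    results.append(apply(store, *event))
    results.append(apply(store, *event))
print(results)  # [None, None, 20, None]
```

Only one duration is reported, so duplicated deliveries do not produce duplicated sessions.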

The next concept you’ll want to explore is how to handle multiple outputs from a single Pulsar Function, allowing you to route processed events to different downstream topics based on their content or processing outcome.
