The Pulsar Schema Registry isn’t just a place to store schemas; it’s an active enforcer of data contracts between your producers and consumers, preventing silent data corruption.

Let’s see this in action. Imagine a producer sending messages to a topic named my-topic using Pulsar’s built-in STRING schema.

// Producer side
Schema<String> stringSchema = Schema.STRING;
Producer<String> producer = pulsarClient.newProducer(stringSchema)
    .topic("my-topic")
    .create();

producer.send("Hello, Pulsar!");
producer.close();

Now suppose another client connects to my-topic with a different schema, say Avro. The topic already has a schema registered (STRING, from the producer above), so the broker checks the new schema against it. STRING and Avro are different schema types and are not compatible. An Avro producer is therefore rejected when it is created, and an Avro consumer likewise fails to subscribe. The failure happens up front, when the client connects, rather than later as a message that silently decodes into garbage.

The core problem Pulsar’s Schema Registry solves is data evolution and compatibility. Without it, a producer could update its message format, breaking all existing consumers, or vice-versa. This leads to silent data loss or corruption, which is incredibly hard to debug. The Schema Registry acts as a central authority, ensuring that all producers and consumers on a topic adhere to a mutually agreed-upon data format.

Here’s how it works internally:

  1. Schema Registration: When a producer or consumer connects to a topic, the client sends its schema to the broker. If the topic has no schema yet, the broker registers it. If the client brings a new schema that passes the compatibility check, the broker stores it as a new version. Schemas are stored per topic, and every version is kept.
  2. Schema Validation: The compatibility check runs when the client connects, not on every message. If the client’s schema is neither identical to a registered version nor compatible under the topic’s compatibility strategy (for example BACKWARD or FORWARD), producer creation or subscription fails, so no incompatible message is ever sent.
  3. Schema Encoding/Decoding: The producer serializes each message with its schema and stamps the message with that schema’s version. The consumer deserializes with its own schema and can use the stamped version to fetch the exact schema the message was written with when it needs it (for Avro schema resolution, for example). This ensures data is interpreted consistently.
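The register, check, and version steps can be sketched as a toy in-memory registry. This is a hypothetical model, not Pulsar’s implementation: the ToySchemaRegistry class, its connect and schemaForVersion methods, and the choice to represent schemas as plain "TYPE:definition" strings are all assumptions made for illustration. It shows registration on first connect, reuse of an identical schema’s version, rejection of an incompatible schema at connect time, and lookup of a writer’s schema by version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical in-memory model of schema registration and versioning (not Pulsar's code). */
public class ToySchemaRegistry {
    // topic -> schema versions; the list index is the version number
    private final Map<String, List<String>> versionsByTopic = new HashMap<>();
    // (latestSchema, newSchema) -> may the new schema be registered?
    private final BiPredicate<String, String> isCompatible;

    public ToySchemaRegistry(BiPredicate<String, String> isCompatible) {
        this.isCompatible = isCompatible;
    }

    /** Called when a client connects; returns the version its messages will carry. */
    public int connect(String topic, String schema) {
        List<String> versions = versionsByTopic.computeIfAbsent(topic, t -> new ArrayList<>());
        int existing = versions.indexOf(schema);
        if (existing >= 0) {
            return existing; // identical schema already registered: reuse its version
        }
        if (!versions.isEmpty()) {
            String latest = versions.get(versions.size() - 1);
            if (!isCompatible.test(latest, schema)) {
                // Rejected at connect time, before any message is sent
                throw new IllegalStateException("Incompatible schema for " + topic);
            }
        }
        versions.add(schema); // first schema, or a compatible change: new version
        return versions.size() - 1;
    }

    /** A consumer looks up the writer's schema from the version stamped on a message. */
    public String schemaForVersion(String topic, int version) {
        return versionsByTopic.get(topic).get(version);
    }

    public static void main(String[] args) {
        // Hypothetical rule: only changes within the same schema type are compatible
        ToySchemaRegistry registry = new ToySchemaRegistry(
            (latest, candidate) -> latest.split(":")[0].equals(candidate.split(":")[0]));
        int v0 = registry.connect("my-topic", "STRING:utf8");    // first schema -> version 0
        int v1 = registry.connect("my-topic", "STRING:utf8-v2"); // compatible -> version 1
        System.out.println(v0 + " " + v1 + " " + registry.schemaForVersion("my-topic", v0));
        try {
            registry.connect("my-topic", "AVRO:User"); // different type -> rejected
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The compatibility rule is passed in as a predicate so the same registry can model any strategy; in Pulsar the rule comes from the namespace’s configured strategy and the schema type.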

Pulsar supports various schema types, including primitive types such as STRING, JSON, Avro, and Protobuf. Avro and Protobuf are the most common choices for structured data.

Avro: Avro is a row-based data serialization system. It’s known for its rich data structures and schema evolution capabilities.

  • Schema Definition (Avro):
    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": "int"},
        {"name": "favorite_color", "type": "string", "default": "blue"}
      ]
    }
    
  • Producer using Avro:
    // Producer side with Avro
    // Assumes User is a class generated from the Avro schema above (e.g. with avro-maven-plugin)
    Schema<User> avroSchema = Schema.AVRO(User.class);
    Producer<User> avroProducer = pulsarClient.newProducer(avroSchema)
        .topic("user-topic")
        .create();
    
    // Avro-generated classes expose a builder; favorite_color falls back to its default, "blue"
    User user = User.newBuilder()
        .setName("Alice")
        .setFavoriteNumber(42)
        .build();
    
    avroProducer.send(user);
    avroProducer.close();
    
  • Consumer using Avro:
    // Consumer side with Avro, using the same generated User class
    // (to read without a generated class, subscribe with Schema.AUTO_CONSUME() instead)
    Schema<User> avroSchema = Schema.AVRO(User.class);
    Consumer<User> avroConsumer = pulsarClient.newConsumer(avroSchema)
        .topic("user-topic")
        .subscriptionName("my-sub")
        .subscribe();
    
    Message<User> msg = avroConsumer.receive();
    User receivedUser = msg.getValue();
    System.out.println("Received: " + receivedUser.getName());
    avroConsumer.acknowledge(msg);
    avroConsumer.close();
    

Protobuf: Protocol Buffers are a language-neutral, platform-neutral, extensible mechanism for serializing structured data.

  • Schema Definition (.proto file):
    syntax = "proto3";
    
    package mypackage;
    
    message Person {
      string name = 1;
      int32 id = 2;
      string email = 3;
    }
    
  • Producer using Protobuf:
    // Producer side with Protobuf
    Schema<Person> protobufSchema = Schema.PROTOBUF(Person.class); // Assuming Person is generated from your .proto file
    Producer<Person> protobufProducer = pulsarClient.newProducer(protobufSchema)
        .topic("person-topic")
        .create();
    
    Person person = Person.newBuilder()
        .setName("Bob")
        .setId(123)
        .setEmail("bob@example.com")
        .build();
    
    protobufProducer.send(person);
    protobufProducer.close();
    
  • Consumer using Protobuf:
    // Consumer side with Protobuf
    Schema<Person> protobufSchema = Schema.PROTOBUF(Person.class);
    Consumer<Person> protobufConsumer = pulsarClient.newConsumer(protobufSchema)
        .topic("person-topic")
        .subscriptionName("my-sub")
        .subscribe();
    
    Message<Person> msg = protobufConsumer.receive();
    Person receivedPerson = msg.getValue();
    System.out.println("Received: " + receivedPerson.getName());
    protobufConsumer.acknowledge(msg);
    protobufConsumer.close();
    

When a client is created with a schema such as Schema.STRING, Schema.AVRO(User.class), or Schema.PROTOBUF(Person.class), it sends that schema to the broker when it connects. If the topic has no schema yet, the broker registers it automatically (producers are allowed to do this by default). If a schema already exists, the broker checks the new one for compatibility.

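The crucial part of schema enforcement is compatibility. Pulsar applies a schema compatibility check strategy, configured per namespace. By default, Pulsar uses FULL for Avro and JSON schemas. The main strategies are:

  • BACKWARD: Consumers using the new schema can read messages written with the previous schema.
  • FORWARD: Consumers using the previous schema can read messages written with the new schema.
  • FULL: Both BACKWARD and FORWARD hold.
  • ALWAYS_COMPATIBLE: Compatibility checks are disabled; any schema is accepted.

BACKWARD, FORWARD, and FULL compare a new schema only with the latest registered version. Their transitive variants (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE) compare it with every registered version.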
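The difference between BACKWARD and FORWARD comes down to which schema plays the reader. Following Pulsar’s definitions, BACKWARD means the new schema can read data written with the old one, and FORWARD means the old schema can read data written with the new one. The sketch below is a simplified, hypothetical model of Avro’s record-resolution rules: a field the reader expects but the writer omitted needs a default, fields only the writer has are skipped, and fields with the same name must have the same type (no type promotion). It is not Pulsar’s checker; the CompatibilityModel class and its Field record are invented for illustration.

```java
import java.util.Map;

/** Hypothetical, simplified model of Avro-style reader/writer compatibility. */
public class CompatibilityModel {

    /** A record field: its type name and whether the schema declares a default. */
    public record Field(String type, boolean hasDefault) {}

    /** Can a reader using schema `reader` decode data written with schema `writer`? */
    public static boolean canRead(Map<String, Field> reader, Map<String, Field> writer) {
        for (Map.Entry<String, Field> e : reader.entrySet()) {
            Field written = writer.get(e.getKey());
            if (written == null) {
                // Field absent from the written data: the reader must have a default
                if (!e.getValue().hasDefault()) return false;
            } else if (!written.type().equals(e.getValue().type())) {
                // Same name, different type: rejected (this model has no promotion)
                return false;
            }
        }
        // Fields only the writer has are simply skipped by the reader
        return true;
    }

    /** BACKWARD: the new schema can read data written with the old schema. */
    public static boolean backward(Map<String, Field> oldS, Map<String, Field> newS) {
        return canRead(newS, oldS);
    }

    /** FORWARD: the old schema can read data written with the new schema. */
    public static boolean forward(Map<String, Field> oldS, Map<String, Field> newS) {
        return canRead(oldS, newS);
    }

    /** FULL: both directions hold. */
    public static boolean full(Map<String, Field> oldS, Map<String, Field> newS) {
        return backward(oldS, newS) && forward(oldS, newS);
    }

    public static void main(String[] args) {
        Map<String, Field> v1 = Map.of(
            "name", new Field("string", false),
            "favorite_number", new Field("int", false));
        // v2 adds favorite_color WITH a default, like the User schema above
        Map<String, Field> v2 = Map.of(
            "name", new Field("string", false),
            "favorite_number", new Field("int", false),
            "favorite_color", new Field("string", true));
        // v3 adds a field with NO default
        Map<String, Field> v3 = Map.of(
            "name", new Field("string", false),
            "favorite_number", new Field("int", false),
            "email", new Field("string", false));

        System.out.println("v1 -> v2 FULL: " + full(v1, v2));
        System.out.println("v1 -> v3 BACKWARD: " + backward(v1, v3));
        System.out.println("v1 -> v3 FORWARD: " + forward(v1, v3));
    }
}
```

Adding favorite_color with a default passes in both directions, so that change is FULL-compatible. Adding a field with no default is FORWARD- but not BACKWARD-compatible: old readers ignore the extra field, but a new reader has no value to fill in when it reads an old message.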

You can change the strategy for a namespace through the Pulsar Admin API. For example, to apply FORWARD compatibility to every topic in the public/default namespace (including user-topic) with the pulsar-admin CLI:

pulsar-admin namespaces set-schema-compatibility-strategy --compatibility FORWARD public/default

With FORWARD in place, a new schema is accepted only if data written with it can still be read using the previous schema; otherwise the producer that brings it is rejected. Once a producer switches to an accepted new schema, consumers still on the old schema can keep deserializing and processing its messages. This is vital for rolling updates where consumers lag behind producers.

Schemas do not live in a separate service. The broker stores each schema version in Apache BookKeeper, with an index in Pulsar’s metadata store, keyed by topic. Because every message carries the version of the schema it was written with, a consumer can always fetch the exact writer schema, and the full version history remains available for auditing.

When debugging a complex schema evolution problem, start by inspecting what is actually registered for the topic, for example with pulsar-admin schemas get user-topic (use --version to fetch a specific version).
