Imagine your online store is processing orders smoothly. Then your order processing system starts rejecting customer data. Someone on your team changed a field name in the order data, and now your producer and consumer apps no longer "speak the same language." What was once a working pipeline is now throwing error messages and blocking customer purchases.
This scenario is surprisingly common in distributed systems. Applications that rely on shared data pipelines are fragile when it comes to format changes. Even a small mismatch, like renaming customer_email to email, can cause services to fail.
That's where Kafka Schema Registry comes in. Schema Registry acts as a central authority for data formats, ensuring that producers (apps sending data) and consumers (apps reading data) always agree on message structure. It makes evolving your data models safer, so you can add new fields without breaking existing services.
In this tutorial, you’ll learn how to set up Kafka Schema Registry on the cloud provider, step by step. We’ll walk you through building a simple order processing system where:
- A web store produces order events.
- A fulfillment service consumes and processes them.
- Schema Registry enforces the rules so data changes don’t break your apps.
By the end, you'll know how to produce and consume Avro-encoded messages with Schema Registry, evolve your schemas safely, and monitor your setup on a managed Kafka cluster in the cloud provider.
Key Takeaways
Before diving into the implementation, here's what you'll learn:
- Schema Registry ensures data consistency by enforcing schemas that producers and consumers must follow, preventing deserialization errors from field name mismatches.
- Schema evolution works safely with Schema Registry: you can add optional fields without breaking existing consumers, enabling zero-downtime updates.
- The cloud provider's Managed Kafka includes Schema Registry on dedicated CPU plans, eliminating the need to run and maintain separate infrastructure.
- Avro serialization reduces message size compared to JSON while maintaining schema validation, improving throughput and storage efficiency.
- Schema versioning prevents breaking changes by tracking schema history and enforcing compatibility rules automatically.
What is Kafka Schema Registry?
Apache Kafka moves data between applications efficiently. It lets applications publish and subscribe to streams of records, making it a backbone for real-time systems. But while Kafka handles message transport, it doesn't enforce rules about message structure: to the broker, a message is just bytes. Producers can send anything, and it's up to them to make sure consumers can decode it.
That's where Schema Registry fits in, as the central authority for the structure of data in your Kafka topics. It stores schemas (formal descriptions of your data's structure, such as the order data in our example) in a centralized service. These schemas are typically defined in formats like Avro, Protobuf, or JSON Schema.
Here’s why this matters:
- Consistency – Producers must write messages that follow a schema, and consumers read messages according to that same schema. This prevents mismatches like order_id vs id from causing deserialization errors.
- Versioning – Over time, your business evolves. Maybe you add a shipping_address field or support discount codes. Schema Registry lets you evolve schemas safely by checking that each new version stays compatible with the previous ones.
- Validation – The producer's serializer checks each message against the schema before sending it. Invalid messages never make it into your topic, keeping your data stream clean.
When a producer sends a message, it registers or references a schema stored in Schema Registry. Consumers automatically fetch that schema (if they don’t already have it cached) and use it to decode messages.
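How does a consumer know which schema to fetch? confluent-kafka's Avro serializer doesn't embed the full schema in each message. It prepends a 5-byte header (a zero "magic byte" followed by the schema ID as a 4-byte big-endian integer) to the Avro-encoded payload, and the deserializer uses that ID to look up the schema. The sketch below splits such a message into its parts using only the standard library; split_confluent_message is a hypothetical helper for illustration, not part of confluent-kafka, which does this internally.

```python
import struct

MAGIC_BYTE = 0  # first byte of every message in the Schema Registry wire format


def split_confluent_message(raw: bytes) -> tuple[int, bytes]:
    """Hypothetical helper: return (schema_id, avro_payload) from a framed message."""
    if len(raw) < 5:
        raise ValueError("message too short to contain a schema header")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Schema Registry framed")
    return schema_id, raw[5:]


# Example: a message written with schema ID 1 and a (fake) 3-byte payload
framed = bytes([0]) + (1).to_bytes(4, "big") + b"\x02ab"
schema_id, payload = split_confluent_message(framed)
print(schema_id, payload)  # 1 b'\x02ab'
```

Because only a 4-byte ID travels with each message, the schema itself is downloaded once per ID rather than once per message.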
The cloud provider's Managed Kafka service (available on dedicated CPU plans) includes Schema Registry as a built-in feature, so you can focus on building your applications instead of running and maintaining extra infrastructure. For more information on setting up Kafka clusters, see the cloud provider's Managed Kafka documentation.
An Example: Building an order processing system
To see Schema Registry in action, we’ll build a simple order processing system.
Here’s the scenario:
- The web store produces order messages whenever a customer checks out. These messages contain details like the order ID, customer email, total amount, and date.
- The fulfillment service consumes these messages, processes the orders, and prepares them for shipment.
- Schema Registry ensures that both producer and consumer agree on the structure of the order message.
Later, we'll evolve our schema to add optional fields like shipping_address and discount_code. Without Schema Registry, these changes could cause errors. With Schema Registry, existing producers and consumers keep working.
Step 1: Create a Kafka cluster with Schema Registry
We'll start by creating a Kafka cluster on the cloud provider with Schema Registry enabled. You can do this with Terraform (recommended) or via the Control Panel.
Using Terraform (Recommended)
First, create a file called kafka-cluster.tf:
# Configure the cloud provider's Terraform provider
terraform {
required_providers {
the cloud provider = {
source = "the cloud provider/the cloud provider"
version = "~> 2.0"
}
}
}
provider "the cloud provider" {
token = var.do_token
}
# Variables
variable "do_token" {
type = string
description = "Your cloud provider API token"
sensitive = true
}
# Create Kafka cluster with Schema Registry
resource "the cloud provider_database_cluster" "kafka_cluster" {
engine = "kafka"
version = "3.8"
name = "orders-kafka-cluster"
node_count = 3
region = "nyc3"
size = "gd-2vcpu-8gb"
}
# Create topics for our example
resource "the cloud provider_database_kafka_topic" "orders" {
cluster_id = the cloud provider_database_cluster.kafka_cluster.id
name = "orders"
partition_count = 3
replication_factor = 3
}
# Create our order schema
resource "the cloud provider_database_kafka_schema_registry" "order_schema" {
cluster_id = the cloud provider_database_cluster.kafka_cluster.id
subject_name = "orders-value"
schema_type = "avro"
schema = jsonencode({
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_email", "type": "string"},
{"name": "total_amount", "type": "double"},
{"name": "order_date", "type": "string"}
]
})
}
# Output connection details
output "kafka_host" {
value = the cloud provider_database_cluster.kafka_cluster.host
}
output "kafka_port" {
value = the cloud provider_database_cluster.kafka_cluster.port
}
output "schema_registry_url" {
value = "https://${the cloud provider_database_cluster.kafka_cluster.host}:25065"
}
output "kafka_user" {
value = the cloud provider_database_cluster.kafka_cluster.user
}
output "kafka_password" {
value = the cloud provider_database_cluster.kafka_cluster.password
sensitive = true
}
Note: You'll need to create a Personal Access Token in your cloud account to use as the do_token value. Follow the instructions in the cloud provider API documentation to generate your token. For more on using Terraform with the cloud provider, see our Terraform with the cloud provider tutorial.
Deploy the cluster:
# Set your cloud provider API token
export TF_VAR_do_token="your_api_token_here"
# Initialize and apply
terraform init
terraform apply
After a few minutes, Terraform will output the connection details for your Kafka cluster and Schema Registry.
Alternative: Using the cloud provider control panel
If you prefer a click-based setup:
- In the cloud provider's console, go to Databases → Create Database.
- Select Kafka, version 3.8.
- Choose a General Purpose plan (Schema Registry requires dedicated CPU; Basic plans don’t support it).
- Pick a region and cluster size, e.g. New York, 6 vCPU / 24 GB RAM (minimum storage: 150 GiB).
- After the cluster is created, go to the Settings tab and enable Schema Registry.
Either way, you now have a Kafka cluster with Schema Registry enabled that is ready to store schemas and enforce data rules.
Step 2: Verify Schema Registry setup
Once your cluster is deployed, let’s confirm that Schema Registry is working.
Run:
# Get your cluster details
terraform output
Test your Schema Registry connection with the command below. Replace your-kafka-host and username:password with the values from terraform output (to see the password, run terraform output -raw kafka_password):
curl -u "username:password" "https://your-kafka-host:25065/subjects"
You should see something like:
["orders-value"]
This tells you that:
- Schema Registry is running and reachable.
- You have one subject (orders-value) registered, which corresponds to the schema we defined for order messages.
A *subject* is essentially a namespace under which schemas are versioned. If you update your schema later, you’ll see multiple versions under the same subject.
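The name orders-value isn't arbitrary. By default, confluent-kafka's serializers use the TopicNameStrategy, which registers a topic's value schema under <topic>-value and its key schema under <topic>-key. That's why the Terraform file registers the schema under orders-value for the orders topic. A minimal sketch of that naming rule (topic_subject_name is a hypothetical function; confluent-kafka ships its own strategy functions):

```python
def topic_subject_name(topic: str, is_key: bool = False) -> str:
    """Hypothetical helper mimicking TopicNameStrategy: '<topic>-key' or '<topic>-value'."""
    return f"{topic}-{'key' if is_key else 'value'}"


print(topic_subject_name("orders"))               # orders-value
print(topic_subject_name("orders", is_key=True))  # orders-key
```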
Download SSL certificates
Before running your producer and consumer applications, you'll need to download your Kafka cluster's CA certificate:
- In the cloud provider's control panel, navigate to your Kafka cluster
- Go to the Connection Details tab
- Download the CA certificate file, ca-certificate.crt, to your project directory.
Note: For detailed instructions on downloading and configuring SSL certificates for your Kafka cluster, see the cloud provider's Managed Kafka documentation.
Make sure this certificate file is in the same directory as your Python scripts, or update the ssl.ca.location path in your configuration accordingly.
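Rather than pasting credentials into each script, you may prefer to build the shared connection settings from environment variables and fail fast if the CA certificate can't be found, since a missing file otherwise surfaces later as an SSL connection error. The sketch below assumes hypothetical variable names (KAFKA_HOST, KAFKA_PORT, KAFKA_USER, KAFKA_PASSWORD, KAFKA_CA_FILE); the config keys match the ones used by the producer and consumer in the next steps.

```python
import os


def load_kafka_config() -> tuple[dict, dict]:
    """Build Kafka and Schema Registry configs from hypothetical env vars."""
    # os.environ[...] raises KeyError if a required variable is missing
    host = os.environ["KAFKA_HOST"]
    port = os.environ["KAFKA_PORT"]
    user = os.environ["KAFKA_USER"]
    password = os.environ["KAFKA_PASSWORD"]
    ca_file = os.environ.get("KAFKA_CA_FILE", "ca-certificate.crt")

    # Fail early with a clear message instead of a later SSL handshake error
    if not os.path.isfile(ca_file):
        raise FileNotFoundError(f"CA certificate not found: {ca_file}")

    kafka_config = {
        "bootstrap.servers": f"{host}:{port}",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": user,
        "sasl.password": password,
        "ssl.ca.location": ca_file,
    }
    schema_registry_config = {
        # Schema Registry uses port 25065, as in the curl example above
        "url": f"https://{host}:25065",
        "basic.auth.user.info": f"{user}:{password}",
    }
    return kafka_config, schema_registry_config
```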
Step 3: Create an order producer (Python)
Now that the infrastructure is ready, let's create a Python producer that sends orders into Kafka. This example uses the confluent-kafka library, which provides robust support for Schema Registry integration. For more Python examples with the cloud provider services, see our Python tutorials.
Create a file called order_producer.py with the code below, replacing the placeholder values your-username, your-password, and your-kafka-host:your-kafka-port with the values from your Terraform output:
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Configuration (replace with your actual values)
kafka_config = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    'ssl.ca.location': 'ca-certificate.crt',
}

schema_registry_config = {
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
}

# Our order schema (matches what we created in Terraform)
order_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_email", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
"""


def create_producer():
    """Create a producer and an Avro serializer backed by Schema Registry"""
    # Create Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)

    # Create Avro serializer (registers or looks up the schema on first use)
    avro_serializer = AvroSerializer(
        schema_registry_client,
        order_schema
    )

    # Create producer
    producer = Producer(kafka_config)
    return producer, avro_serializer


def send_order(producer, avro_serializer, order_data):
    """Serialize an order with Avro and send it to the 'orders' topic"""
    def delivery_report(err, msg):
        # Called during flush() once the broker acknowledges or rejects the message
        if err is not None:
            print(f"❌ Delivery failed for {order_data['order_id']}: {err}")
        else:
            print(f"✅ Sent order: {order_data['order_id']}")

    try:
        # Raises if order_data doesn't match the schema, so nothing is produced
        serialized_value = avro_serializer(
            order_data,
            SerializationContext('orders', MessageField.VALUE)
        )
        producer.produce(
            topic='orders',
            value=serialized_value,
            on_delivery=delivery_report
        )
        # Wait for delivery; fine for a demo, but flush in batches for real traffic
        producer.flush()
    except Exception as e:
        print(f"❌ Error sending order: {e}")


# Example usage
if __name__ == "__main__":
    producer, avro_serializer = create_producer()

    # Send sample orders
    orders = [
        {
            "order_id": "ORDER-001",
            "customer_email": "john@example.com",
            "total_amount": 99.99,
            "order_date": "2025-01-15"
        },
        {
            "order_id": "ORDER-002",
            "customer_email": "jane@example.com",
            "total_amount": 149.50,
            "order_date": "2025-01-15"
        }
    ]

    for order in orders:
        send_order(producer, avro_serializer, order)

    print("All orders sent!")
Now, install the required packages:
pip install 'confluent-kafka[avro]'
Run the producer:
python3 order_producer.py
You should see output like:
✅ Sent order: ORDER-001
✅ Sent order: ORDER-002
All orders sent!
So, what’s happening behind the scenes?
- Each order is validated against the schema.
- If the data doesn't match the schema (say you forget order_id), it won't be sent.
- Valid messages are serialized into Avro format and stored in Kafka.
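Strictly speaking, the check happens inside the serializer: if a required field is missing or has the wrong type, serialization raises an exception and send_order never reaches produce(). If you'd rather catch such problems with a clearer message before serializing, a simplified pre-check might look like the sketch below. It's a hypothetical helper, not part of confluent-kafka, and it handles only the primitive types and nullable unions used in this tutorial's schema.

```python
# Python types this sketch accepts for each Avro type used in the Order schema
AVRO_TO_PYTHON = {
    "string": (str,),
    "double": (float, int),  # this sketch also accepts ints for double fields
    "null": (type(None),),
}


def validate_record(record: dict, schema: dict) -> list[str]:
    """Hypothetical pre-check: return a list of problems (empty means it looks valid)."""
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        # A union such as ["null", "string"] is a list; a plain type is a string
        allowed = field["type"] if isinstance(field["type"], list) else [field["type"]]
        if name not in record:
            if "default" not in field:
                problems.append(f"missing required field '{name}'")
            continue
        python_types = tuple(t for avro_type in allowed for t in AVRO_TO_PYTHON[avro_type])
        if not isinstance(record[name], python_types):
            problems.append(
                f"field '{name}' should be {allowed}, got {type(record[name]).__name__}"
            )
    return problems


ORDER_SCHEMA = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_email", "type": "string"},
        {"name": "total_amount", "type": "double"},
        {"name": "order_date", "type": "string"},
    ],
}

bad_order = {"customer_email": "john@example.com", "total_amount": 99.99,
             "order_date": "2025-01-15"}
print(validate_record(bad_order, ORDER_SCHEMA))  # ["missing required field 'order_id'"]
```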
Step 4: Create an order consumer (Python)
Next, let’s build the fulfillment service, a consumer that reads orders from Kafka.
Create a file called order_consumer.py:
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Configuration (same as producer, plus consumer group settings)
kafka_config = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    'ssl.ca.location': 'ca-certificate.crt',
    'group.id': 'order-processors',
    # Start from the oldest message if this group has no committed offset yet
    'auto.offset.reset': 'earliest'
}

schema_registry_config = {
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
}

# Our order schema (matches what we created in Terraform)
order_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_email", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
"""


def create_consumer():
    """Create a consumer and an Avro deserializer backed by Schema Registry"""
    # Create Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)

    # Create Avro deserializer: each message is decoded with the writer's schema
    # (fetched by ID) and resolved to the reader schema passed here
    avro_deserializer = AvroDeserializer(
        schema_registry_client,
        order_schema
    )

    # Create consumer
    consumer = Consumer(kafka_config)
    return consumer, avro_deserializer


def process_order(order_data):
    """Process received order (your business logic here)"""
    print(f"📦 Processing order {order_data['order_id']}")
    print(f"   Customer: {order_data['customer_email']}")
    print(f"   Amount: ${order_data['total_amount']}")
    print(f"   Date: {order_data['order_date']}")
    print("   Order processed successfully! ✅\n")


# Example usage
if __name__ == "__main__":
    consumer, avro_deserializer = create_consumer()
    consumer.subscribe(['orders'])

    print("🚀 Order consumer started. Waiting for orders...")
    print("Press Ctrl+C to stop\n")

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # no message arrived within the timeout
            if msg.error():
                print(f"❌ Consumer error: {msg.error()}")
                continue

            # Deserialize the message
            try:
                order_data = avro_deserializer(
                    msg.value(),
                    SerializationContext('orders', MessageField.VALUE)
                )
                process_order(order_data)
            except Exception as e:
                print(f"❌ Error deserializing message: {e}")
    except KeyboardInterrupt:
        print("👋 Shutting down consumer...")
    finally:
        # Leave the consumer group cleanly (and commit final offsets)
        consumer.close()
Now, let's run the consumer:
python3 order_consumer.py
You should see output like this (one block per order):
🚀 Order consumer started. Waiting for orders...
📦 Processing order ORDER-001
Customer: john@example.com
Amount: $99.99
Date: 2025-01-15
Order processed successfully! ✅
The consumer automatically retrieves the schema from Schema Registry, deserializes the message, and resolves it to the reader schema defined in order_consumer.py, so process_order always receives the fields it expects.
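Under the hood, the deserializer reads the schema ID from each message, asks Schema Registry for that schema only the first time it sees the ID, and reuses the cached copy afterward. The sketch below models that lookup-and-cache behavior with a hypothetical SchemaCache class and a stand-in fetch function in place of a real HTTP call to Schema Registry's GET /schemas/ids/{id} endpoint; it illustrates the idea rather than confluent-kafka's internal code.

```python
from typing import Callable


class SchemaCache:
    """Hypothetical cache: fetch schemas by ID on first use, then serve from memory."""

    def __init__(self, fetch: Callable[[int], str]):
        self._fetch = fetch                  # e.g. a call to GET /schemas/ids/{id}
        self._schemas: dict[int, str] = {}
        self.fetch_count = 0                 # how often the registry was actually hit

    def get(self, schema_id: int) -> str:
        if schema_id not in self._schemas:
            self._schemas[schema_id] = self._fetch(schema_id)
            self.fetch_count += 1
        return self._schemas[schema_id]


# Stand-in for Schema Registry: maps IDs to schema strings (illustrative data)
fake_registry = {1: '{"type": "record", "name": "Order", "fields": []}'}
cache = SchemaCache(lambda schema_id: fake_registry[schema_id])

for _ in range(3):        # three messages written with schema ID 1
    cache.get(1)
print(cache.fetch_count)  # 1
```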
Step 5: Test schema evolution
Now for the magic: evolving schemas safely.
Let’s add two new optional fields: shipping_address and discount_code. Update your Terraform schema:
resource "the cloud provider_database_kafka_schema_registry" "order_schema" {
cluster_id = the cloud provider_database_cluster.kafka_cluster.id
subject_name = "orders-value"
schema_type = "avro"
schema = jsonencode({
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_email", "type": "string"},
{"name": "total_amount", "type": "double"},
{"name": "order_date", "type": "string"},
{"name": "shipping_address", "type": ["null", "string"], "default": null},
{"name": "discount_code", "type": ["null", "string"], "default": null}
]
})
}
Apply the update:
terraform apply
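Now send an order that uses the new fields. In order_producer.py, update both the order_schema string (add the two new fields exactly as in the Terraform schema above) and the orders list. If you only change the orders list, the serializer keeps using the old four-field schema, so the new fields won't be written:
order_schema = """
{"type": "record", "name": "Order", "fields": [
  {"name": "order_id", "type": "string"},
  {"name": "customer_email", "type": "string"},
  {"name": "total_amount", "type": "double"},
  {"name": "order_date", "type": "string"},
  {"name": "shipping_address", "type": ["null", "string"], "default": null},
  {"name": "discount_code", "type": ["null", "string"], "default": null}
]}
"""
orders = [
    {
        "order_id": "ORDER-003",
        "customer_email": "bob@example.com",
        "total_amount": 199.99,
        "order_date": "2025-01-15",
        "shipping_address": "123 Main St, City, State",
        "discount_code": "SAVE10"
    }
]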
Run:
python3 order_producer.py
What happens in the background?
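- The producer serializes the new order with the updated schema, which Schema Registry accepted because adding fields with defaults passes its compatibility check.
- Consumers still using the old four-field schema keep working: the deserializer fetches the new writer schema by its ID, and Avro schema resolution skips the fields the old reader schema doesn't define.
- Consumers updated to the new schema can read both old and new messages, because the new fields default to null when an older message doesn't contain them.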
No downtime. No broken services. Just smooth schema evolution.
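Avro's schema resolution is what makes this work: the deserializer decodes each message with the writer's schema and then adapts the result to the reader's schema, dropping fields the reader doesn't know and filling in defaults for reader fields the writer didn't send. The sketch below models only those two record-level rules on plain dictionaries; real Avro resolution also covers type promotion, unions, and nested records, and resolve_record is a hypothetical helper.

```python
def resolve_record(written: dict, reader_fields: list[dict]) -> dict:
    """Hypothetical, simplified model of Avro record resolution."""
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in written:
            resolved[name] = written[name]     # field exists in both schemas
        elif "default" in field:
            resolved[name] = field["default"]  # reader-only field: use its default
        else:
            raise ValueError(f"no value or default for reader field '{name}'")
    return resolved  # writer-only fields are silently dropped


V1_FIELDS = [
    {"name": "order_id", "type": "string"},
    {"name": "customer_email", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "order_date", "type": "string"},
]
V2_FIELDS = V1_FIELDS + [
    {"name": "shipping_address", "type": ["null", "string"], "default": None},
    {"name": "discount_code", "type": ["null", "string"], "default": None},
]

new_order = {"order_id": "ORDER-003", "customer_email": "bob@example.com",
             "total_amount": 199.99, "order_date": "2025-01-15",
             "shipping_address": "123 Main St, City, State", "discount_code": "SAVE10"}
old_order = {"order_id": "ORDER-001", "customer_email": "john@example.com",
             "total_amount": 99.99, "order_date": "2025-01-15"}

print(sorted(resolve_record(new_order, V1_FIELDS)))           # old consumer: 4 fields
print(resolve_record(old_order, V2_FIELDS)["discount_code"])  # new consumer: None
```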
Step 6: Monitor your setup
You can monitor Schema Registry directly via its API:
# List all schemas
curl -u "username:password" \
"https://your-kafka-host:25065/subjects"
# Get latest schema version
curl -u "username:password" \
"https://your-kafka-host:25065/subjects/orders-value/versions/latest"
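The first command lists every subject. The second returns the latest version of orders-value as JSON, including its subject, version number, schema ID, and the schema itself as an escaped JSON string.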
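If you'd rather inspect the latest-version response from a script than by eye, note that the schema arrives as a JSON-encoded string inside the JSON response, so it has to be decoded twice. The sketch below assumes a response body with the subject, version, id, and schema fields described above; summarize_latest is a hypothetical helper, and the sample body is illustrative (fields trimmed for brevity), not output captured from a real cluster.

```python
import json


def summarize_latest(response_body: str) -> str:
    """Hypothetical helper: summarize a GET .../versions/latest response body."""
    info = json.loads(response_body)
    schema = json.loads(info["schema"])  # the schema itself is a JSON string
    fields = ", ".join(f["name"] for f in schema["fields"])
    return f"{info['subject']} v{info['version']} (id {info['id']}): {fields}"


# Illustrative response body (escaped schema string inside the JSON)
sample = json.dumps({
    "subject": "orders-value",
    "version": 2,
    "id": 2,
    "schema": json.dumps({"type": "record", "name": "Order", "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "shipping_address", "type": ["null", "string"], "default": None},
    ]}),
})
print(summarize_latest(sample))
# orders-value v2 (id 2): order_id, shipping_address
```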
In addition, the cloud provider’s control panel gives you:
- Cluster health at a glance.
- Message throughput per topic.
- Lag monitoring, to see if consumers are falling behind.
Together, these tools help you keep your pipeline reliable as you scale. For more on monitoring and observability, see the cloud provider's Monitoring documentation.
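Consumer lag is how far a consumer group's committed position trails the end of each partition: lag = high watermark - committed offset. In confluent-kafka you can obtain both numbers with Consumer.get_watermark_offsets() and Consumer.committed(); the sketch below leaves those calls out and takes the offsets as plain dictionaries, so the arithmetic is easy to see. The function name and sample offsets are hypothetical, and a partition with no commit is treated as unread from offset 0.

```python
def partition_lag(high_watermarks: dict[int, int],
                  committed: dict[int, int]) -> dict[int, int]:
    """Hypothetical helper: lag per partition = high watermark - committed offset."""
    lag = {}
    for partition, high in high_watermarks.items():
        # Simplification: no committed offset means the group hasn't read anything,
        # assuming the partition still starts at offset 0
        lag[partition] = high - committed.get(partition, 0)
    return lag


# Hypothetical offsets for the three partitions of the 'orders' topic
high = {0: 120, 1: 95, 2: 130}
done = {0: 120, 1: 80}          # partition 2 has no commit yet
lag = partition_lag(high, done)
print(lag, sum(lag.values()))   # {0: 0, 1: 15, 2: 130} 145
```

A steadily growing total is the signal to alert on: it means consumers are falling behind producers.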
Troubleshooting common issues with Schema Registry
Even with managed infrastructure, you may run into common issues. Here’s a quick reference table for resolving them:
| Issue | Cause / Explanation | Solution |
|---|---|---|
| Schema Registry not found | You're likely on a Basic Kafka plan, or Schema Registry hasn't been enabled yet. Schema Registry requires a General Purpose (dedicated CPU) plan. | Use a General Purpose plan, then go to your cluster's Settings tab in the Control Panel and confirm Schema Registry is enabled. |
| SSL connection failed | The CA certificate (ca-certificate.crt) hasn't been downloaded, or the path in your config is wrong. | Make sure ssl.ca.location in your Python config points to the CA certificate you downloaded from the cloud provider's control panel. |
| Schema compatibility error | A new schema version breaks the subject's compatibility rule, most often by adding a field without a default. | Give every new field a default, for example a ["null", "string"] union with "default": null. |
Pro tip: When evolving schemas, always test with a consumer running before deploying to production.
Frequently Asked Questions
What is Kafka Schema Registry, and why do I need it?
Kafka Schema Registry is a centralized service that stores and manages data schemas for Kafka topics. It ensures that producers and consumers agree on message structure, preventing errors from schema mismatches. Without Schema Registry, a simple field rename like changing customer_email to email can break your entire pipeline. With it, producers' serializers validate messages before they enter topics, and schema versioning is handled automatically.
Can I use Schema Registry on a Basic Kafka plan?
No. Schema Registry requires a General Purpose (dedicated CPU) plan; Basic plans don't support it. When creating your Kafka cluster in the Control Panel, select a General Purpose plan to enable Schema Registry. You can find more details in the cloud provider's Managed Kafka documentation.
How does Schema Registry handle schema evolution?
Schema Registry enforces compatibility rules when you update schemas. By default, it uses backward compatibility, meaning consumers using the new schema version must be able to read data written with the previous version. To add new fields safely, give them a default, for example by declaring them as ["null", "string"] with "default": null. Changes that break this rule (such as adding a field without a default, or changing a field's type incompatibly) are rejected unless you change the compatibility mode. This prevents schema updates from causing production outages.
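For the add-and-remove-field changes in this tutorial, the backward rule boils down to one check: every field in the new schema must either exist in the previous schema or have a default, so a consumer on the new schema can still fill it in when reading old messages. The sketch below checks only that rule and ignores type changes, which the real checker also validates; backward_incompatible_fields is a hypothetical helper. To have the registry run its full check before you apply a change, the Schema Registry REST API defines a POST /compatibility/subjects/<subject>/versions/latest endpoint; confirm your cluster's registry supports it.

```python
def backward_incompatible_fields(old_fields: list[dict],
                                 new_fields: list[dict]) -> list[str]:
    """Hypothetical, simplified BACKWARD check: new fields must have defaults.

    Returns the offending field names (an empty list means the change passes).
    """
    old_names = {f["name"] for f in old_fields}
    return [
        f["name"]
        for f in new_fields
        if f["name"] not in old_names and "default" not in f
    ]


V1 = [{"name": "order_id", "type": "string"},
      {"name": "total_amount", "type": "double"}]

ok_change = V1 + [{"name": "discount_code", "type": ["null", "string"], "default": None}]
bad_change = V1 + [{"name": "discount_code", "type": ["null", "string"]}]  # no default
removed = V1[:1]                                                           # drops total_amount

print(backward_incompatible_fields(V1, ok_change))   # []
print(backward_incompatible_fields(V1, bad_change))  # ['discount_code']
print(backward_incompatible_fields(V1, removed))     # [] (removing a field is allowed)
```

Note the second case: a nullable union on its own isn't enough; the field also needs "default": null.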
Can I use schema formats other than Avro?
Yes. The cloud provider's Schema Registry supports Avro, Protobuf, and JSON Schema. This tutorial uses Avro because it's widely adopted and provides efficient binary serialization. Protobuf offers similar benefits with a different schema definition language, while JSON Schema is more human-readable but less compact. Choose based on your team's preferences and existing tooling.
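To get a feel for the size difference, the sketch below hand-encodes one order the way Avro's binary encoding does for this schema (each string as a zig-zag varint length followed by UTF-8 bytes, the double as 8 little-endian bytes, and no field names) and compares it with the same order as compact JSON. It's a minimal illustration of the encoding rules for these two types only; in practice the serializer does this for you, and each Schema Registry message adds a 5-byte header on top.

```python
import json
import struct

# Field order and types from the Order schema; Avro writes values in this order
ORDER_FIELDS = [("order_id", "string"), ("customer_email", "string"),
                ("total_amount", "double"), ("order_date", "string")]


def zigzag_varint(n: int) -> bytes:
    """Encode an int as Avro does: zig-zag, then 7-bit groups, low bits first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # set the continuation bit
        z >>= 7
    out.append(z)
    return bytes(out)


def avro_encode_order(order: dict) -> bytes:
    """Encode an order record with Avro's binary rules for string and double."""
    out = bytearray()
    for name, avro_type in ORDER_FIELDS:
        if avro_type == "string":
            data = order[name].encode("utf-8")
            out += zigzag_varint(len(data)) + data   # length prefix, then bytes
        else:  # "double"
            out += struct.pack("<d", order[name])    # 8 bytes, little-endian
    return bytes(out)


order = {"order_id": "ORDER-001", "customer_email": "john@example.com",
         "total_amount": 99.99, "order_date": "2025-01-15"}
avro_bytes = avro_encode_order(order)
json_bytes = json.dumps(order, separators=(",", ":")).encode("utf-8")
print(len(avro_bytes), len(json_bytes))  # 46 107
```

Most of the savings come from not repeating field names in every message: the schema, fetched once, already says which value is which.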
What happens if a producer sends data that doesn't match the schema?
The producer's serializer checks each message against the registered schema before it's sent. If the data doesn't match, serialization fails with an error and the message is never written to the topic. This keeps your data stream clean and prevents downstream consumers from encountering deserialization errors. It's still good practice to check that your data matches the schema before sending it, so you can handle these errors gracefully.
How can I monitor Schema Registry?
You can monitor Schema Registry through its REST API, using curl commands to list subjects and view schema versions. Additionally, the cloud provider's control panel provides cluster health metrics, message throughput per topic, and consumer lag monitoring. For production deployments, consider setting up alerts for schema compatibility errors and consumer lag thresholds.
What you've built
Congratulations! You now have a working Kafka pipeline with Schema Registry on the cloud provider.
Next steps
Now that your foundation is in place, here are some ideas to take it further:
- Add new fields to your schema, like payment_status or delivery_date.
- Experiment with different schema formats such as Protobuf or JSON Schema.
- Integrate the cloud provider monitoring with alerts so you know when consumers lag.
- Scale your Kafka cluster to handle higher throughput using the Control Panel.
From here, you can expand your pipeline into a complete event-driven architecture. With Schema Registry on the cloud provider, you can build confidently, knowing your data integrity is protected.
Continue learning and evolving your architecture with the cloud provider's Managed Kafka.