Using a registry to decouple schemas from messages in an event streaming analytics architecture
Introduction
In the last post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, we learned about Apache Spark and Spark Structured Streaming on Amazon EMR (fka Amazon Elastic MapReduce) with Amazon Managed Streaming for Apache Kafka (Amazon MSK). We consumed messages from and published messages to Kafka using both batch and streaming queries. In that post, we serialized and deserialized messages to and from JSON using schemas we defined as a StructType (pyspark.sql.types.StructType) in each PySpark script. Likewise, we constructed similar structs for the CSV-format data files we read from and wrote to Amazon S3.
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               FloatType, TimestampType, StringType)

schema = StructType([
    StructField("payment_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("amount", FloatType(), False),
    StructField("payment_date", TimestampType(), False),
    StructField("city", StringType(), True),
    StructField("district", StringType(), True),
    StructField("country", StringType(), False),
])
In this follow-up post, we will read and write messages to and from Amazon MSK in Apache Avro format. We will store the Avro-format Kafka message key and value schemas in Apicurio Registry and retrieve them from the registry at runtime instead of hard-coding them in the PySpark scripts. We will also use the registry to store the schemas for the CSV-format data files.

Video Demonstration
In addition to this post, there is now a video demonstration available on YouTube.
Technologies
In the last post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, we learned about Apache Spark, Apache Kafka, Amazon EMR, and Amazon MSK.
In a previous post, Hydrating a Data Lake using Log-based Change Data Capture (CDC) with Debezium, Apicurio, and Kafka Connect on AWS, we explored Apache Avro and Apicurio Registry.
Apache Spark
Apache Spark, according to the documentation, is a unified analytics engine for large-scale data processing. Spark provides high-level APIs in Java, Scala, Python (PySpark), and R. Spark provides an optimized engine that supports general execution graphs (aka directed acyclic graphs or DAGs). In addition, Spark supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Spark Structured Streaming
Spark Structured Streaming, according to the documentation, is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end, exactly-once stream processing without the user having to reason about streaming.
Apache Avro
Apache Avro describes itself as a data serialization system. Apache Avro is a compact, fast, binary data format similar to Apache Parquet, Apache Thrift, MongoDB’s BSON, and Google’s Protocol Buffers (protobuf). However, Apache Avro is a row-based storage format compared to columnar storage formats like Apache Parquet and Apache ORC.

Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. According to the documentation, schemas permit each datum to be written with no per-value overheads, making serialization fast and small. Schemas also facilitate use with dynamic scripting languages since data, together with its schema, is fully self-describing.

Apicurio Registry
We can decouple the data from its schema by using schema registries such as Confluent Schema Registry or Apicurio Registry. According to Apicurio, in a messaging and event streaming architecture, data published to topics and queues must often be serialized or validated using a schema (e.g., Apache Avro, JSON Schema, or Google Protocol Buffers). Of course, schemas can be packaged in each application. Still, it is often a better architectural pattern to register schemas in an external system [schema registry] and then reference them from each application.
Amazon EMR
According to AWS documentation, Amazon EMR (fka Amazon Elastic MapReduce) is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hadoop, Hive, HBase, Flink, Hudi, and Presto. Amazon EMR is a fully managed AWS service that makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.
Amazon EMR on EKS, a deployment option for Amazon EMR since December 2020, allows you to run Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). With the EKS deployment option, you can focus on running analytics workloads while Amazon EMR on EKS builds, configures, and manages containers for open-source applications.
If you are new to Amazon EMR for Spark, specifically PySpark, I recommend a recent two-part series of posts, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.
Apache Kafka
According to the documentation, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Amazon MSK
Apache Kafka clusters are challenging to set up, scale, and manage in production. According to AWS documentation, Amazon MSK is a fully managed AWS service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.
Prerequisites
Similar to the previous post, this post will focus primarily on configuring and running Apache Spark jobs on Amazon EMR. To follow along, you will need the following resources deployed and configured on AWS:
- Amazon S3 bucket (holds all Spark/EMR resources);
- Amazon MSK cluster (using IAM Access Control);
- Amazon EKS container or an EC2 instance with the Kafka APIs installed and capable of connecting to Amazon MSK;
- Amazon EKS container or an EC2 instance with Apicurio Registry installed and capable of connecting to Amazon MSK (if using Kafka for backend storage) and being accessed by Amazon EMR;
- Ensure the Amazon MSK Configuration has auto.create.topics.enable=true; this setting is false by default (see the sketch following this list);
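If you need to create such an MSK configuration programmatically, the following is a minimal boto3 sketch; the configuration name, Kafka version, and extra property are assumptions for illustration only.
# Minimal sketch (not from the original post): create an Amazon MSK
# configuration with topic auto-creation enabled using boto3.
import boto3

kafka_client = boto3.client("kafka")

server_properties = b"""
auto.create.topics.enable=true
default.replication.factor=2
"""

response = kafka_client.create_configuration(
    Name="spark-kafka-demo-configuration",  # hypothetical configuration name
    Description="Demo configuration with topic auto-creation enabled",
    KafkaVersions=["2.8.1"],                # match your MSK cluster's version
    ServerProperties=server_properties,
)

print(response["Arn"])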
The architectural diagram below shows that the demonstration uses three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon EMR, Amazon MSK, and Amazon EKS. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges within your Amazon EMR, Amazon MSK, and Amazon EKS Security Groups. For additional security and cost savings, use a VPC endpoint for private communications between Amazon EMR and Amazon S3.

Source Code
All source code for this post and the three previous posts in the Amazon MSK series, including the Python and PySpark scripts demonstrated herein, is open-sourced and located on GitHub.
Objective
We will run a Spark Structured Streaming PySpark job to consume a simulated event stream of real-time sales data from Apache Kafka. We will then enrich (join) that sales data with the sales region and aggregate the sales and order volumes by region within a sliding event-time window. Next, we will continuously stream those aggregated results back to Kafka. Finally, a batch query will consume the aggregated results from Kafka and display the sales results in the console.

Kafka messages will be written in Apache Avro format. The schemas for the Kafka message keys and values, as well as the schemas for the CSV-format sales and sales regions data, will all be stored in Apicurio Registry. The Python and PySpark scripts will use Apicurio Registry’s REST API to read, write, and manage the Avro schema artifacts.
We are writing the Kafka message keys in Avro format and storing an Avro key schema in the registry purely for demonstration purposes; it is not a requirement. Kafka message keys are optional, and it is not necessary for the key and the value to share a common serialization format such as Avro.
Schema evolution, compatibility, and validation are important considerations, but out of scope for this post.
PySpark Scripts
PySpark, according to the documentation, is an interface for Apache Spark in Python. PySpark allows you to write Spark applications using the Python API. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core. There are three PySpark scripts and one new helper Python script covered in this post:
- 10_create_schemas.py: Python script creates all Avro schemas in Apicurio Registry using the REST API;
- 11_incremental_sales_avro.py: PySpark script simulates an event stream of sales data being published to Kafka over 15–20 minutes;
- 12_streaming_enrichment_avro.py: PySpark script uses a streaming query to read messages from Kafka in real-time, enriches sales data, aggregates regional sales results, and writes results back to Kafka as a stream;
- 13_batch_read_results_avro.py: PySpark script uses a batch query to read aggregated regional sales results from Kafka and display them in the console;
Preparation
To prepare your Amazon EMR resources, review the instructions in the previous post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR. Here is a recap, with a few additions required for this post.
Amazon S3
We will start by gathering and copying the necessary files to your Amazon S3 bucket. The bucket will serve as the location for the Amazon EMR bootstrap script, additional JAR files required by Spark, PySpark scripts, and CSV-format data files.
There is a set of additional JAR files required by the Spark jobs we will be running. Download the JARs from Maven Central and GitHub, and place them in the emr_jars project directory. The JARs include AWS MSK IAM Auth, AWS SDK, Kafka Client, Spark SQL for Kafka, Spark Streaming, and other dependencies. Compared to the last post, there is one additional JAR for Avro.
Update the SPARK_BUCKET environment variable, then upload the JARs, PySpark scripts, sample data, and EMR bootstrap script from your local copy of the GitHub project repository to your Amazon S3 bucket using the AWS s3 API.
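If you prefer to script the uploads with Python instead of the AWS CLI, a minimal boto3 sketch might look like the following; the bucket name, local directory names, and S3 prefixes are placeholders, not values from the project.
# Hedged sketch: upload project assets to Amazon S3 with boto3.
import os
import boto3

SPARK_BUCKET = "your-spark-bucket"  # placeholder
s3_client = boto3.client("s3")

for local_dir, s3_prefix in [
    ("emr_jars", "jars"),          # illustrative local directories
    ("pyspark", "spark"),
    ("data", "spark/data"),
]:
    for file_name in os.listdir(local_dir):
        s3_client.upload_file(
            os.path.join(local_dir, file_name),
            SPARK_BUCKET,
            f"{s3_prefix}/{file_name}",
        )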
Amazon EMR
The GitHub project repository includes a sample AWS CloudFormation template and an associated JSON-format CloudFormation parameters file. The CloudFormation template, stack.yml, accepts several environment parameters. To match your environment, you will need to update parameter values such as the SSH key, subnet, and S3 bucket. The template will build a minimally-sized Amazon EMR cluster with one master and two core nodes in an existing VPC. You can easily modify the template and parameters to meet your requirements and budget.
aws cloudformation deploy \
--stack-name spark-kafka-demo-dev \
--template-file ./cloudformation/stack.yml \
--parameter-overrides file://cloudformation/dev.json \
--capabilities CAPABILITY_NAMED_IAM
The CloudFormation template has two essential Spark configuration items — the list of applications to install on EMR and the bootstrap script deployment.
Below, we see the EMR bootstrap shell script, bootstrap_actions.sh.
The bootstrap script performs several tasks, including deploying the additional JAR files we copied to Amazon S3 earlier to the EMR cluster nodes.

Parameter Store
The PySpark scripts in this demonstration will obtain configuration values from the AWS Systems Manager (AWS SSM) Parameter Store. Configuration values include a list of Amazon MSK bootstrap brokers, the Amazon S3 bucket that contains the EMR/Spark assets, and the Apicurio Registry REST API base URL. Using the Parameter Store ensures that no sensitive or environment-specific configuration is hard-coded into the PySpark scripts. Modify and execute the ssm_params.sh script to create the AWS SSM Parameter Store parameters.
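For reference, below is a minimal sketch of how a script might read these values with boto3; the parameter names are assumptions and may differ from those created by ssm_params.sh.
# Minimal sketch: retrieve configuration values from AWS SSM Parameter Store.
import boto3

ssm_client = boto3.client("ssm")

def get_parameters():
    """Return the demo's configuration values (parameter names are assumed)."""
    return {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
        "schema_registry_url": ssm_client.get_parameter(
            Name="/kafka_spark_demo/schema_registry_url")["Parameter"]["Value"],
    }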
Create Schemas in Apicurio Registry
To create the schemas necessary for this demonstration, a Python script, 10_create_schemas.py, is included in the project. The script uses Apicurio Registry’s REST API to create six new Avro-based schema artifacts.
Apicurio Registry supports several common artifact types, including AsyncAPI specification, Apache Avro schema, GraphQL schema, JSON Schema, Apache Kafka Connect schema, OpenAPI specification, Google Protocol Buffers schema, Web Services Definition Language, and XML Schema Definition. We will use the registry to store Avro schemas for use with Kafka and CSV data sources and sinks.
Although Apicurio Registry does not support CSV Schema, we can store the schemas for the CSV-format sales and sales region data in the registry as JSON-format Avro schemas.
{
"name": "Sales",
"type": "record",
"doc": "Schema for CSV-format sales data",
"fields": [
{
"name": "payment_id",
"type": "int"
},
{
"name": "customer_id",
"type": "int"
},
{
"name": "amount",
"type": "float"
},
{
"name": "payment_date",
"type": "string"
},
{
"name": "city",
"type": [
"string",
"null"
]
},
{
"name": "district",
"type": [
"string",
"null"
]
},
{
"name": "country",
"type": "string"
}
]
}
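For illustration, a schema artifact such as the one above could be registered with a few lines of Python; this sketch assumes the Apicurio Registry v2 core REST API, and the group ID, artifact ID, and file name are placeholders (the project’s 10_create_schemas.py may differ in its details).
# Hedged sketch: register an Avro schema artifact in Apicurio Registry.
import json
import requests

registry_base_url = "http://localhost:8080/apis/registry/v2"  # assumed base URL
group_id = "default"
artifact_id = "pagila.sales.csv"  # illustrative artifact ID

with open("sales.avsc", "r") as file:  # the Avro schema shown above
    avro_schema = json.load(file)

response = requests.post(
    url=f"{registry_base_url}/groups/{group_id}/artifacts",
    data=json.dumps(avro_schema),
    headers={
        "Content-Type": "application/json",
        "X-Registry-ArtifactId": artifact_id,
        "X-Registry-ArtifactType": "AVRO",
    },
)
response.raise_for_status()
print(response.json())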
We can then retrieve the JSON-format Avro schema from the registry, convert it to a PySpark StructType, and apply it to the DataFrame used to hold the sales data read from the CSV files.
root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- amount: float (nullable = true)
|-- payment_date: string (nullable = true)
|-- city: string (nullable = true)
|-- district: string (nullable = true)
|-- country: string (nullable = true)
Using the registry allows us to avoid hard-coding the schema as a StructType in the PySpark scripts in advance.
Add the Python script, 10_create_schemas.py, as an EMR Step. EMR will run the Python script the same way it runs PySpark jobs.
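For reference, the step could also be added programmatically; the boto3 sketch below is illustrative only, and the cluster ID and S3 script path are placeholders.
# Hedged sketch: add the schema-creation script as an EMR Step with boto3.
import boto3

emr_client = boto3.client("emr")

response = emr_client.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # your EMR cluster ID
    Steps=[{
        "Name": "create-schemas",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster",
                "s3://your-spark-bucket/spark/10_create_schemas.py",
            ],
        },
    }],
)

print(response["StepIds"])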
The Python script creates six schema artifacts in Apicurio Registry, shown below in Apicurio Registry’s browser-based user interface. The schemas include two key/value pairs for the two Kafka topics and two schemas for the CSV-format sales and sales region data.

You have the option of enabling validation and compatibility rules for each schema with Apicurio Registry.

Each Avro schema artifact is stored as a JSON object in the registry.

Simulate Sales Event Stream
Next, we will simulate an event stream of sales data published to Kafka over 15–20 minutes. The PySpark script, 11_incremental_sales_avro.py, reads 1,800 sales records into a DataFrame (pyspark.sql.DataFrame) from a CSV file located in S3. The script then takes each Row (pyspark.sql.Row) of the DataFrame, one row at a time, and writes it to the Kafka topic, pagila.sales.avro, adding a slight delay between each write.
The PySpark scripts first retrieve the JSON-format Avro schema for the CSV data from Apicurio Registry using the Python requests module and Apicurio Registry’s REST API (get_schema()).
{
"name": "Sales",
"type": "record",
"doc": "Schema for CSV-format sales data",
"fields": [
{
"name": "payment_id",
"type": "int"
},
{
"name": "customer_id",
"type": "int"
},
{
"name": "amount",
"type": "float"
},
{
"name": "payment_date",
"type": "string"
},
{
"name": "city",
"type": [
"string",
"null"
]
},
{
"name": "district",
"type": [
"string",
"null"
]
},
{
"name": "country",
"type": "string"
}
]
}
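A helper such as get_schema() might resemble the following sketch; the v2 REST endpoint path and the artifact IDs are assumptions and may differ from the project’s actual helper.
# Minimal sketch of a get_schema() helper using the requests module.
import json
import requests

def get_schema(registry_base_url, artifact_id, group_id="default"):
    """Return the latest JSON-format Avro schema for a registry artifact."""
    response = requests.get(
        f"{registry_base_url}/groups/{group_id}/artifacts/{artifact_id}")
    response.raise_for_status()
    return json.dumps(response.json())

# e.g., sales_schema = get_schema(params["schema_registry_url"], "pagila.sales.csv")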
The script then creates a StructType from the JSON-format Avro schema using an empty DataFrame (struct_from_json()). Avro column types are converted to Spark SQL types. The only apparent issue is how Spark mishandles the nullable value for each column. Note that column nullability in Spark is an optimization statement, not an enforcement of the object type.
root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- amount: float (nullable = true)
|-- payment_date: string (nullable = true)
|-- city: string (nullable = true)
|-- district: string (nullable = true)
|-- country: string (nullable = true)
The resulting StructType is used to read the CSV data into a DataFrame (read_from_csv()).
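Below is one possible implementation of the two helpers described above, shown only as a sketch; it relies on the Avro data source’s avroSchema option to produce an empty DataFrame and borrow its schema, and the S3 path, file name, and CSV delimiter are assumptions.
# Hedged sketch of struct_from_json() and read_from_csv().
def struct_from_json(spark, json_format_schema):
    """Derive a Spark StructType from a JSON-format Avro schema."""
    df = spark.read \
        .format("avro") \
        .option("avroSchema", json_format_schema) \
        .load()  # no data files are read; only the schema is materialized
    df.printSchema()
    return df.schema


def read_from_csv(spark, params, schema, data_file):
    """Read a CSV file from Amazon S3 using the supplied StructType."""
    return spark.read.csv(
        path=f"s3a://{params['kafka_demo_bucket']}/spark/{data_file}",
        schema=schema, header=True, sep="|")  # delimiter is an assumption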
For the Avro-format Kafka key and value schemas, we use the same method, get_schema(). The resulting JSON-format schemas are then passed to the to_avro() and from_avro() methods to write and read Avro-format messages to and from Kafka. Both methods are part of the pyspark.sql.avro.functions module. Avro column types are converted to and from Spark SQL types.
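As a condensed, hedged sketch of that pattern (not the exact project code), reading and writing Avro-format Kafka messages might look like the following; the MSK IAM authentication options are omitted for brevity, and params, key_schema, and value_schema are assumed to have been retrieved earlier via the Parameter Store and get_schema().
# Hedged sketch: batch read and write of Avro-format Kafka messages.
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import col, struct

spark = SparkSession.builder.appName("avro-kafka-sketch").getOrCreate()

# Read: deserialize the Avro-encoded message value into columns.
df_sales = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", params["kafka_servers"]) \
    .option("subscribe", "pagila.sales.avro") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_avro(col("value"), value_schema).alias("sales")) \
    .select("sales.*")

# Write: serialize the key (an int key schema is assumed) and value to Avro.
df_sales \
    .select(to_avro(col("payment_id"), key_schema).alias("key"),
            to_avro(struct("*"), value_schema).alias("value")) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", params["kafka_servers"]) \
    .option("topic", "pagila.sales.avro") \
    .save()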
We must run this PySpark script, 11_incremental_sales_avro.py, concurrently with the PySpark script, 12_streaming_enrichment_avro.py, to simulate an event stream. We will start both scripts in the next part of the post.
Stream Processing with Structured Streaming
The PySpark script, 12_streaming_enrichment_avro.py, uses a streaming query to read sales data messages from the Kafka topic, pagila.sales.avro, in real-time, enriches the sales data, aggregates regional sales results, and writes the results back to Kafka in micro-batches every two minutes.
The PySpark script performs a stream-to-batch join between the streaming sales data from the Kafka topic, pagila.sales.avro, and a CSV file that contains the sales regions, based on the common country column. Schemas for the CSV data and the Kafka message keys and values are retrieved from Apicurio Registry using the REST API, identically to the previous PySpark script.
The PySpark script then performs a streaming aggregation of the sale amount and order quantity over a sliding 10-minute event-time window, writing results to the Kafka topic, pagila.sales.summary.avro, every two minutes. Below is a sample of the resulting streaming DataFrame, written to external storage, Kafka in this case, using a DataStreamWriter interface (pyspark.sql.streaming.DataStreamWriter).
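A hedged sketch of that windowed aggregation and streaming write appears below; the watermark, slide duration, output mode, and checkpoint location are assumptions, df_enriched stands in for the joined streaming DataFrame, and payment_date is assumed to have been cast to a timestamp.
# Hedged sketch: sliding-window aggregation written to Kafka as a stream.
from pyspark.sql import functions as F
from pyspark.sql.avro.functions import to_avro

df_summary = df_enriched \
    .withWatermark("payment_date", "10 minutes") \
    .groupBy("region", F.window("payment_date", "10 minutes", "5 minutes")) \
    .agg(F.sum("amount").cast("float").alias("sales"),
         F.count("payment_id").cast("int").alias("orders"))

# The registered value schema could be passed as to_avro()'s second argument.
query = df_summary \
    .select(to_avro(F.struct("region", "sales", "orders",
                             F.col("window.start").alias("window_start"),
                             F.col("window.end").alias("window_end"))).alias("value")) \
    .writeStream \
    .trigger(processingTime="2 minutes") \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", params["kafka_servers"]) \
    .option("topic", "pagila.sales.summary.avro") \
    .option("checkpointLocation", "/checkpoint/kafka") \
    .start()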
Once again, schemas for the second Kafka topic’s message key and value are retrieved from Apicurio Registry using its REST API. The key schema:
{
"name": "Key",
"type": "int",
"doc": "Schema for pagila.sales.summary.avro Kafka topic key"
}
And, the value schema:
{
"name": "Value",
"type": "record",
"doc": "Schema for pagila.sales.summary.avro Kafka topic value",
"fields": [
{
"name": "region",
"type": "string"
},
{
"name": "sales",
"type": "float"
},
{
"name": "orders",
"type": "int"
},
{
"name": "window_start",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "window_end",
"type": "long",
"logicalType": "timestamp-millis"
}
]
}
Below is the schema as applied to the streaming DataFrame using the to_avro() method.
root
|-- region: string (nullable = false)
|-- sales: float (nullable = true)
|-- orders: integer (nullable = false)
|-- window_start: long (nullable = true)
|-- window_end: long (nullable = true)
Submit this streaming PySpark script, 12_streaming_enrichment_avro.py, as an EMR Step.
Wait about two minutes to give this third PySpark script time to start its streaming query fully.

Then, submit the second PySpark script, 11_incremental_sales_avro.py, as an EMR Step. Both PySpark scripts will run concurrently on your Amazon EMR cluster, or alternately, on two different clusters.
The PySpark script, 11_incremental_sales_avro.py, should run for approximately 15–20 minutes.

During that time, every two minutes, the script, 12_streaming_enrichment_avro.py, will write micro-batches of aggregated sales results to the second Kafka topic, pagila.sales.summary.avro, in Avro format. An example of a micro-batch recorded in PySpark’s stdout log is shown below.
Once this script completes, wait another two minutes, then stop the streaming PySpark script, 12_streaming_enrichment_avro.py.
Review the Results
To retrieve and display the results of the previous PySpark script’s streaming computations from Kafka, we can use the final PySpark script, 13_batch_read_results_avro.py.
Run the final PySpark script as an EMR Step.
This final PySpark script performs a batch read of all the Avro-format aggregated sales messages from the Kafka topic, using schemas retrieved from Apicurio Registry. The script then writes a summary of the final sales results for each sliding 10-minute event-time window, by sales region, to the stdout job log.
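A hedged sketch of such a batch read follows; connection options are simplified, and spark, params, and summary_value_schema are assumed to have been created or retrieved as in the earlier scripts.
# Hedged sketch: batch read and display of the aggregated sales results.
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col

# spark: an existing SparkSession
df_results = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", params["kafka_servers"]) \
    .option("subscribe", "pagila.sales.summary.avro") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_avro(col("value"), summary_value_schema).alias("summary")) \
    .select("summary.*") \
    .orderBy(col("window_start"), col("sales").desc())

df_results.show(100, truncate=False)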
Conclusion
In this post, we learned how to get started with Spark Structured Streaming on Amazon EMR using PySpark, the Apache Avro format, and Apicurio Registry. We decoupled the Kafka message key and value schemas, as well as the schemas for the CSV-format data stored in Amazon S3, from the PySpark scripts by storing those schemas in a registry.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.