Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on Amazon EMR and Amazon MSK

Using a registry to decouple schemas from messages in an event streaming analytics architecture

Introduction

In the last post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, we learned about Apache Spark and Spark Structured Streaming on Amazon EMR (fka Amazon Elastic MapReduce) with Amazon Managed Streaming for Apache Kafka (Amazon MSK). We consumed messages from and published messages to Kafka using both batch and streaming queries. In that post, we serialized and deserialized messages to and from JSON using schemas we defined as a StructType (pyspark.sql.types.StructType) in each PySpark script. Likewise, we constructed similar structs for CSV-format data files we read from and wrote to Amazon S3.

schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", TimestampType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])

In this follow-up post, we will read and write messages to and from Amazon MSK in Apache Avro format. We will store the Avro-format Kafka message’s key and value schemas in Apicurio Registry and retrieve the schemas instead of hard-coding the schemas in the PySpark scripts. We will also use the registry to store schemas for CSV-format data files.

Note the addition of the registry to the architecture for this post’s demonstration

Technologies

In the last post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, we learned about Apache Spark, Apache Kafka, Amazon EMR, and Amazon MSK.

In a previous post, Hydrating a Data Lake using Log-based Change Data Capture (CDC) with Debezium, Apicurio, and Kafka Connect on AWS, we explored Apache Avro and Apicurio Registry.

Apache Spark

Apache Spark, according to the documentation, is a unified analytics engine for large-scale data processing. Spark provides high-level APIs in Java, Scala, Python (PySpark), and R. Spark provides an optimized engine that supports general execution graphs (aka directed acyclic graphs or DAGs). In addition, Spark supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Interest over time in Apache Spark and PySpark compared to Hive and Presto, according to Google Trends

Spark Structured Streaming

Spark Structured Streaming, according to the documentation, is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end, exactly-once stream processing without the user having to reason about streaming.

Apache Avro

Apache Avro describes itself as a data serialization system. Apache Avro is a compact, fast, binary data format similar to Apache Parquet, Apache Thrift, MongoDB’s BSON, and Google’s Protocol Buffers (protobuf). However, Apache Avro is a row-based storage format compared to columnar storage formats like Apache Parquet and Apache ORC.
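To make the row-based versus columnar distinction concrete, here is a small illustrative sketch in plain Python. It does not use Avro or Parquet; the sample records and function names are made up for this illustration.

```python
# Illustration only: contrast row-oriented and column-oriented layouts.
# The sample records below are hypothetical.
records = [
    {"payment_id": 1, "amount": 4.99, "country": "Canada"},
    {"payment_id": 2, "amount": 2.99, "country": "Japan"},
    {"payment_id": 3, "amount": 0.99, "country": "Canada"},
]


def to_row_layout(rows):
    """Row-based (Avro-like): all fields of one record are stored together."""
    return [tuple(row.values()) for row in rows]


def to_column_layout(rows):
    """Columnar (Parquet/ORC-like): all values of one field are stored together."""
    return {name: [row[name] for row in rows] for name in rows[0]}


row_layout = to_row_layout(records)
column_layout = to_column_layout(records)

print(row_layout[0])            # one complete record
print(column_layout["amount"])  # one complete column
```

A row layout suits event streaming, where each Kafka message carries one complete record. A columnar layout suits analytic queries that scan a few columns across many records.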

Undecoded Avro-format messages with their keys and values shown in non-human readable binary format

Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. According to the documentation, schemas permit each datum to be written with no per-value overheads, making serialization fast and small. Schemas also facilitate use with dynamic scripting languages since data, together with its schema, is fully self-describing.
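To see why a schema removes per-value overhead, consider how little is written for each record. The sketch below hand-encodes one record following the binary encoding rules in the Avro specification (zig-zag variable-length integers, length-prefixed UTF-8 strings, 4-byte little-endian floats, and a branch index for unions). It is a teaching aid only, not a replacement for an Avro library; the four-field record and the function names are assumptions made for this example.

```python
import struct


def encode_long(value: int) -> bytes:
    """Encode an Avro int/long: zig-zag, then variable-length base-128."""
    zigzag = (value << 1) ^ (value >> 63)  # maps small negatives to small positives
    out = bytearray()
    while True:
        byte = zigzag & 0x7F
        zigzag >>= 7
        if zigzag:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(value: str) -> bytes:
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    data = value.encode("utf-8")
    return encode_long(len(data)) + data


def encode_float(value: float) -> bytes:
    """Encode an Avro float: 4 bytes, little-endian IEEE 754."""
    return struct.pack("<f", value)


def encode_optional_string(value) -> bytes:
    """Encode the union ["string", "null"]: branch index, then the value."""
    if value is None:
        return encode_long(1)  # branch 1 is "null", which has no body
    return encode_long(0) + encode_string(value)


def encode_sale(payment_id, amount, city, country) -> bytes:
    """Encode a hypothetical record: fields are concatenated in schema order."""
    return (encode_long(payment_id) + encode_float(amount)
            + encode_optional_string(city) + encode_string(country))


message = encode_sale(1, 4.5, None, "Japan")
print(len(message), message.hex(" "))
```

The encoded record contains no field names or type tags, only the values in schema order, which is why a reader needs the writer's schema to decode it.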

Interest over time in Apache Avro compared to Parquet and ORC, according to Google Trends

Apicurio Registry

We can decouple the data from its schema by using schema registries such as Confluent Schema Registry or Apicurio Registry. According to Apicurio, in a messaging and event streaming architecture, data published to topics and queues must often be serialized or validated using a schema (e.g., Apache Avro, JSON Schema, or Google Protocol Buffers). Of course, schemas can be packaged in each application. Still, it is often a better architectural pattern to register schemas in an external system [schema registry] and then reference them from each application.


Amazon EMR

According to AWS documentation, Amazon EMR (fka Amazon Elastic MapReduce) is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hadoop, Hive, HBase, Flink, Hudi, and Presto. Amazon EMR is a fully managed AWS service that makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.

Amazon EMR on EKS, a deployment option for Amazon EMR since December 2020, allows you to run Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). With the EKS deployment option, you can focus on running analytics workloads while Amazon EMR on EKS builds, configures, and manages containers for open-source applications.

If you are new to Amazon EMR for Spark, specifically PySpark, I recommend a recent two-part series of posts, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

Apache Kafka

According to the documentation, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Amazon MSK

Apache Kafka clusters are challenging to set up, scale, and manage in production. According to AWS documentation, Amazon MSK is a fully managed AWS service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.

Prerequisites

Similar to the previous post, this post will focus primarily on configuring and running Apache Spark jobs on Amazon EMR. To follow along, you will need the following resources deployed and configured on AWS:

  1. Amazon S3 bucket (holds all Spark/EMR resources);
  2. Amazon MSK cluster (using IAM Access Control);
  3. Amazon EKS container or an EC2 instance with the Kafka APIs installed and capable of connecting to Amazon MSK;
  4. Amazon EKS container or an EC2 instance with Apicurio Registry installed and capable of connecting to Amazon MSK (if using Kafka for backend storage) and being accessed by Amazon EMR;
  5. Amazon MSK Configuration with auto.create.topics.enable=true (this setting is false by default).

The architectural diagram below shows that the demonstration uses three separate VPCs within the same AWS account and AWS Region us-east-1, for Amazon EMR, Amazon MSK, and Amazon EKS. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges within your Amazon EMR, Amazon MSK, and Amazon EKS Security Groups. For additional security and cost savings, use a VPC endpoint for private communications between Amazon EMR and Amazon S3.

High-level architecture for this post’s demonstration

Source Code

All source code for this post and the three previous posts in the Amazon MSK series, including the Python and PySpark scripts demonstrated herein, is open-sourced and located on GitHub.

Objective

We will run a Spark Structured Streaming PySpark job to consume a simulated event stream of real-time sales data from Apache Kafka. Next, we will enrich (join) that sales data with the sales region and aggregate the sales and order volumes by region within a sliding event-time window. Next, we will continuously stream those aggregated results back to Kafka. Finally, a batch query will consume the aggregated results from Kafka and display the sales results in the console.

DataOps pipeline demonstrated in this post

Kafka messages will be written in Apache Avro format. The schemas for the Kafka message keys and values and the schemas for the CSV-format sales and sales regions data will all be stored in Apicurio Registry. The Python and PySpark scripts will use Apicurio Registry’s REST API to read, write, and manage the Avro schema artifacts.

We are writing the Kafka message keys in Avro format and storing an Avro key schema in the registry. This is done only for demonstration purposes and is not a requirement. Kafka message keys are optional, and the key and the value do not need to share a common format, such as Avro, in Kafka.

Schema evolution, compatibility, and validation are important considerations, but out of scope for this post.

PySpark Scripts

PySpark, according to the documentation, is an interface for Apache Spark in Python. PySpark allows you to write Spark applications using the Python API. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core. There are three PySpark scripts and one new helper Python script covered in this post:

  1. 10_create_schemas.py: Python script creates all Avro schemas in Apicurio Registry using the REST API;
  2. 11_incremental_sales_avro.py: PySpark script simulates an event stream of sales data being published to Kafka over at least 22 minutes;
  3. 12_streaming_enrichment_avro.py: PySpark script uses a streaming query to read messages from Kafka in real-time, enriches sales data, aggregates regional sales results, and writes results back to Kafka as a stream;
  4. 13_batch_read_results_avro.py: PySpark script uses a batch query to read aggregated regional sales results from Kafka and display them in the console;

Preparation

To prepare your Amazon EMR resources, review the instructions in the previous post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR. Here is a recap, with a few additions required for this post.

Amazon S3

We will start by gathering and copying the necessary files to your Amazon S3 bucket. The bucket will serve as the location for the Amazon EMR bootstrap script, additional JAR files required by Spark, PySpark scripts, and CSV-format data files.

There is a set of additional JAR files required by the Spark jobs we will be running. Download the JARs from Maven Central and GitHub, and place them in the emr_jars project directory. The JARs will include AWS MSK IAM Auth, AWS SDK, Kafka Client, Spark SQL for Kafka, Spark Streaming, and other dependencies. Compared to the last post, there is one additional JAR for Avro.

Update the SPARK_BUCKET environment variable, then upload the JARs, PySpark scripts, sample data, and EMR bootstrap script from your local copy of the GitHub project repository to your Amazon S3 bucket using the AWS CLI’s s3 commands.

cd ./pyspark/
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
aws s3 cp emr_jars/ "s3://${SPARK_BUCKET}/jars/" --recursive
aws s3 cp pyspark_scripts/ "s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp emr_bootstrap/ "s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp data/ "s3://${SPARK_BUCKET}/spark/" --recursive

Amazon EMR

The GitHub project repository includes a sample AWS CloudFormation template and an associated JSON-format CloudFormation parameters file. The CloudFormation template, stack.yml, accepts several environment parameters. To match your environment, you will need to update parameter values such as the SSH key, Subnet, and S3 bucket. The template will build a minimally-sized Amazon EMR cluster with one master and two core nodes in an existing VPC. You can easily modify the template and parameters to meet your requirements and budget.

aws cloudformation deploy \
    --stack-name spark-kafka-demo-dev \
    --template-file ./cloudformation/stack.yml \
    --parameter-overrides file://cloudformation/dev.json \
    --capabilities CAPABILITY_NAMED_IAM

The CloudFormation template has two essential Spark configuration items: the list of applications to install on EMR and the bootstrap script deployment.

Applications:
  - Name: 'Hadoop'
  - Name: 'Spark'
  - Name: 'JupyterEnterpriseGateway'
  - Name: 'Livy'
BootstrapActions:
  - Name: bootstrap-script
    ScriptBootstrapAction:
      Path: !Join [ '', [ 's3://', !Ref ProjectBucket, '/spark/bootstrap_actions.sh' ] ]

Below, we see the EMR bootstrap shell script, bootstrap_actions.sh.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford
# Date: 2021-09-10

# arg passed in by CloudFormation
if [ $# -eq 0 ]
then
    echo "No arguments supplied"
    exit 1  # stop here: the S3 copy below needs the bucket name
fi
SPARK_BUCKET=$1

# update yum packages, install jq
sudo yum update -y
sudo yum install -y jq

# jks truststore for connecting to msk
sudo cp /usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/security/cacerts \
    /tmp/kafka.client.truststore.jks

# set region for boto3
aws configure set region \
    "$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"

# install python packages for pyspark scripts
sudo python3 -m pip install boto3 botocore ec2-metadata

# install required jars for spark
sudo aws s3 cp \
    "s3://${SPARK_BUCKET}/jars/" /usr/lib/spark/jars/ \
    --recursive --exclude "*" --include "*.jar"

The bootstrap script performs several tasks, including deploying the additional JAR files we copied to Amazon S3 earlier to the EMR cluster nodes.

Amazon EMR cluster ‘bootstrap actions’ tab

Parameter Store

The PySpark scripts in this demonstration will obtain configuration values from the AWS Systems Manager (AWS SSM) Parameter Store. Configuration values include a list of Amazon MSK bootstrap brokers, the Amazon S3 bucket that contains the EMR/Spark assets, and the Apicurio Registry REST API base URL. Using the Parameter Store ensures that no sensitive or environment-specific configuration is hard-coded into the PySpark scripts. Modify and execute the ssm_params.sh script to create the AWS SSM Parameter Store parameters.
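As a rough sketch of how a script can read these values, the helper below maps short keys to Parameter Store names and calls get_parameter for each, the same boto3 call the PySpark scripts in this post use. The load_parameters function and the FakeSsmClient class are hypothetical names introduced for this example; the fake client only exists so the sketch runs without AWS credentials.

```python
def load_parameters(ssm_client, names):
    """Return {short_key: value} for each SSM parameter name in `names`."""
    return {
        key: ssm_client.get_parameter(Name=name)["Parameter"]["Value"]
        for key, name in names.items()
    }


# Parameter names used by the scripts in this post.
PARAMETER_NAMES = {
    "kafka_servers": "/kafka_spark_demo/kafka_servers",
    "kafka_demo_bucket": "/kafka_spark_demo/kafka_demo_bucket",
    "schema_registry_url": "/kafka_spark_demo/schema_registry_url_int",
}


class FakeSsmClient:
    """Hypothetical stand-in for boto3.client("ssm"), for local testing only."""

    def __init__(self, values):
        self._values = values

    def get_parameter(self, Name):
        # Mirrors the part of the real response shape that the helper reads.
        return {"Parameter": {"Name": Name, "Value": self._values[Name]}}


fake = FakeSsmClient({name: f"value-for-{name}" for name in PARAMETER_NAMES.values()})
params = load_parameters(fake, PARAMETER_NAMES)
print(params["kafka_servers"])
```

On the EMR cluster, you would pass boto3.client("ssm") in place of the fake client.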

aws ssm put-parameter \
    --name /kafka_spark_demo/kafka_servers \
    --type String \
    --value "<b-1.your-brokers.kafka.us-east-1.amazonaws.com:9098,b-2…>" \
    --description "Amazon MSK Kafka broker list" \
    --overwrite

aws ssm put-parameter \
    --name /kafka_spark_demo/kafka_demo_bucket \
    --type String \
    --value "<your-bucket-111222333444-us-east-1>" \
    --description "Amazon S3 bucket" \
    --overwrite

aws ssm put-parameter \
    --name /kafka_spark_demo/schema_registry_url_int \
    --type String \
    --value "http://<your_host>:<your_port>" \
    --description "Apicurio Registry REST API base URL (Internal Address)" \
    --overwrite

Create Schemas in Apicurio Registry

To create the schemas necessary for this demonstration, a Python script is included in the project, 10_create_schemas.py. The script uses Apicurio Registry’s REST API to create six new Avro-based schema artifacts.

Apicurio Registry supports several common artifact types, including AsyncAPI specification, Apache Avro schema, GraphQL schema, JSON Schema, Apache Kafka Connect schema, OpenAPI specification, Google protocol buffers schema, Web Services Definition Language, and XML Schema Definition. We will use the registry to store Avro schemas for use with Kafka and CSV data sources and sinks.

Although Apicurio Registry does not support CSV Schema, we can store the schemas for the CSV-format sales and sales region data in the registry as JSON-format Avro schemas.

{
  "name": "Sales",
  "type": "record",
  "doc": "Schema for CSV-format sales data",
  "fields": [
    {
      "name": "payment_id",
      "type": "int"
    },
    {
      "name": "customer_id",
      "type": "int"
    },
    {
      "name": "amount",
      "type": "float"
    },
    {
      "name": "payment_date",
      "type": "string"
    },
    {
      "name": "city",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "district",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "country",
      "type": "string"
    }
  ]
}

We can then retrieve the JSON-format Avro schema from the registry, convert it to a PySpark StructType, and associate it with the DataFrame used to persist the sales data from the CSV files.

root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- amount: float (nullable = true)
|-- payment_date: string (nullable = true)
|-- city: string (nullable = true)
|-- district: string (nullable = true)
|-- country: string (nullable = true)

Using the registry allows us to avoid hard-coding the schema as a StructType in the PySpark scripts in advance.

# Purpose: Create Avro schemas in Apicurio Registry.
# Author: Gary A. Stafford
# Date: 2021-09-28
import json
import os

import boto3
import requests

params = {}
os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
ssm_client = boto3.client("ssm")


def main():
    global params
    params = get_parameters()

    # schema for CSV-format sales data
    artifact_id = "pagila.sales.csv"
    data = '''{"name":"Sales","type":"record",
        "doc":"Schema for CSV-format sales data",
        "fields":[
            {"name":"payment_id","type":"int"},
            {"name":"customer_id","type":"int"},
            {"name":"amount","type":"float"},
            {"name":"payment_date","type":"string"},
            {"name":"city","type":["string","null"]},
            {"name":"district","type":["string","null"]},
            {"name":"country","type":"string"}]}'''
    create_schema(artifact_id, data)

    # schema for CSV-format sales regions data
    artifact_id = "pagila.sales.regions.csv"
    data = '''{"name":"Regions","type":"record",
        "doc":"Schema for CSV-format sales regions data",
        "fields":[
            {"name":"country","type":"string"},
            {"name":"region","type":"string"}]}'''
    create_schema(artifact_id, data)

    # key and value schemas for the pagila.sales.avro topic
    artifact_id = "pagila.sales.avro-key"
    data = '''{"name":"Key","type":"int",
        "doc":"Schema for pagila.sales.avro Kafka topic key"}'''
    create_schema(artifact_id, data)

    artifact_id = "pagila.sales.avro-value"
    data = '''{"name":"Value","type":"record",
        "doc":"Schema for pagila.sales.avro Kafka topic value",
        "fields":[
            {"name":"payment_id","type":"int"},
            {"name":"customer_id","type":"int"},
            {"name":"amount","type":"float"},
            {"name":"payment_date","type":"long","logicalType":"timestamp-millis"},
            {"name":"city","type":["string","null"]},
            {"name":"district","type":["string","null"]},
            {"name":"country","type":"string"}]}'''
    create_schema(artifact_id, data)

    # key and value schemas for the pagila.sales.summary.avro topic
    artifact_id = "pagila.sales.summary.avro-key"
    data = '''{"name":"Key","type":"int",
        "doc":"Schema for pagila.sales.summary.avro Kafka topic key"}'''
    create_schema(artifact_id, data)

    artifact_id = "pagila.sales.summary.avro-value"
    data = '''{"name":"Value","type":"record",
        "doc":"Schema for pagila.sales.summary.avro Kafka topic value",
        "fields":[
            {"name":"region","type":"string"},
            {"name":"sales","type":"float"},
            {"name":"orders","type":"int"},
            {"name":"window_start","type":"long","logicalType":"timestamp-millis"},
            {"name":"window_end","type":"long","logicalType":"timestamp-millis"}]}'''
    create_schema(artifact_id, data)


def create_schema(artifact_id, data):
    """Delete existing Avro schema, create new schema, and retrieve the schema"""
    delete_schema(artifact_id)
    print(json.dumps(json.loads(post_schema(artifact_id, data)), indent=4))
    print(json.dumps(json.loads(get_schema(artifact_id)), indent=4))


def post_schema(artifact_id, data):
    """Post Avro schema to Apicurio Registry"""
    response = requests.post(
        url=f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts",
        data=data,
        headers={"X-Registry-ArtifactId": artifact_id})
    json_format_schema = response.content.decode("utf-8")
    return json_format_schema


def get_schema(artifact_id):
    """Get Avro schema from Apicurio Registry"""
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    json_format_schema = response.content.decode("utf-8")
    return json_format_schema


def delete_schema(artifact_id):
    """Delete Avro schema from Apicurio Registry"""
    try:
        response = requests.delete(
            f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
        return response.content.decode("utf-8")
    except requests.exceptions.RequestException as error:
        # raised when the request itself fails, e.g., the registry is unreachable
        return f"Unable to delete schema {artifact_id}: {error}"


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    parameters = {
        "schema_registry_url": ssm_client.get_parameter(
            Name="/kafka_spark_demo/schema_registry_url_int")["Parameter"]["Value"],
    }
    return parameters


if __name__ == "__main__":
    main()

Add the Python script as an EMR Step. EMR will run the Python script the same way it runs PySpark jobs.

export CLUSTER_ID="<your-cluster-id>"
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
aws emr add-steps \
    --cluster-id ${CLUSTER_ID} \
    --steps """Type=Spark,Name='create-schemas',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/10_create_schemas.py]"""

The Python script creates six schema artifacts in Apicurio Registry, shown below in Apicurio Registry’s browser-based user interface. The schemas include a key/value pair for each of the two Kafka topics and two schemas for the CSV-format sales and sales region data.

Artifacts in Apicurio Registry’s browser-based UI

You have the option of enabling validation and compatibility rules for each schema with Apicurio Registry.

Content Rules options in Apicurio Registry’s browser-based UI

Each Avro schema artifact is stored as a JSON object in the registry.

Detailed view of Avro schema as JSON in Apicurio Registry’s browser-based UI

Simulate Sales Event Stream

Next, we will simulate an event stream of sales data published to Kafka. The PySpark script, 11_incremental_sales_avro.py, reads 1,800 sales records into a DataFrame (pyspark.sql.DataFrame) from a CSV file located in S3. The script then takes each Row (pyspark.sql.Row) of the DataFrame, one row at a time, and writes it to the Kafka topic, pagila.sales.avro, adding a slight delay between each write. With the script’s 0.75-second delay, the 1,800 messages take at least 22.5 minutes to publish.
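The delay between messages sets a lower bound on how long the simulated stream runs. A quick sketch of that arithmetic follows, using the 1,800 records and the 0.75-second delay configured in the script; the function name is hypothetical, and the estimate ignores the time Spark and Kafka need for each write.

```python
def estimated_stream_minutes(message_count: int, delay_seconds: float) -> float:
    """Lower bound on run time: delays only, ignoring Spark and Kafka write time."""
    return message_count * delay_seconds / 60


minutes = estimated_stream_minutes(1800, 0.75)
print(minutes)  # 22.5
```

To shorten or lengthen the simulation, adjust the delay rather than the input file.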

# Purpose: Write sales data from CSV to a new Kafka topic in Avro format.
# Use a delay between each message to simulate an event stream of sales data.
# Author: Gary A. Stafford
# Date: 2021-09-28
import os
import time

import boto3
import pyspark.sql.functions as F
import requests
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.types import LongType

sink_topic = "pagila.sales.avro"

# 1800 messages * .75 second delay = ~22.5 minutes added latency
delay_between_messages = 0.75

params = {}
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    global params
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-incremental-sales") \
        .getOrCreate()

    csv_sales_schema = get_schema("pagila.sales.csv")
    schema = struct_from_json(spark, csv_sales_schema)

    df_sales = read_from_csv(spark, "sales_incremental_large.csv", schema, "|")
    df_sales.show(5, truncate=False)

    write_to_kafka(spark, df_sales)


def write_to_kafka(spark, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            sink_topic,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    sales_schema_key = get_schema("pagila.sales.avro-key")
    sales_schema_value = get_schema("pagila.sales.avro-value")

    # collect the rows once, rather than once per message
    rows = df_sales.collect()

    for row in rows:
        df_message = spark.createDataFrame([row], df_sales.schema)

        # replace the CSV payment date with the current time, then write one message
        df_message \
            .drop("payment_date") \
            .withColumn("payment_date",
                        F.unix_timestamp(F.current_timestamp()).cast(LongType())) \
            .select(to_avro("customer_id", sales_schema_key).alias("key"),
                    to_avro(F.struct("*"), sales_schema_value).alias("value")) \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        time.sleep(delay_between_messages)


# ***** utility methods *****

def read_from_csv(spark, source_data, schema, sep):
    """Read CSV data from S3"""
    df = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{source_data}",
             schema=schema, header=True, sep=sep)
    return df


def struct_from_json(spark, json_format_schema):
    """Returns a schema as a pyspark.sql.types.StructType from Avro schema"""
    df = spark \
        .read \
        .format("avro") \
        .option("avroSchema", json_format_schema) \
        .load()
    df.printSchema()
    return df.schema


def get_schema(artifact_id):
    """Get Avro schema from Apicurio Registry"""
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    json_format_schema = response.content.decode("utf-8")
    return json_format_schema


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    parameters = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
        "schema_registry_url": ssm_client.get_parameter(
            Name="/kafka_spark_demo/schema_registry_url_int")["Parameter"]["Value"],
    }
    return parameters


if __name__ == "__main__":
    main()

The PySpark script first retrieves the JSON-format Avro schema for the CSV data from Apicurio Registry using the Python requests module and Apicurio Registry’s REST API (get_schema()).

{
  "name": "Sales",
  "type": "record",
  "doc": "Schema for CSV-format sales data",
  "fields": [
    {
      "name": "payment_id",
      "type": "int"
    },
    {
      "name": "customer_id",
      "type": "int"
    },
    {
      "name": "amount",
      "type": "float"
    },
    {
      "name": "payment_date",
      "type": "string"
    },
    {
      "name": "city",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "district",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "country",
      "type": "string"
    }
  ]
}
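The get_schema() function in the scripts returns whatever the registry sends back, so an error response would be passed along as if it were a schema. A slightly more defensive variant is sketched below using only the Python standard library. It is one possible approach, not the method used in the project: the artifact_url helper, the timeout value, and the localhost address are assumptions for illustration, while the URL path is the one the scripts already use.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def artifact_url(base_url: str, artifact_id: str, group: str = "default") -> str:
    """Build the artifact URL in the form used by the scripts in this post."""
    return (
        f"{base_url.rstrip('/')}/apis/registry/v2/groups/"
        f"{quote(group, safe='')}/artifacts/{quote(artifact_id, safe='')}"
    )


def get_schema(base_url: str, artifact_id: str, timeout: float = 10.0) -> str:
    """Fetch a schema; fail loudly on HTTP errors or a non-JSON body."""
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses
    with urlopen(artifact_url(base_url, artifact_id), timeout=timeout) as response:
        body = response.read().decode("utf-8")
    json.loads(body)  # raises ValueError if the body is not valid JSON
    return body


# Hypothetical registry address, for illustration only.
url = artifact_url("http://localhost:8080/", "pagila.sales.csv")
print(url)
```

Failing early here produces a clearer error than letting Spark try to parse an error message as an Avro schema.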

The script then creates a StructType from the JSON-format Avro schema using an empty DataFrame (struct_from_json()). Avro column types are converted to Spark SQL types. The only apparent difference is nullability: Spark reports every column as nullable = true, even though the Avro schema declares only city and district as nullable. Keep in mind that column nullability in Spark is a hint to the optimizer, not an enforced constraint on the data.

root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- amount: float (nullable = true)
|-- payment_date: string (nullable = true)
|-- city: string (nullable = true)
|-- district: string (nullable = true)
|-- country: string (nullable = true)

The resulting StructType is used to read the CSV data into a DataFrame (read_from_csv()).

csv_sales_schema = get_schema("pagila.sales.csv")
schema = struct_from_json(spark, csv_sales_schema)
df_sales = read_from_csv(spark, "sales_incremental_large.csv", schema, "|")
write_to_kafka(spark, df_sales)


def get_schema(artifact_id):
    """Get Avro schema from Apicurio Registry"""
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    json_format_schema = response.content.decode("utf-8")
    return json_format_schema


def struct_from_json(spark, json_format_schema):
    """Returns a schema as a pyspark.sql.types.StructType from Avro schema"""
    df = spark \
        .read \
        .format("avro") \
        .option("avroSchema", json_format_schema) \
        .load()
    df.printSchema()
    return df.schema


def read_from_csv(spark, source_data, schema, sep):
    """Read CSV data from S3"""
    df = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{source_data}",
             schema=schema, header=True, sep=sep)
    return df
Code snippet from PySpark script, 11_incremental_sales_avro.py

For Avro-format Kafka key and value schemas, we use the same method, get_schema(). The resulting JSON-format schemas are then passed to the to_avro() and from_avro() methods to read and write Avro-format messages to Kafka. Both methods are part of the pyspark.sql.avro.functions module. Avro column types are converted to and from Spark SQL types.

def get_schema(artifact_id):
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    json_format_schema = response.content.decode("utf-8")
    return json_format_schema


def write_to_kafka(spark, df_sales):
    sales_schema_key = get_schema("pagila.sales.avro-key")
    sales_schema_value = get_schema("pagila.sales.avro-value")

    # excerpt: options_write and df_message are defined in the full script above
    df_message \
        .select(to_avro("customer_id", sales_schema_key).alias("key"),
                to_avro(F.struct("*"), sales_schema_value).alias("value")) \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()
Code snippet from PySpark script, 11_incremental_sales_avro.py

We must run this PySpark script, 11_incremental_sales_avro.py, concurrently with the PySpark script, 12_streaming_enrichment_avro.py, to simulate an event stream. We will start both scripts in the next part of the post.

Stream Processing with Structured Streaming

The PySpark script, 12_streaming_enrichment_avro.py, uses a streaming query to read sales data messages from the Kafka topic, pagila.sales.avro, in real-time, enriches the sales data, aggregates regional sales results, and writes the results back to Kafka in micro-batches every two minutes.
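The script aggregates sales with F.window("timestamp", "10 minutes", "5 minutes"), that is, 10-minute windows that start every 5 minutes, so each event is counted in two overlapping windows. The plain-Python sketch below illustrates which windows a single event falls into, assuming windows aligned to the Unix epoch with an inclusive start and an exclusive end. The sliding_windows function is a hypothetical helper for illustration and is not how Spark implements windowing internally.

```python
def sliding_windows(event_ts: int, duration: int = 600, slide: int = 300):
    """Return [start, end) windows, in epoch seconds, that contain event_ts."""
    windows = []
    start = (event_ts // slide) * slide  # latest window start at or before the event
    while start > event_ts - duration:   # the window must still cover the event
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


# An event 12 minutes (720 seconds) after the epoch
print(sliding_windows(720))  # [(300, 900), (600, 1200)]
```

Because the windows overlap, summing the per-window sales totals across all windows would double-count each sale; read each window as its own rolling 10-minute view.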

# Purpose: Streaming read from Kafka topic in Avro format. Enrich and aggregate
# current sales by sales region to second Kafka topic every n minutes.
# Author: Gary A. Stafford
# Date: 2021-09-28
import os

import boto3
import pyspark.sql.functions as F
import requests
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.types import IntegerType, FloatType, LongType

source_topic = "pagila.sales.avro"
sink_topic = "pagila.sales.summary.avro"

params = {}
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    global params
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales") \
        .getOrCreate()

    csv_sales_regions_schema = get_schema("pagila.sales.regions.csv")
    schema = struct_from_json(spark, csv_sales_regions_schema)

    df_regions = read_from_csv(spark, "sales_regions.csv", schema, ",")
    df_regions.cache()
    df_regions.show(5, truncate=False)

    df_sales = read_from_kafka(spark)
    summarize_sales(df_sales, df_regions)


def read_from_kafka(spark):
    sales_schema_value = get_schema("pagila.sales.avro-value")

    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            source_topic,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    # decode the Avro-format value and keep the Kafka message timestamp
    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load() \
        .select(from_avro("value", sales_schema_value).alias("data"), "timestamp") \
        .select("data.*", "timestamp")

    return df_sales


def summarize_sales(df_sales, df_regions):
    sales_summary_key = get_schema("pagila.sales.summary.avro-key")
    sales_summary_value = get_schema("pagila.sales.summary.avro-value")

    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            sink_topic,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # enrich with region, aggregate per sliding window, then write as Avro
    ds_sales = df_sales \
        .join(df_regions, on=["country"], how="leftOuter") \
        .na.fill("Unassigned") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy("region", F.window("timestamp", "10 minutes", "5 minutes")) \
        .agg(F.sum("amount"), F.count("amount")) \
        .orderBy(F.col("window").desc(), F.col("sum(amount)").desc()) \
        .select("region",
                F.col("sum(amount)").cast(FloatType()).alias("sales"),
                F.col("count(amount)").cast(IntegerType()).alias("orders"),
                F.unix_timestamp("window.start").cast(LongType()).alias("window_start"),
                F.unix_timestamp("window.end").cast(LongType()).alias("window_end")) \
        .coalesce(1) \
        .select(to_avro(F.col("window_start").cast(IntegerType()), sales_summary_key).alias("key"),
                to_avro(F.struct("*"), sales_summary_value).alias("value")) \
        .writeStream \
        .trigger(processingTime="2 minute") \
        .queryName("streaming_to_kafka") \
        .outputMode("complete") \
        .format("kafka") \
        .options(**options_write) \
        .option("checkpointLocation", "/checkpoint/kafka/") \
        .start()

    ds_sales.awaitTermination()


# ***** utility methods *****

def read_from_csv(spark, source_data, schema, sep):
    """Read CSV data from S3"""
    df = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{source_data}",
             schema=schema, header=True, sep=sep)
    return df


def struct_from_json(spark, json_format_schema):
    """Returns a schema as a pyspark.sql.types.StructType from Avro schema"""
    df = spark \
        .read \
        .format("avro") \
        .option("avroSchema", json_format_schema) \
        .load()
    df.printSchema()
    return df.schema


def get_schema(artifact_id):
    """Get Avro schema from Apicurio Registry"""
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    json_format_schema = response.content.decode("utf-8")
    return json_format_schema


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
parameters = {
"kafka_servers": ssm_client.get_parameter(
Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
"kafka_demo_bucket": ssm_client.get_parameter(
Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
"schema_registry_url": ssm_client.get_parameter(
Name="/kafka_spark_demo/schema_registry_url_int")["Parameter"]["Value"],
}
return parameters
if __name__ == "__main__":
main()

The PySpark script performs a stream-to-batch join between the streaming sales data from the Kafka topic, pagila.sales.avro, and a CSV file that contains sales regions based on the common country column. Schemas for the CSV data and the Kafka message keys and values are retrieved from Apicurio Registry using the REST API identically to the previous PySpark script.
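
The script's get_schema() helper passes whatever the registry returns straight to from_avro() and to_avro(). As a rough sketch only, the variant below adds two checks: it fails on HTTP errors and confirms the body parses as JSON before returning it. It uses the standard library's urllib rather than the requests package the script imports, and the function names artifact_url and parse_schema are hypothetical, introduced here for illustration.

```python
import json
import urllib.request


def artifact_url(registry_url, artifact_id, group="default"):
    """Build the registry REST path used by the scripts in this post."""
    base = registry_url.rstrip("/")
    return f"{base}/apis/registry/v2/groups/{group}/artifacts/{artifact_id}"


def parse_schema(payload):
    """Decode a response body and confirm it is valid JSON.

    Returns the schema as a JSON-format string, the form the scripts
    pass to from_avro() and to_avro().
    """
    text = payload.decode("utf-8")
    json.loads(text)  # raises ValueError if the body is not JSON
    return text


def get_schema(registry_url, artifact_id, timeout=10):
    """Fetch a schema; urlopen raises HTTPError on 4xx/5xx responses."""
    url = artifact_url(registry_url, artifact_id)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return parse_schema(response.read())
```

With this shape, a missing artifact or an error page surfaces when the schema is fetched rather than later, inside the streaming query.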

+----------+-----------+------+-----------------------------+------------+-------------------+--------+
|payment_id|customer_id|amount|payment_date                 |city        |district           |country |
+----------+-----------+------+-----------------------------+------------+-------------------+--------+
|16666     |204        |3.99  |2021-05-10 13:04:06.996577+00|Usak        |Usak               |Turkey  |
|17044     |187        |10.99 |2021-05-10 13:08:19.996577+00|Sumy        |Sumy               |Ukraine |
|16330     |416        |3.99  |2021-05-10 13:11:40.996577+00|Dadu        |Sind               |Pakistan|
|16283     |390        |7.99  |2021-05-10 13:12:14.996577+00|Nakhon Sawan|Nakhon Sawan       |Thailand|
|16910     |114        |7.99  |2021-05-10 13:20:41.996577+00|Duisburg    |Nordrhein-Westfalen|Germany |
+----------+-----------+------+-----------------------------+------------+-------------------+--------+
only showing top 5 rows
Sales data from the streaming query of the first Kafka topic
+--------------+--------------+
|country       |region        |
+--------------+--------------+
|Afghanistan   |Asia & Pacific|
|Aland Islands |Europe        |
|Albania       |Europe        |
|Algeria       |Arab States   |
|American Samoa|Asia & Pacific|
+--------------+--------------+
only showing top 5 rows
Sales regions data from the CSV file in Amazon S3
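
To make the join semantics concrete, here is a minimal plain-Python sketch of what the left outer join on country followed by na.fill("Unassigned") does to each sales row. It is an illustration only, not the Spark implementation: the function name enrich_with_region and the two sales rows are hypothetical, while the region rows come from the sample above. Note that in the script, na.fill("Unassigned") applies to every null string column, not just region.

```python
def enrich_with_region(sales, regions, default="Unassigned"):
    """Left outer join sales rows to regions on 'country'.

    Sales rows whose country has no match keep all their fields and
    receive the default region instead of a null.
    """
    region_by_country = {row["country"]: row["region"] for row in regions}
    return [
        {**sale, "region": region_by_country.get(sale["country"], default)}
        for sale in sales
    ]


# Region rows taken from the sample output above
regions = [
    {"country": "Albania", "region": "Europe"},
    {"country": "Algeria", "region": "Arab States"},
]

# Hypothetical sales rows; the second country has no region mapping
sales = [
    {"payment_id": 1, "amount": 3.99, "country": "Albania"},
    {"payment_id": 2, "amount": 7.99, "country": "Atlantis"},
]

for row in enrich_with_region(sales, regions):
    print(row["payment_id"], row["country"], row["region"])
```

Every sale survives the join, which is why an "Unassigned" region appears in the aggregated results.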

The PySpark script then performs a streaming aggregation of the sale amount and order quantity over a 10-minute event-time window that slides every five minutes, writing results to the Kafka topic, pagila.sales.summary.avro, every two minutes. Below is a sample of the resulting streaming DataFrame, written to external storage, Kafka in this case, using a DataStreamWriter interface (pyspark.sql.streaming.DataStreamWriter).

+--------------+----------+------+------------+----------+
|region        |sales     |orders|window_start|window_end|
+--------------+----------+------+------------+----------+
|Asia & Pacific|1065.47   |153   |1633296600  |1633297200|
|Europe        |632.16    |84    |1633296600  |1633297200|
|Latin America |443.34998 |65    |1633296600  |1633297200|
|North America |189.7     |30    |1633296600  |1633297200|
|Africa        |137.81    |19    |1633296600  |1633297200|
|Middle East   |111.829994|17    |1633296600  |1633297200|
|Unassigned    |50.92     |8     |1633296600  |1633297200|
|Arab States   |36.96     |4     |1633296600  |1633297200|
|Asia & Pacific|2632.26   |374   |1633296300  |1633296900|
|Europe        |1415.0599 |194   |1633296300  |1633296900|
|Latin America |1260.1799 |182   |1633296300  |1633296900|
|North America |436.31998 |68    |1633296300  |1633296900|
|Africa        |419.41998 |58    |1633296300  |1633296900|
|Middle East   |276.61    |39    |1633296300  |1633296900|
|Unassigned    |151.78    |22    |1633296300  |1633296900|
|Arab States   |96.869995 |13    |1633296300  |1633296900|
|Asia & Pacific|3130.5898 |441   |1633296000  |1633296600|
|Europe        |1679.6499 |235   |1633296000  |1633296600|
|Latin America |1350.0499 |195   |1633296000  |1633296600|
|Africa        |603.18    |82    |1633296000  |1633296600|
|North America |573.19    |81    |1633296000  |1633296600|
|Middle East   |291.6     |40    |1633296000  |1633296600|
|Unassigned    |205.68999 |31    |1633296000  |1633296600|
|Arab States   |162.78    |22    |1633296000  |1633296600|
+--------------+----------+------+------------+----------+
Aggregated, windowed sales results streamed back to the second Kafka topic
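
Because the script calls F.window("timestamp", "10 minutes", "5 minutes"), the windows overlap and each event is counted in two of them. The sketch below works that out in plain Python for epoch-second timestamps. It assumes windows are aligned to the Unix epoch and are half-open, with the start inclusive and the end exclusive, which is my understanding of Spark's default behavior; the function name sliding_windows is hypothetical.

```python
def sliding_windows(event_ts, window=600, slide=300):
    """Return the (start, end) windows, in epoch seconds, containing event_ts.

    Assumes windows are aligned to the Unix epoch and half-open:
    start <= event_ts < end.
    """
    # Latest window start that is not after the event
    last_start = event_ts - (event_ts % slide)
    # Walk back one slide at a time while the window still covers the event
    starts = range(last_start, event_ts - window, -slide)
    return [(start, start + window) for start in starts]


# An event 50 seconds after the 1633296600 boundary seen in the sample above
for start, end in sliding_windows(1633296650):
    print(start, end)
```

For that event the two windows are 1633296600 to 1633297200 and 1633296300 to 1633296900, matching the window_start and window_end pairs in the sample output.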

Once again, schemas for the second Kafka topic’s message key and value are retrieved from Apicurio Registry using its REST API. The key schema:

{
  "name": "Key",
  "type": "int",
  "doc": "Schema for pagila.sales.summary.avro Kafka topic key"
}

And, the value schema:

{
  "name": "Value",
  "type": "record",
  "doc": "Schema for pagila.sales.summary.avro Kafka topic value",
  "fields": [
    {
      "name": "region",
      "type": "string"
    },
    {
      "name": "sales",
      "type": "float"
    },
    {
      "name": "orders",
      "type": "int"
    },
    {
      "name": "window_start",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "window_end",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

Below is the schema as applied to the streaming DataFrame using the to_avro() function.

root
|-- region: string (nullable = false)
|-- sales: float (nullable = true)
|-- orders: integer (nullable = false)
|-- window_start: long (nullable = true)
|-- window_end: long (nullable = true)
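
The correspondence between the Avro value schema and the printed DataFrame schema can be sketched in a few lines of plain Python. The mapping table below covers only Avro primitive types and reflects my understanding of how Spark's Avro support maps them; the names AVRO_TO_SPARK and spark_columns are hypothetical. In the value schema, logicalType sits beside type rather than inside the type definition, which would be consistent with window_start and window_end printing as plain long columns rather than timestamps.

```python
import json

# Avro primitive type -> Spark SQL type name as shown by printSchema()
AVRO_TO_SPARK = {
    "boolean": "boolean",
    "int": "integer",
    "long": "long",
    "float": "float",
    "double": "double",
    "bytes": "binary",
    "string": "string",
}

# The value schema shown above, with the doc attribute omitted for brevity
VALUE_SCHEMA = """
{
  "name": "Value",
  "type": "record",
  "fields": [
    {"name": "region", "type": "string"},
    {"name": "sales", "type": "float"},
    {"name": "orders", "type": "int"},
    {"name": "window_start", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "window_end", "type": "long", "logicalType": "timestamp-millis"}
  ]
}
"""


def spark_columns(avro_schema_json):
    """List (column, Spark type) pairs for a flat record of primitive fields."""
    schema = json.loads(avro_schema_json)
    return [(field["name"], AVRO_TO_SPARK[field["type"]])
            for field in schema["fields"]]


for name, spark_type in spark_columns(VALUE_SCHEMA):
    print(f"{name}: {spark_type}")
```

This sketch ignores nested records, unions, and nullability, all of which a real schema conversion has to handle.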

Submit this streaming PySpark script, 12_streaming_enrichment_avro.py, as an EMR Step.

aws emr add-steps \
  --cluster-id ${CLUSTER_ID} \
  --steps """Type=Spark,Name='streaming-query',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/12_streaming_enrichment_avro.py]"""

Wait about two minutes to give this third PySpark script time to start its streaming query fully.

PySpark Structured Streaming job running on Amazon EMR cluster

Then, submit the second PySpark script, 11_incremental_sales_avro.py, as an EMR Step. The two PySpark scripts will run concurrently, either on the same Amazon EMR cluster or on two different clusters.

aws emr add-steps \
  --cluster-id ${CLUSTER_ID} \
  --steps """Type=Spark,Name='sales-event-stream',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/11_incremental_sales_avro.py]"""

The PySpark script, 11_incremental_sales_avro.py, should run for approximately 15–20 minutes.

Simulated event stream of sales data completed on a second Amazon EMR cluster

During that time, every two minutes, the script, 12_streaming_enrichment_avro.py, will write micro-batches of aggregated sales results to the second Kafka topic, pagila.sales.summary.avro, in Avro format. An example of a micro-batch recorded in PySpark’s stdout log is shown below.

{
  "id" : "bc44379f-9c1a-4d14-8392-4a8b860b24f1",
  "runId" : "5ed235ac-5ff6-47d3-bb38-28e04f7ab752",
  "name" : "streaming_to_kafka",
  "timestamp" : "2021-10-03T22:38:00.000Z",
  "batchId" : 24,
  "numInputRows" : 127,
  "inputRowsPerSecond" : 1.0583333333333333,
  "processedRowsPerSecond" : 8.653584082856364,
  "durationMs" : {
    "addBatch" : 12888,
    "getBatch" : 0,
    "latestOffset" : 5,
    "queryPlanning" : 84,
    "triggerExecution" : 14676,
    "walCommit" : 1676
  },
  "eventTime" : {
    "avg" : "2021-10-03T22:36:59.638Z",
    "max" : "2021-10-03T22:37:59.350Z",
    "min" : "2021-10-03T22:36:00.268Z",
    "watermark" : "2021-10-03T22:25:59.333Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 192,
    "numRowsUpdated" : 28,
    "memoryUsedBytes" : 204696,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 5364,
      "loadedMapCacheMissCount" : 400,
      "stateOnCurrentVersionSizeBytes" : 80112
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[pagila.sales.avro]]",
    "startOffset" : {
      "pagila.sales.avro" : {
        "0" : 1627
      }
    },
    "endOffset" : {
      "pagila.sales.avro" : {
        "0" : 1754
      }
    },
    "numInputRows" : 127,
    "inputRowsPerSecond" : 1.0583333333333333,
    "processedRowsPerSecond" : 8.653584082856364
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@2c6d1341",
    "numOutputRows" : 96
  }
}
Streaming query results of a micro-batch written to Kafka as Avro
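
One quick sanity check on a progress report like this is that the Kafka offsets consumed add up to numInputRows. The sketch below does that arithmetic in plain Python on a trimmed copy of the report above; the function name rows_consumed is hypothetical.

```python
import json


def rows_consumed(progress):
    """Sum endOffset - startOffset over every topic partition in a report."""
    total = 0
    for source in progress["sources"]:
        for topic, end_offsets in source["endOffset"].items():
            start_offsets = source["startOffset"][topic]
            for partition, end in end_offsets.items():
                total += end - start_offsets[partition]
    return total


# Trimmed copy of the micro-batch progress report shown above
progress = json.loads("""
{
  "batchId": 24,
  "numInputRows": 127,
  "sources": [{
    "startOffset": {"pagila.sales.avro": {"0": 1627}},
    "endOffset": {"pagila.sales.avro": {"0": 1754}},
    "numInputRows": 127
  }],
  "sink": {"numOutputRows": 96}
}
""")

print(rows_consumed(progress), progress["numInputRows"])
```

Here partition 0 advanced from offset 1627 to 1754, which accounts for all 127 input rows. The sink's numOutputRows need not match that figure, since the query writes its aggregated result table in complete output mode rather than one row per input message.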

Once this script completes, wait another two minutes, then stop the streaming PySpark script, 12_streaming_enrichment_avro.py.

Review the Results

To retrieve and display the results of the previous PySpark script’s streaming computations from Kafka, we can use the final PySpark script, 13_batch_read_results_avro.py.

# Purpose: Batch read and display sales totals from Kafka in Avro format.
# Author: Gary A. Stafford
# Date: 2021-09-28
import os
import boto3
import pyspark.sql.functions as F
import requests
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.window import Window

source_topic = "pagila.sales.summary.avro"
params = {}
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    global params
    params = get_parameters()

    df_sales = read_from_kafka()
    df_sales.show(100, truncate=False)


def read_from_kafka():
    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales") \
        .getOrCreate()

    sales_summary_key = get_schema("pagila.sales.summary.avro-key")
    sales_summary_value = get_schema("pagila.sales.summary.avro-value")

    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            source_topic,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    window = Window.partitionBy("region", "window_start").orderBy(F.col("timestamp").desc())

    df_sales = spark.read \
        .format("kafka") \
        .options(**options_read) \
        .load() \
        .select("timestamp",
                from_avro("key", sales_summary_key).alias("key"),
                from_avro("value", sales_summary_value).alias("data")) \
        .select("timestamp", "key", "data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .where(F.col("row") == 1).drop("row") \
        .select(F.col("region").alias("sales_region"),
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders"),
                F.from_unixtime("window_start", format="yyyy-MM-dd HH:mm").alias("window_start"),
                F.from_unixtime("window_end", format="yyyy-MM-dd HH:mm").alias("window_end")) \
        .orderBy(F.col("window_start").desc(), F.regexp_replace("sales", ",", "").cast("float").desc())

    return df_sales


# ***** utility methods *****

def get_schema(artifact_id):
    """Get Avro schema from Apicurio Registry"""
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    json_format_schema = response.content.decode("utf-8")

    return json_format_schema


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    parameters = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
        "schema_registry_url": ssm_client.get_parameter(
            Name="/kafka_spark_demo/schema_registry_url_int")["Parameter"]["Value"],
    }

    return parameters


if __name__ == "__main__":
    main()

Run the final PySpark script as an EMR Step.

aws emr add-steps \
  --cluster-id ${CLUSTER_ID} \
  --steps """Type=Spark,Name='display-sales-results',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/13_batch_read_results_avro.py]"""

This final PySpark script uses a batch read to retrieve all the Avro-format aggregated sales messages from the Kafka topic, deserializing them with the schemas from Apicurio Registry. The script then keeps the most recent result for each sliding 10-minute event-time window and sales region and writes the final sales results to the stdout job log.

+--------------+--------+------+----------------+----------------+
|sales_region  |sales   |orders|window_start    |window_end      |
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|1,593.74|226   |2021-10-03 22:30|2021-10-03 22:40|
|Europe        |833.89  |111   |2021-10-03 22:30|2021-10-03 22:40|
|Latin America |686.00  |100   |2021-10-03 22:30|2021-10-03 22:40|
|North America |274.57  |43    |2021-10-03 22:30|2021-10-03 22:40|
|Africa        |216.70  |30    |2021-10-03 22:30|2021-10-03 22:40|
|Middle East   |164.77  |23    |2021-10-03 22:30|2021-10-03 22:40|
|Unassigned    |86.88   |12    |2021-10-03 22:30|2021-10-03 22:40|
|Arab States   |58.92   |8     |2021-10-03 22:30|2021-10-03 22:40|
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|1,828.41|259   |2021-10-03 22:25|2021-10-03 22:35|
|Europe        |878.79  |121   |2021-10-03 22:25|2021-10-03 22:35|
|Latin America |861.76  |124   |2021-10-03 22:25|2021-10-03 22:35|
|Africa        |284.60  |40    |2021-10-03 22:25|2021-10-03 22:35|
|North America |284.56  |44    |2021-10-03 22:25|2021-10-03 22:35|
|Middle East   |175.76  |24    |2021-10-03 22:25|2021-10-03 22:35|
|Unassigned    |93.87   |13    |2021-10-03 22:25|2021-10-03 22:35|
|Arab States   |78.89   |11    |2021-10-03 22:25|2021-10-03 22:35|
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|1,744.52|248   |2021-10-03 22:20|2021-10-03 22:30|
|Europe        |948.64  |136   |2021-10-03 22:20|2021-10-03 22:30|
|Latin America |840.81  |119   |2021-10-03 22:20|2021-10-03 22:30|
|Africa        |299.59  |41    |2021-10-03 22:20|2021-10-03 22:30|
|North America |282.59  |41    |2021-10-03 22:20|2021-10-03 22:30|
|Middle East   |181.74  |26    |2021-10-03 22:20|2021-10-03 22:30|
|Unassigned    |101.84  |16    |2021-10-03 22:20|2021-10-03 22:30|
|Arab States   |64.92   |8     |2021-10-03 22:20|2021-10-03 22:30|
+--------------+--------+------+----------------+----------------+
Tabulated sales results by the event-time windows (window breaks added for clarity)
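
Since the streaming query rewrites its result table on every trigger, the topic holds several messages for the same region and window, and the batch script keeps only the newest one using row_number() over a window partitioned by region and window_start. A plain-Python sketch of that idea follows, along with the epoch-seconds formatting. The messages are hypothetical, the function names are mine, and the formatting assumes UTC, whereas Spark's from_unixtime uses the session time zone.

```python
from datetime import datetime, timezone


def latest_per_window(messages):
    """Keep the most recent message for each (region, window_start) pair."""
    latest = {}
    for msg in messages:
        key = (msg["region"], msg["window_start"])
        if key not in latest or msg["timestamp"] > latest[key]["timestamp"]:
            latest[key] = msg
    return list(latest.values())


def format_epoch(seconds):
    """Render epoch seconds as 'yyyy-MM-dd HH:mm', assuming UTC."""
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M")


# Hypothetical messages: two micro-batches reporting on the same window
messages = [
    {"timestamp": 1, "region": "Europe", "window_start": 1633296600, "sales": 500.00},
    {"timestamp": 2, "region": "Europe", "window_start": 1633296600, "sales": 632.16},
    {"timestamp": 2, "region": "Africa", "window_start": 1633296600, "sales": 137.81},
]

for msg in latest_per_window(messages):
    print(msg["region"], msg["sales"], format_epoch(msg["window_start"]))
```

The earlier Europe message is discarded, leaving one final figure per region and window.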

Conclusion

In this post, we learned how to get started with Spark Structured Streaming on Amazon EMR using PySpark, the Apache Avro format, and Apicurio Registry. We decoupled Kafka message key and value schemas and the schemas of data stored in S3 as CSV, storing those schemas in a registry.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR

Exploring Apache Spark with Apache Kafka using both batch queries and Spark Structured Streaming

Introduction

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. Using Structured Streaming, you can express your streaming computation the same way you would express a batch computation on static data. In this post, we will learn how to use Apache Spark and Spark Structured Streaming with Apache Kafka. Specifically, we will utilize Structured Streaming on Amazon EMR (fka Amazon Elastic MapReduce) with Amazon Managed Streaming for Apache Kafka (Amazon MSK). We will consume from and publish to Kafka using both batch and streaming queries. Spark jobs will be written in Python with PySpark for this post.

High-level AWS architecture for this post’s demonstration

Apache Spark

According to the documentation, Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python (PySpark), and R, and an optimized engine that supports general execution graphs. In addition, Spark supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Apache Spark and PySpark versus Apache Hive and Presto interest over time, according to Google Trends

Spark Structured Streaming

According to the documentation, Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end, exactly-once stream processing without the user having to reason about streaming.

Amazon EMR

According to the documentation, Amazon EMR (fka Amazon Elastic MapReduce) is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hadoop, Hive, HBase, Flink, Hudi, and Presto. Amazon EMR is a fully managed AWS service that makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.

A deployment option for Amazon EMR since December 2020, Amazon EMR on EKS, allows you to run Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). With the EKS deployment option, you can focus on running analytics workloads while Amazon EMR on EKS builds, configures, and manages containers for open-source applications.

If you are new to Amazon EMR for Spark, specifically PySpark, I recommend an earlier two-part series of posts, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

Apache Kafka

According to the documentation, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Amazon MSK

Apache Kafka clusters are challenging to set up, scale, and manage in production. According to the documentation, Amazon MSK is a fully managed AWS service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.

Prerequisites

This post will focus primarily on configuring and running Apache Spark jobs on Amazon EMR. To follow along, you will need the following resources deployed and configured on AWS:

  1. Amazon S3 bucket (holds Spark resources and output);
  2. Amazon MSK cluster (using IAM Access Control);
  3. Amazon EKS container or an EC2 instance with the Kafka APIs installed and capable of connecting to Amazon MSK;
  4. Connectivity between the Amazon EKS cluster or EC2 and Amazon MSK cluster;
  5. Amazon MSK Configuration with auto.create.topics.enable=true (this setting is false by default).

As shown in the architectural diagram above, the demonstration uses three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon EMR, Amazon MSK, and Amazon EKS. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges within your Amazon EMR, Amazon MSK, and Amazon EKS Security Groups. For additional security and cost savings, use a VPC endpoint for private communications between Amazon EMR and Amazon S3.

Source Code

All source code for this post and the two previous posts in the Amazon MSK series, including the Python/PySpark scripts demonstrated here, is open-sourced and located on GitHub.

PySpark Scripts

According to the Apache Spark documentation, PySpark is an interface for Apache Spark in Python. It allows you to write Spark applications using Python APIs. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.

There are nine Python/PySpark scripts covered in this post:

  1. Initial sales data published to Kafka
    01_seed_sales_kafka.py
  2. Batch query of Kafka
    02_batch_read_kafka.py
  3. Streaming query of Kafka using grouped aggregation
    03_streaming_read_kafka_console.py
  4. Streaming query using sliding event-time window
    04_streaming_read_kafka_console_window.py
  5. Incremental sales data published to Kafka
    05_incremental_sales_kafka.py
  6. Streaming query from/to Kafka using grouped aggregation
    06_streaming_read_kafka_kafka.py
  7. Batch query of streaming query results in Kafka
    07_batch_read_kafka.py
  8. Streaming query using static join and sliding window
    08_streaming_read_kafka_join_window.py
  9. Streaming query using static join and grouped aggregation 
    09_streaming_read_kafka_join.py

Amazon MSK Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For this post, the PySpark scripts use Kafka connection properties specific to IAM Access Control. You can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In a recent post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK: Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.

Language Choice

According to the latest Spark 3.1.2 documentation, Spark runs on Java 8/11, Scala 2.12, Python 3.6+, and R 3.5+. The Spark documentation contains code examples written in all four languages and provides sample code on GitHub for Scala, Java, Python, and R. Spark is written in Scala.

Spark language interest over time, according to Google Trends

There are countless posts and industry opinions on choosing the best language for Spark. Taking no sides, I have selected the language I use most frequently for data analytics, Python using PySpark. Scala and Python differ in several significant ways: compiled versus interpreted, statically-typed versus dynamically-typed, and JVM- versus non-JVM-based. Scala offers support for concurrency and true multi-threading and an often-cited 10x raw performance advantage, weighed against the perceived ease-of-use, larger community, and relative maturity of Python.

Preparation

Amazon S3

We will start by gathering and copying the necessary files to your Amazon S3 bucket. The bucket will serve as the location for the Amazon EMR bootstrap script, additional JAR files required by Spark, PySpark scripts, CSV-format data files, and eventual output from the Spark jobs.

There is a small set of additional JAR files required by the Spark jobs we will be running. Download the JARs from Maven Central and GitHub, and place them in the emr_jars project directory. The JARs will include AWS MSK IAM Auth, AWS SDK, Kafka Client, Spark SQL for Kafka, Spark Streaming, and other dependencies.

cd ./pyspark/emr_jars/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar
wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.28/bundle-2.17.28.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.1.2/spark-streaming_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.1.2/spark-tags_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.2/spark-token-provider-kafka-0-10_2.12-3.1.2.jar

Next, update the SPARK_BUCKET environment variable, then upload the JARs and all necessary project files from your copy of the GitHub project repository to your Amazon S3 bucket using the AWS CLI s3 commands.

cd ./pyspark/
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"

aws s3 cp emr_jars/ \
"s3://${SPARK_BUCKET}/jars/" --recursive
aws s3 cp pyspark_scripts/ \
"s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp emr_bootstrap/ \
"s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp data/ \
"s3://${SPARK_BUCKET}/spark/" --recursive

Amazon EMR

The GitHub project repository includes a sample AWS CloudFormation template and an associated JSON-format CloudFormation parameters file. The template, stack.yml, accepts several parameters. To match your environment, you will need to update the parameter values such as SSH key, Subnet, and S3 bucket. The template will build a minimally-sized Amazon EMR cluster with one master and two core nodes in an existing VPC. The template can be easily modified to meet your requirements and budget.

aws cloudformation deploy \
--stack-name spark-kafka-demo-dev \
--template-file ./cloudformation/stack.yml \
--parameter-overrides file://cloudformation/dev.json \
--capabilities CAPABILITY_NAMED_IAM

Whether or not you decide to use the CloudFormation template, two essential Spark configuration items in the EMR template are the list of applications to install and the bootstrap script deployment.

Applications:
  - Name: 'Hadoop'
  - Name: 'Spark'
  - Name: 'JupyterEnterpriseGateway'
  - Name: 'Livy'
BootstrapActions:
  - Name: bootstrap-script
    ScriptBootstrapAction:
      Path: !Join [ '', [ 's3://', !Ref ProjectBucket, '/spark/bootstrap_actions.sh' ] ]

Below, we see the EMR bootstrap shell script, bootstrap_actions.sh, deployed and executed on the cluster’s nodes.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford
# Date: 2021-09-10

# arg passed in by CloudFormation
if [ $# -eq 0 ]
  then
    echo "No arguments supplied"
fi
SPARK_BUCKET=$1

# update yum packages, install jq
sudo yum update -y
sudo yum install -y jq

# jks truststore for connecting to msk
sudo cp /usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/security/cacerts \
  /tmp/kafka.client.truststore.jks

# set region for boto3
aws configure set region \
  "$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"

# install python packages for pyspark scripts
sudo python3 -m pip install boto3 botocore ec2-metadata

# install required jars for spark
sudo aws s3 cp \
  "s3://${SPARK_BUCKET}/jars/" /usr/lib/spark/jars/ \
  --recursive --exclude "*" --include "*.jar"

The script performed several tasks, including deploying the additional JAR files we copied to Amazon S3 earlier.

Amazon EMR cluster bootstrap actions tab

AWS Systems Manager Parameter Store

The PySpark scripts in this demonstration will obtain two parameters from the AWS Systems Manager (AWS SSM) Parameter Store. They include the Amazon MSK bootstrap brokers and the Amazon S3 bucket that contains the Spark assets. Using the Parameter Store ensures that no sensitive or environment-specific configuration is hard-coded into the PySpark scripts. Modify and execute the ssm_params.sh script to create two AWS SSM Parameter Store parameters.

aws ssm put-parameter \
--name /kafka_spark_demo/kafka_servers \
--type String \
--value "<b-1.your-brokers.kafka.us-east-1.amazonaws.com:9098,b-2.your-brokers.kafka.us-east-1.amazonaws.com:9098>" \
--description "Amazon MSK Kafka broker list" \
--overwrite

aws ssm put-parameter \
--name /kafka_spark_demo/kafka_demo_bucket \
--type String \
--value "<your-bucket-111222333444-us-east-1>" \
--description "Amazon S3 bucket" \
--overwrite
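
Reading those two parameters back from Python can be sketched as follows. The function takes the SSM client as an argument so that it can be exercised without AWS access; FakeSsmClient is a hypothetical stand-in written for this example, and in a real script you would pass boto3.client("ssm") instead. The parameter values shown are the placeholders from the commands above.

```python
def get_parameters(ssm_client, names, prefix="/kafka_spark_demo/"):
    """Return {name: value} for each named parameter under the prefix."""
    return {
        name: ssm_client.get_parameter(Name=f"{prefix}{name}")["Parameter"]["Value"]
        for name in names
    }


class FakeSsmClient:
    """Hypothetical stand-in for boto3.client("ssm"), for local runs only."""

    def __init__(self, values):
        self._values = values

    def get_parameter(self, Name):
        # Mirrors the response shape the scripts in this post rely on
        return {"Parameter": {"Name": Name, "Value": self._values[Name]}}


client = FakeSsmClient({
    "/kafka_spark_demo/kafka_servers": "<your_kafka_brokers>",
    "/kafka_spark_demo/kafka_demo_bucket": "<your-bucket-111222333444-us-east-1>",
})

params = get_parameters(client, ["kafka_servers", "kafka_demo_bucket"])
print(params["kafka_demo_bucket"])
```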

Spark Submit Options with Amazon EMR

Amazon EMR provides multiple options to run Spark jobs. The recommended method for PySpark scripts is to use Amazon EMR Steps from the EMR console or AWS CLI to submit work to Spark installed on an EMR cluster. In the console and CLI, you do this using a Spark application step, which runs the spark-submit script as a step on your behalf. With the API, you use a Step to invoke spark-submit using command-runner.jar. Alternately, you can SSH into the EMR cluster’s master node and run spark-submit. We will employ both techniques to run the PySpark jobs.
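
As a rough illustration of the API route, the sketch below builds the step definition that invokes spark-submit through command-runner.jar. The helper name spark_step and the bucket placeholder are hypothetical; the dictionary follows the step structure accepted by boto3's EMR add_job_flow_steps call as I understand it, and no spark-submit options such as deploy mode are set here.

```python
def spark_step(name, script_uri, action_on_failure="CONTINUE"):
    """Build an EMR step that runs spark-submit via command-runner.jar."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", script_uri],
        },
    }


step = spark_step(
    "batch-read-kafka",
    "s3://<your-bucket>/spark/02_batch_read_kafka.py",  # placeholder bucket
)
print(step["HadoopJarStep"]["Args"])

# Submitting the step requires boto3 and AWS credentials, for example:
#   import boto3
#   boto3.client("emr").add_job_flow_steps(
#       JobFlowId="<your-cluster-id>", Steps=[step])
```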

Securely Accessing Amazon MSK from Spark

Each of the PySpark scripts demonstrated in this post uses a common pattern for accessing Amazon MSK from Amazon EMR using IAM Authentication. Whether producing or consuming messages from Kafka, the same security-related options are used to configure Spark (starting at line 10, below). The details behind each option are outlined in the Security section of the Spark Structured Streaming + Kafka Integration Guide and the Configure clients for IAM access control section of the Amazon MSK IAM access control documentation.

options_read = {
    "kafka.bootstrap.servers":
        "<your_kafka_brokers>",
    "subscribe":
        "<your_topic_name>",
    "startingOffsets":
        "earliest",
    "endingOffsets":
        "latest",
    "kafka.ssl.truststore.location":
        "/tmp/kafka.client.truststore.jks",
    "kafka.security.protocol":
        "SASL_SSL",
    "kafka.sasl.mechanism":
        "AWS_MSK_IAM",
    "kafka.sasl.jaas.config":
        "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
}
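
Since the same five security options recur in every script, one possible refactoring is to keep them in a single dictionary and merge in the read- or write-specific settings. This is a sketch of that idea rather than how the scripts in this post are written; the names MSK_IAM_OPTIONS and kafka_options are hypothetical.

```python
# Security options shared by every MSK connection using IAM Access Control
MSK_IAM_OPTIONS = {
    "kafka.ssl.truststore.location": "/tmp/kafka.client.truststore.jks",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "AWS_MSK_IAM",
    "kafka.sasl.jaas.config":
        "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def kafka_options(bootstrap_servers, extra=None):
    """Merge the broker list, the shared security options, and any extras."""
    options = {"kafka.bootstrap.servers": bootstrap_servers, **MSK_IAM_OPTIONS}
    options.update(extra or {})
    return options


options_read = kafka_options("<your_kafka_brokers>", {
    "subscribe": "<your_topic_name>",
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
})
options_write = kafka_options("<your_kafka_brokers>", {"topic": "<your_topic_name>"})

print(len(options_read), len(options_write))
```

Either dictionary can then be passed to Spark with .options(**options_read) or .options(**options_write), as the scripts already do.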

Data Source and Analysis Objective

For this post, we will continue to use data from PostgreSQL’s sample Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less than ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytical query costs.

+----------+-----------+------+---------------------------------+----------------------+----------+------------------+
|payment_id|customer_id|amount|payment_date                     |city                  |district  |country           |
+----------+-----------+------+---------------------------------+----------------------+----------+------------------+
|16940     |130        |5.99  |2021-05-08 21:21:56.996577 +00:00|guas Lindas de Gois   |Gois      |Brazil            |
|16406     |459        |5.99  |2021-05-08 21:22:59.996577 +00:00|Qomsheh               |Esfahan   |Iran              |
|16315     |408        |6.99  |2021-05-08 21:32:05.996577 +00:00|Jaffna                |Northern  |Sri Lanka         |
|16185     |333        |7.99  |2021-05-08 21:33:07.996577 +00:00|Baku                  |Baki      |Azerbaijan        |
|17097     |222        |9.99  |2021-05-08 21:33:47.996577 +00:00|Jaroslavl             |Jaroslavl |Russian Federation|
|16579     |549        |3.99  |2021-05-08 21:36:33.996577 +00:00|Santiago de Compostela|Galicia   |Spain             |
|16050     |269        |4.99  |2021-05-08 21:40:19.996577 +00:00|Salinas               |California|United States     |
|17126     |239        |7.99  |2021-05-08 22:00:12.996577 +00:00|Ciomas                |West Java |Indonesia         |
|16933     |126        |7.99  |2021-05-08 22:29:06.996577 +00:00|Po                    |So Paulo  |Brazil            |
|16297     |399        |8.99  |2021-05-08 22:30:47.996577 +00:00|Okara                 |Punjab    |Pakistan          |
+----------+-----------+------+---------------------------------+----------------------+----------+------------------+

According to mastersindatascience.org, data analytics is “…the process of analyzing raw data to find trends and answer questions…” Using Spark, we can analyze the movie rental sales data as a batch or in near-real-time using Structured Streaming to answer different questions. For example, using batch computations on static data, we could answer the question, how do the current total all-time sales for France compare to the rest of Europe? Or, what were the total sales for India during August? Using streaming computations, we can answer questions like, what are the sales volumes for the United States during this current four-hour marketing promotional period? Or, are sales to North America beginning to slow as the Olympics are aired during prime time?

Data analytics — the process of analyzing raw data to find trends and answer questions. (mastersindatascience.org)

Batch Queries

Before exploring the more advanced topic of streaming computations with Spark Structured Streaming, let’s first use a simple batch query and a batch computation to consume messages from the Kafka topic, perform a basic aggregation, and write the output to both the console and Amazon S3.

PySpark Job 1: Initial Sales Data

Kafka supports Protocol Buffers, JSON Schema, and Avro. However, to keep things simple in this first post, we will use JSON. We will seed a new Kafka topic with an initial batch of 250 JSON-format messages. This first batch of messages represents previous online movie rental sale transaction records. We will use these sales transactions for both batch and streaming queries.

The PySpark script, 01_seed_sales_kafka.py, and the seed data file, sales_seed.csv, are both read from Amazon S3 by Spark, running on Amazon EMR. The Amazon S3 bucket name and the Amazon MSK broker list are pulled from AWS SSM Parameter Store using the parameters created earlier. The Kafka topic that stores the sales data, pagila.sales.spark.streaming, is created automatically by the script the first time it runs.

# Purpose: Batch write initial sales data from S3 to a new Kafka topic
# Author: Gary A. Stafford
# Date: 2021-09-22

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType
from pyspark.sql.window import Window

sales_data = "sales_seed.csv"
topic_output = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-seed-sales") \
        .getOrCreate()

    df_sales = read_from_csv(spark, params)

    write_to_kafka(params, df_sales)


def read_from_csv(spark, params):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{sales_data}",
             schema=schema, header=True, sep="|")

    df_sales = update_payment_date(df_sales)

    return df_sales


def write_to_kafka(params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # The message key is the payment ID; the value is the whole row as JSON
    df_sales \
        .selectExpr("CAST(payment_id AS STRING) AS key",
                    "to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()


def update_payment_date(df):
    """Update existing payment date to a current timestamp for streaming simulation"""

    record_count = 250
    window = Window.orderBy("payment_id")

    # Space the records one second apart, with the last record ending at "now"
    df = df \
        .drop("payment_date") \
        .withColumn("index", F.row_number().over(window)) \
        .withColumn("payment_date",
                    (F.unix_timestamp(F.current_timestamp()) -
                     (record_count - F.col("index"))).cast(IntegerType())) \
        .drop("index")

    return df


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()
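
To make the shape of the published messages concrete, here is a minimal pure-Python sketch of what the selectExpr call in write_to_kafka produces for a single row: the key is payment_id cast to a string, and the value is the row serialized as a JSON object. The helper name to_kafka_message and the payment_date value are hypothetical, and Spark's to_json may format some values slightly differently.

```python
import json


def to_kafka_message(row: dict) -> tuple:
    """Mimic CAST(payment_id AS STRING) AS key, to_json(struct(*)) AS value.

    Hypothetical helper for illustration only; Spark performs this
    conversion itself inside selectExpr.
    """
    key = str(row["payment_id"])
    # Compact separators approximate the style of Spark's to_json output.
    value = json.dumps(row, separators=(",", ":"))
    return key, value


# First record of the seed data. payment_date is an assumed epoch-seconds
# value: update_payment_date() drops the original column and appends the
# new one, so it ends up last in the struct.
sample_row = {
    "payment_id": 16940,
    "customer_id": 130,
    "amount": 5.99,
    "city": "guas Lindas de Gois",
    "district": "Gois",
    "country": "Brazil",
    "payment_date": 1632312000,
}

key, value = to_kafka_message(sample_row)
print(key)
print(value)
```

Keying each message by payment_id is a design choice of the script; any other column cast to a string would work the same way.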

Update the two environment variables, then submit your first Spark job as an Amazon EMR Step using the AWS CLI and the emr API:

export CLUSTER_ID="<your-cluster-id>"
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
aws emr add-steps \
    --cluster-id ${CLUSTER_ID} \
    --steps """Type=Spark,Name='kafka-seed-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/01_seed_sales_kafka.py]"""
Successfully adding a Step (Spark job) to the Amazon EMR cluster

From the Amazon EMR console, we should observe the Spark job has been completed successfully in about 30–90 seconds.

Amazon EMR Step (Spark job) completed successfully

The Kafka Consumer API allows applications to read streams of data from topics in the Kafka cluster. Using the Kafka Consumer API, from within a Kubernetes container running on Amazon EKS or an EC2 instance, we can observe that the new Kafka topic has been successfully created and that messages (initial sales data) have been published to the new Kafka topic.

export BBROKERS="b-1.your-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.kafka.us-east-1.amazonaws.com:9098, ..."
bin/kafka-console-consumer.sh \
--topic pagila.sales.spark.streaming \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.timestamp=true \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties
Initial sales data as messages in Kafka topic

PySpark Job 2: Batch Query of Amazon MSK Topic

The PySpark script, 02_batch_read_kafka.py, performs a batch query of the initial 250 messages in the Kafka topic. When run, the PySpark script parses the JSON-format messages, then aggregates the data by both total sales and order count, by country, and finally, sorts by total sales.

window = Window.partitionBy("country").orderBy("amount")
window_agg = Window.partitionBy("country")

    .withColumn("row", F.row_number().over(window)) \
    .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
    .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
    .where(F.col("row") == 1).drop("row") \
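
The window-function pattern above numbers the rows within each country, computes the count and sum over the whole partition, and keeps only row 1, which leaves one summary row per country. That is the same result a plain group-by produces. The pure-Python sketch below illustrates the equivalence using a few made-up rows (not Pagila data); the function names are hypothetical and amounts are in cents to keep the arithmetic exact.

```python
from collections import defaultdict

# Hypothetical sample rows (country, amount in cents); not from the dataset.
rows = [
    ("India", 599), ("China", 499), ("India", 299),
    ("Mexico", 799), ("China", 99),
]


def summarize_with_window(rows):
    """Emulate partitionBy("country") windows, keeping row_number == 1."""
    partitions = defaultdict(list)
    for country, amount in rows:
        partitions[country].append(amount)

    output = []
    for country, amounts in partitions.items():
        orders = len(amounts)   # count over the whole partition
        sales = sum(amounts)    # sum over the whole partition
        # Every row in the partition carries the same (sales, orders);
        # keeping only row 1 leaves one summary row per country.
        output.append((country, sales, orders))
    # Sort by total sales, highest first.
    return sorted(output, key=lambda r: r[1], reverse=True)


def summarize_with_group_by(rows):
    """Equivalent grouped aggregation: one pass, one row per country."""
    totals = defaultdict(lambda: [0, 0])
    for country, amount in rows:
        totals[country][0] += amount
        totals[country][1] += 1
    output = [(c, s, n) for c, (s, n) in totals.items()]
    return sorted(output, key=lambda r: r[1], reverse=True)


window_result = summarize_with_window(rows)
group_result = summarize_with_group_by(rows)
print(window_result)
print(window_result == group_result)
```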

The results are written to both the console as stdout and to Amazon S3 in CSV format.

# Purpose: Batch read Kafka topic, aggregate sales and orders by country,
# and output to console and Amazon S3 as CSV
# Author: Gary A. Stafford
# Date: 2021-09-22

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window

topic_input = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-batch-sales") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(params, df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(params, df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # Parse the JSON value, aggregate per country, keep one row per country
    df_output = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .where(F.col("row") == 1).drop("row") \
        .select("country", (F.format_number(F.col("sales"), 2)).alias("sales"), "orders") \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)

    df_output \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()

    # mode() must be set before csv(), which performs the write and returns None
    df_output \
        .write \
        .mode("overwrite") \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark_output/sales_by_country",
             header=True, sep="|")


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Again, submit this job as an Amazon EMR Step using the AWS CLI and the emr API:

aws emr add-steps \
    --cluster-id ${CLUSTER_ID} \
    --steps """Type=Spark,Name='kafka-batch-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/02_batch_read_kafka.py]"""

To view the console output, click on ‘View logs’ in the Amazon EMR console, then click on the stdout logfile, as shown below.

Logs from successful Amazon EMR Step (Spark job)

The stdout logfile should contain the top 25 total sales and order counts, by country, based on the initial 250 sales records.

+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |138.80|20 |
|China |133.80|20 |
|Mexico |106.86|14 |
|Japan |100.86|14 |
|Brazil |96.87 |13 |
|Russian Federation|94.87 |13 |
|United States |92.86 |14 |
|Nigeria |58.93 |7 |
|Philippines |58.92 |8 |
|South Africa |46.94 |6 |
|Argentina |42.93 |7 |
|Germany |39.96 |4 |
|Indonesia |38.95 |5 |
|Italy |35.95 |5 |
|Iran |33.95 |5 |
|South Korea |33.94 |6 |
|Poland |30.97 |3 |
|Pakistan |25.97 |3 |
|Taiwan |25.96 |4 |
|Mozambique |23.97 |3 |
|Ukraine |23.96 |4 |
|Vietnam |23.96 |4 |
|Venezuela |22.97 |3 |
|France |20.98 |2 |
|Peru |19.98 |2 |
+------------------+------+------+
only showing top 25 rows

The PySpark script also wrote the same results to Amazon S3 in CSV format.

CSV file written to Amazon S3 as a result of the Spark job

The total sales and order count for 69 countries were computed, sorted, and coalesced into a single CSV file.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5
South Korea|33.94|6
Poland|30.97|3
Pakistan|25.97|3
Taiwan|25.96|4
Mozambique|23.97|3
Vietnam|23.96|4
Ukraine|23.96|4
Venezuela|22.97|3
France|20.98|2
Peru|19.98|2

Streaming Queries

To demonstrate streaming queries with Spark Structured Streaming, we will use a combination of two PySpark scripts. The first script, 03_streaming_read_kafka_console.py, will perform a streaming query and computation of messages in the Kafka topic, aggregating the total sales and number of orders. Concurrently, the second PySpark script, 04_incremental_sales_kafka.py, will read additional Pagila sales data from a CSV file located on Amazon S3 and write messages to the Kafka topic at a rate of two messages per second. The first script will stream its aggregations to the console in micro-batches, one per minute. Spark Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs.

Note that this first script performs grouped aggregations as opposed to aggregations over a sliding event-time window. The aggregated results represent the total, all-time sales at a point in time, based on all the messages currently in the topic when the micro-batch was computed.
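
The idea of a grouped aggregation that always reports all-time totals can be sketched in plain Python. The toy class below is only an illustration of the concept, not how Spark implements it: state is carried across micro-batches, and each trigger emits the complete, re-sorted result table. The class name, method name, and sample messages are all hypothetical, and amounts are in cents to avoid float rounding.

```python
from collections import defaultdict


class RunningSalesByCountry:
    """Toy stand-in for a stateful grouped aggregation in complete mode."""

    def __init__(self):
        # State kept across micro-batches: country -> [sales_cents, orders]
        self.state = defaultdict(lambda: [0, 0])

    def process_micro_batch(self, messages):
        """Fold new messages into the state and return the full result table."""
        for country, amount_cents in messages:
            self.state[country][0] += amount_cents
            self.state[country][1] += 1
        table = [(c, s, n) for c, (s, n) in self.state.items()]
        # Highest total sales first, as in the streaming query's orderBy.
        return sorted(table, key=lambda r: r[1], reverse=True)


query = RunningSalesByCountry()
# Two hypothetical micro-batches of (country, amount in cents) messages.
batch_0 = query.process_micro_batch([("India", 599), ("China", 499)])
batch_1 = query.process_micro_batch([("China", 299), ("Japan", 99)])
print(batch_0)
print(batch_1)
```

Note how the second result still includes India, even though no new Indian sales arrived: each micro-batch reports the totals for everything seen so far, not just the newest messages.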

To follow along with this part of the demonstration, you can run the two Spark jobs as concurrent steps on the existing Amazon EMR cluster, or create a second EMR cluster, identically configured to the existing cluster, to run the second PySpark script, 04_incremental_sales_kafka.py. Using a second cluster, you can use a minimally-sized single master node cluster with no core nodes to save cost.

PySpark Job 3: Streaming Query to Console

The first PySpark script, 03_streaming_read_kafka_console.py, performs a streaming query of messages in the Kafka topic. The script then aggregates the data by both total sales and order count, by country, and finally, sorts by total sales.

.groupBy("country") \
.agg(F.count("amount"), F.sum("amount")) \
.orderBy(F.col("sum(amount)").desc()) \
.select("country",
(F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
(F.col("count(amount)")).alias("orders")) \

The results are streamed to the console using the processingTime trigger parameter. A trigger defines how often a streaming query should be executed and emit new data. The processingTime parameter sets a trigger that runs a micro-batch query periodically based on the processing time (e.g. ‘5 minutes’ or ‘1 hour’). The trigger is currently set to a minimal processing time of one minute for ease of demonstration.

.trigger(processingTime="1 minute") \
.outputMode("complete") \
.format("console") \
.option("numRows", 25) \

# Purpose: Streaming read from Kafka topic and summarize top 25
# all-time total sales by country to the console every minute
# Author: Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-console") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .groupBy("country") \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("sum(amount)").desc()) \
        .select("country",
                (F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
                (F.col("count(amount)")).alias("orders")) \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_to_console") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

For demonstration purposes, we will run the Spark job directly from the master node of the EMR Cluster. This method will allow us to easily view the micro-batches and associated log events as they are output to the console. The console is normally used for testing purposes. Submitting the PySpark script from the cluster’s master node is an alternative to submitting an Amazon EMR Step. Connect to the master node of the Amazon EMR cluster using SSH, as the hadoop user:

export EMR_MASTER=<your-emr-master-dns.compute-1.amazonaws.com>
export EMR_KEY_PATH=path/to/key/<your-ssh-key.pem>
ssh -i ${EMR_KEY_PATH} hadoop@${EMR_MASTER}

Submit the PySpark script, 03_streaming_read_kafka_console.py, to Spark:

export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
spark-submit s3a://${SPARK_BUCKET}/spark/03_streaming_read_kafka_console.py

Before running the second PySpark script, 04_incremental_sales_kafka.py, let the first script run long enough to pick up the existing sales data in the Kafka topic. Within about two minutes, you should see the first micro-batch of aggregated sales results, labeled ‘Batch: 0’ output to the console. This initial micro-batch should contain the aggregated results of the existing 250 messages from Kafka. The streaming query’s first micro-batch results should be identical to the previous batch query results.

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |138.80|20 |
|China |133.80|20 |
|Mexico |106.86|14 |
|Japan |100.86|14 |
|Brazil |96.87 |13 |
|Russian Federation|94.87 |13 |
|United States |92.86 |14 |
|Nigeria |58.93 |7 |
|Philippines |58.92 |8 |
|South Africa |46.94 |6 |
|Argentina |42.93 |7 |
|Germany |39.96 |4 |
|Indonesia |38.95 |5 |
|Italy |35.95 |5 |
|Iran |33.95 |5 |
|South Korea |33.94 |6 |
|Poland |30.97 |3 |
|Pakistan |25.97 |3 |
|Taiwan |25.96 |4 |
|Mozambique |23.97 |3 |
|Ukraine |23.96 |4 |
|Vietnam |23.96 |4 |
|Venezuela |22.97 |3 |
|France |20.98 |2 |
|Peru |19.98 |2 |
+------------------+------+------+
only showing top 25 rows

Immediately below the batch output, there will be a log entry containing information about the batch. In the log entry snippet below, note the starting and ending offsets of the topic for the Spark job’s Kafka consumer group, 0 (null) to 250, representing the initial sales data.

{
"id" : "e0168615-dd39-4025-9811-c001a324ed58",
"runId" : "ed76fe07-032c-42ab-881c-57b44f561a29",
"name" : "streaming_to_console",
"timestamp" : "2021-09-08T17:37:58.116Z",
"batchId" : 0,
"numInputRows" : 250,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 14.104372355430183,
"durationMs" : {
"addBatch" : 12298,
"getBatch" : 39,
"latestOffset" : 4710,
"queryPlanning" : 542,
"triggerExecution" : 17724,
"walCommit" : 33
},
"stateOperators" : [ {
"numRowsTotal" : 136,
"numRowsUpdated" : 136,
"memoryUsedBytes" : 119008,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 61408
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[pagila.sales.spark.streaming]]",
"startOffset" : null,
"endOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 250
}
},
"numInputRows" : 250,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 14.104372355430183
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@13b8bba3",
"numOutputRows" : 68
}
}

PySpark Job 4: Incremental Sales Data

As described earlier, the second PySpark script, 04_incremental_sales_kafka.py, reads 1,800 additional sales records from a second CSV file located on Amazon S3, sales_incremental_large.csv. The script then publishes messages to the Kafka topic at a deliberately throttled rate of two messages per second. Concurrently, the first PySpark job, still running and performing a streaming query, will consume the new Kafka messages and stream aggregated total sales and orders in micro-batches of one-minute increments to the console over a period of about 15 minutes.

# Purpose: Batch write incremental sales data from S3 to a new Kafka topic
# Use a delay between each message to simulate real-time streaming data
# Author: Gary A. Stafford
# Date: 2021-09-26

import os
import time

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

sales_data = "sales_incremental_large.csv"
topic_output = "pagila.sales.spark.streaming"
time_between_messages = 0.5  # 1800 messages * .5 seconds = ~15 minutes

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-incremental-sales") \
        .getOrCreate()

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, params, schema)
    df_sales.cache()

    write_to_kafka(spark, params, df_sales)


def read_from_csv(spark, params, schema):
    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{sales_data}",
             schema=schema, header=True, sep="|")

    return df_sales


def write_to_kafka(spark, params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # Collect the rows once on the driver rather than once per loop iteration
    rows = df_sales.collect()

    for row in rows:
        df_message = spark.createDataFrame([row], df_sales.schema)

        # Replace the payment date with the current time and build key/value
        df_message = df_message \
            .drop("payment_date") \
            .withColumn("payment_date", F.current_timestamp()) \
            .selectExpr("CAST(payment_id AS STRING) AS key",
                        "to_json(struct(*)) AS value")

        # save() returns None, so the write is kept separate from the
        # assignment above; df_message remains a DataFrame for show()
        df_message \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        df_message.show(1)

        time.sleep(time_between_messages)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Submit the second PySpark script as a concurrent Amazon EMR Step to the first EMR cluster, or submit as a step to the second Amazon EMR cluster.

aws emr add-steps \
    --cluster-id ${CLUSTER_ID} \
    --steps """Type=Spark,Name='kafka-incremental-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/04_incremental_sales_kafka.py]"""

The job sends a total of 1,800 messages to Kafka at a rate of two messages per second for 15 minutes. The total runtime of the job should be approximately 19 minutes, given a few minutes for startup and shutdown. Why run for so long? We want to make sure the job’s runtime will span multiple, overlapping, sliding event-time windows.
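
The timing figures follow from simple arithmetic, sketched below. This is a nominal estimate that counts only the sleep between messages; each per-message Kafka write adds overhead, so the observed rate may be lower and the runtime longer. The variable names other than time_between_messages are hypothetical.

```python
message_count = 1800          # records in sales_incremental_large.csv
time_between_messages = 0.5   # seconds, as set in the script
trigger_interval = 60         # seconds, the streaming query's processingTime

# Time spent sleeping between messages, ignoring write overhead.
nominal_seconds = message_count * time_between_messages
nominal_minutes = nominal_seconds / 60

# Upper bound on new messages per micro-batch if the producer kept pace.
max_messages_per_trigger = trigger_interval / time_between_messages

print(nominal_minutes)            # 15.0
print(max_messages_per_trigger)   # 120.0
```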

After about two minutes, return to the terminal output of the first Spark job, 03_streaming_read_kafka_console.py, running on the master node of the first cluster. As long as new messages are consumed every minute, you should see a new micro-batch of aggregated sales results stream to the console. Below we see an example of Batch 5, which reflects additional sales compared to Batch 0, shown previously. The results reflect the current all-time sales by country in real-time as the sales are published to Kafka.

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|China |473.35|65 |
|India |393.44|56 |
|Japan |292.60|40 |
|Mexico |262.64|36 |
|United States |252.65|35 |
|Russian Federation|243.65|35 |
|Brazil |220.69|31 |
|Philippines |191.75|25 |
|Indonesia |142.81|19 |
|South Africa |110.85|15 |
|Nigeria |108.86|14 |
|Argentina |89.86 |14 |
|Germany |85.89 |11 |
|Israel |68.90 |10 |
|Ukraine |65.92 |8 |
|Turkey |58.91 |9 |
|Iran |58.91 |9 |
|Saudi Arabia |56.93 |7 |
|Poland |50.94 |6 |
|Pakistan |50.93 |7 |
|Italy |48.93 |7 |
|French Polynesia |47.94 |6 |
|Peru |45.95 |5 |
|United Kingdom |45.94 |6 |
|Colombia |44.94 |6 |
+------------------+------+------+
only showing top 25 rows

If we fast forward to a later micro-batch, sometime after the second incremental sales job is completed, we should see the top 25 aggregated sales by country of 2,050 messages — 250 seed plus 1,800 incremental messages.

-------------------------------------------
Batch: 20
-------------------------------------------
+------------------+--------+------+
|country |sales |orders|
+------------------+--------+------+
|China |1,379.05|195 |
|India |1,338.10|190 |
|United States |915.69 |131 |
|Mexico |855.80 |120 |
|Japan |831.88 |112 |
|Russian Federation|723.95 |105 |
|Brazil |613.12 |88 |
|Philippines |528.27 |73 |
|Indonesia |381.46 |54 |
|Turkey |350.52 |48 |
|Argentina |298.57 |43 |
|Nigeria |294.61 |39 |
|South Africa |279.61 |39 |
|Taiwan |221.67 |33 |
|Germany |199.73 |27 |
|United Kingdom |196.75 |25 |
|Poland |182.77 |23 |
|Spain |170.77 |23 |
|Ukraine |160.79 |21 |
|Iran |160.76 |24 |
|Italy |156.79 |21 |
|Pakistan |152.78 |22 |
|Saudi Arabia |146.81 |19 |
|Venezuela |145.79 |21 |
|Colombia |144.78 |22 |
+------------------+--------+------+
only showing top 25 rows

Compare the informational output below for Batch 20 to that of Batch 0, shown previously. Note the starting offset of the Kafka consumer group on the topic is 1986, and the ending offset is 2050, the total number of messages in the topic. At this point, all messages have been consumed from the topic and aggregated. If additional messages were streamed to Kafka while the streaming job was still running, additional micro-batches would continue to be streamed to the console every minute.

"sources" : [ {
"description" : "KafkaV2[Subscribe[pagila.sales.spark.streaming]]",
"startOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 1986
}
},
"endOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 2050
}
},
"numInputRows" : 64,
"inputRowsPerSecond" : 1.0666666666666667,
"processedRowsPerSecond" : 13.772326231977619
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@13b8bba3",
"numOutputRows" : 105
}
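
As a quick check on these numbers, the count of messages consumed by a micro-batch is the difference between the ending and starting offsets, summed over partitions. The sketch below applies that to trimmed copies of the Batch 0 and Batch 20 source entries shown in this post. The function name rows_consumed is hypothetical, and treating a null startOffset as 0 is an assumption that holds here because the topic is read from the earliest offset.

```python
import json


def rows_consumed(source: dict) -> int:
    """Sum (endOffset - startOffset) over every topic partition.

    A null startOffset (first micro-batch) is treated as offset 0,
    an assumption that fits a topic read from its earliest offset.
    """
    start = source.get("startOffset") or {}
    total = 0
    for topic, partitions in source["endOffset"].items():
        for partition, end in partitions.items():
            total += end - start.get(topic, {}).get(partition, 0)
    return total


# Trimmed copies of the "sources" entries from the progress logs.
batch_0 = json.loads("""
{"startOffset": null,
 "endOffset": {"pagila.sales.spark.streaming": {"0": 250}},
 "numInputRows": 250}
""")
batch_20 = json.loads("""
{"startOffset": {"pagila.sales.spark.streaming": {"0": 1986}},
 "endOffset": {"pagila.sales.spark.streaming": {"0": 2050}},
 "numInputRows": 64}
""")

print(rows_consumed(batch_0))    # 250
print(rows_consumed(batch_20))   # 64
```

In both cases the computed difference matches the numInputRows value reported in the log.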

PySpark Job 5: Aggregations over Sliding Event-time Window

In the previous example, we analyzed total all-time sales in real-time (e.g., show me the current, total, all-time sales for France compared to the rest of Europe, at regular intervals). This approach is opposed to sales made during a sliding event-time window (e.g., are the total sales for the United States trending better during this current four-hour marketing promotional period than the previous promotional period). In many cases, real-time sales during a distinct period or event window is probably a more commonly tracked KPI than total all-time sales.

If we add a sliding event-time window to the PySpark script, we can easily observe the total sales and order counts made during the sliding event-time window in real-time.

.withWatermark("timestamp", "10 minutes") \
.groupBy("country",
F.window("timestamp", "10 minutes", "5 minutes")) \
.agg(F.count("amount"), F.sum("amount")) \
.orderBy(F.col("window").desc(),
F.col("sum(amount)").desc()) \

Windowed totals would not include sales (messages) present in the Kafka topic before the streaming query began, nor sales from previous sliding windows. Constructing the correct query always starts with a clear understanding of the question you are trying to answer.

Below, in the abridged console output of the micro-batch from the script, 05_streaming_read_kafka_console_window.py, we see the results of three ten-minute sliding event-time windows with a five-minute overlap. The sales and order totals represent the volume sold during that window, with this micro-batch falling within the active current window, 19:30 to 19:40 UTC.

-------------------------------------------
Batch: 14
-------------------------------------------
+------------------+------+------+-------------------+-------------------+
|country           |sales |orders|start              |end                |
+------------------+------+------+-------------------+-------------------+
|India             |286.60|40    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|China             |285.61|39    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|United States     |205.69|31    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Japan             |189.74|26    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Russian Federation|182.74|26    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Philippines       |163.77|23    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Mexico            |159.76|24    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Brazil            |155.77|23    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Argentina         |118.84|16    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Indonesia         |82.88 |12    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|India             |600.13|87    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|China             |509.27|73    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|United States     |416.42|58    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Japan             |329.56|44    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Mexico            |311.54|46    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Russian Federation|301.55|45    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Brazil            |256.64|36    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Philippines       |219.67|33    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Turkey            |171.76|24    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Argentina         |159.78|22    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|China             |353.53|47    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|India             |264.62|38    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Japan             |191.74|26    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|United States     |173.77|23    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Mexico            |159.77|23    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Russian Federation|148.78|22    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Philippines       |132.83|17    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Brazil            |123.82|18    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Indonesia         |103.86|14    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|South Africa      |63.91 |9     |2021-09-08 19:20:00|2021-09-08 19:30:00|
+------------------+------+------+-------------------+-------------------+
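
The window boundaries in this output can be reproduced by hand. With a ten-minute window that slides every five minutes, each event falls into two overlapping windows. The sketch below is a plain-Python illustration of that assignment, assuming windows are aligned to the Unix epoch (which, to my understanding, is the default behavior of Spark's window function when no start offset is given). The function name and the sample event time are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(event_time, window=timedelta(minutes=10),
                    slide=timedelta(minutes=5)):
    """Return every (start, end) window containing event_time, newest first.

    Windows are aligned to the Unix epoch; start is inclusive, end exclusive.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Start of the latest window that begins at or before the event.
    latest_start = event_time - ((event_time - epoch) % slide)
    windows = []
    start = latest_start
    # Step backward one slide at a time while the window still covers the event.
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return windows


# Hypothetical sale at 19:32:10 UTC on the day of the demonstration.
event = datetime(2021, 9, 8, 19, 32, 10, tzinfo=timezone.utc)
for start, end in sliding_windows(event):
    print(start.strftime("%H:%M"), "to", end.strftime("%H:%M"))
```

A sale at 19:32 is therefore counted in both the 19:30 to 19:40 window and the 19:25 to 19:35 window, which is why the same country appears in several windows of the same micro-batch.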

Plotting the total sales over time using sliding event-time windows, we will observe the results do not reflect a running total. Total sales only accumulate within a sliding window.

Cumulative sales within 5-minute sliding event-time windows

Compare these results to the results of the previous script, whose total sales reflect a running total.

Running total of sales (no sliding windows)

PySpark Job 6: Streaming Query from/to Amazon MSK

The PySpark script, 06_streaming_read_kafka_kafka.py, performs the same streaming query and grouped aggregation as the previous script, 03_streaming_read_kafka_console.py. However, instead of outputting results to the console, the results of this job will be written to a new Kafka topic on Amazon MSK.

.format("kafka") \
.options(**options_write) \
.option("checkpointLocation", "/checkpoint/kafka/") \

# Purpose: Streaming read from Kafka topic and aggregate
#          sales and orders by country to Kafka every minute
# Author: Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming.in"
topic_output = "pagila.sales.spark.streaming.out"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-kafka") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)
    summarize_sales(params, df_sales)


def read_from_kafka(spark, params):
    # Kafka source options, including IAM authentication for Amazon MSK
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(params, df_sales):
    # Kafka sink options; results are published to the output topic
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # Parse the JSON value, aggregate by country, then build
    # the key and value columns required by the Kafka sink
    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .groupBy("country") \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("sum(amount)").desc()) \
        .select(F.sha1("country").alias("id"),
                "country",
                (F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
                (F.col("count(amount)")).alias("orders")) \
        .coalesce(1) \
        .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
        .writeStream \
        .trigger(processingTime="1 minute") \
        .queryName("streaming_to_kafka") \
        .outputMode("complete") \
        .format("kafka") \
        .options(**options_write) \
        .option("checkpointLocation", "/checkpoint/kafka/") \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Repeat the same process used with the previous script. Re-run the seed data script, 01_seed_sales_kafka.py, but update the input topic to a new name, such as pagila.sales.spark.streaming.in. Next, run the new script, 06_streaming_read_kafka_kafka.py. Give the script time to start and consume the 250 seed messages from Kafka. Then, update the input topic name in the incremental data PySpark script, 04_incremental_sales_kafka.py, and run it concurrently with the new script, either on the same cluster or on the second cluster.

When run, the script, 06_streaming_read_kafka_kafka.py, will continuously consume messages from the new pagila.sales.spark.streaming.in topic and publish grouped aggregation results to a new topic, pagila.sales.spark.streaming.out.
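
Spark's Kafka sink publishes a value column and, optionally, a key column. The script derives the key from the SHA-1 hash of the country and serializes the selected columns to JSON for the value. As a rough illustration of what each published message should look like, here is a pure-Python approximation; the function name to_kafka_record is hypothetical and the sketch does not involve Spark or Kafka.

```python
import hashlib
import json


def to_kafka_record(country, sales, orders):
    """Approximate the key/value pair the job publishes for one country."""
    # F.sha1("country") yields the hex-encoded SHA-1 digest of the string
    key = hashlib.sha1(country.encode("utf-8")).hexdigest()
    # to_json(struct(*)) serializes every selected column, including the id
    value = json.dumps(
        {"id": key, "country": country, "sales": sales, "orders": orders},
        separators=(",", ":"),
    )
    return key, value


key, value = to_kafka_record("India", "948.63", 190)
print(key)
print(value)
```

Note that sales is a string rather than a number, because format_number returns a formatted string. This is why the batch script that reads these messages back declares sales as a StringType and strips the thousands separators before sorting.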

Use the Kafka console consumer, kafka-console-consumer.sh, to view new messages as the Spark job publishes them in near real-time to Kafka.

export BBROKERS="b-1.your-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.kafka.us-east-1.amazonaws.com:9098, ..."
bin/kafka-console-consumer.sh \
--topic pagila.sales.spark.streaming.out \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.timestamp=true \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties

Aggregated sales results (messages) being published to Kafka by streaming Spark job

PySpark Job 7: Batch Query of Streaming Results from MSK

When run, the previous script produces Kafka messages containing non-windowed sales aggregations to the Kafka topic every minute. Using the next PySpark script, 07_batch_read_kafka.py, we can consume those aggregated messages using a batch query and display the most recent sales totals to the console. Each country’s most recent all-time sales totals and order counts should be identical to the previous script’s results, representing the aggregation of all 2,050 Kafka messages — 250 seed plus 1,800 incremental messages.

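To get the latest total sales by country, we will consume all the messages from the output topic, partition them by country, keep only the most recent message for each country based on its Kafka timestamp, and finally, display the results sorted by sales in descending order. Because the streaming job publishes cumulative totals, the most recent message for a country also carries its highest sales value.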

window = Window.partitionBy("country") \
.orderBy(F.col("timestamp").desc())
.withColumn("row", F.row_number().over(window)) \
.where(F.col("row") == 1).drop("row") \
.select("country", "sales", "orders") \

# Purpose: Batch read Kafka output topic and display
#          top 25 total sales by country to console
# Author: Gary A. Stafford
# Date: 2021-09-09

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, TimestampType
from pyspark.sql.window import Window

topic_input = "pagila.sales.spark.streaming.out"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-batch-sales") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)
    summarize_sales(df_sales)


def read_from_kafka(spark, params):
    schema = StructType([
        StructField("country", StringType(), False),
        StructField("sales", StringType(), False),
        StructField("orders", IntegerType(), False),
        StructField("start", TimestampType(), False),
        StructField("end", TimestampType(), True),
    ])

    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    # Rank each country's messages from newest to oldest
    window = Window.partitionBy("country").orderBy(F.col("timestamp").desc())

    # Keep the newest message per country, then sort by numeric sales
    df_sales = spark.read \
        .format("kafka") \
        .options(**options_read) \
        .load() \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .withColumn("row", F.row_number().over(window)) \
        .where(F.col("row") == 1).drop("row") \
        .select("country", "sales", "orders") \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)

    return df_sales


def summarize_sales(df_sales):
    df_sales \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()
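
The "latest message per country" logic in the batch query can be sketched without Spark. This is an illustrative pure-Python equivalent, not the script itself: the function name latest_by_country and the sample messages are hypothetical, and timestamps are simplified to integers.

```python
def latest_by_country(messages):
    """Keep the newest message per country, sorted by numeric sales."""
    latest = {}
    for msg in messages:
        current = latest.get(msg["country"])
        # Equivalent of row_number() == 1 over timestamp descending
        if current is None or msg["timestamp"] > current["timestamp"]:
            latest[msg["country"]] = msg

    # Sales are formatted strings, so strip commas before comparing
    return sorted(
        latest.values(),
        key=lambda m: float(m["sales"].replace(",", "")),
        reverse=True,
    )


messages = [
    {"country": "India", "sales": "880.10", "orders": 120, "timestamp": 1},
    {"country": "China", "sales": "1,029.59", "orders": 141, "timestamp": 2},
    {"country": "India", "sales": "948.63", "orders": 130, "timestamp": 2},
]

for row in latest_by_country(messages):
    print(row["country"], row["sales"], row["orders"])
```

Sorting on the raw strings would place "948.63" ahead of "1,029.59", which is the reason the script casts the cleaned value to a float before ordering.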

Writing the top 25 results to the console, we should see the same results as we saw in the final micro-batch (Batch 20, shown above) of the PySpark script, 03_streaming_read_kafka_console.py.

+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |948.63|190 |
|China |936.67|195 |
|United States |915.69|131 |
|Mexico |855.80|120 |
|Japan |831.88|112 |
|Russian Federation|723.95|105 |
|Brazil |613.12|88 |
|Philippines |528.27|73 |
|Indonesia |381.46|54 |
|Turkey |350.52|48 |
|Argentina |298.57|43 |
|Nigeria |294.61|39 |
|South Africa |279.61|39 |
|Taiwan |221.67|33 |
|Germany |199.73|27 |
|United Kingdom |196.75|25 |
|Poland |182.77|23 |
|Spain |170.77|23 |
|Ukraine |160.79|21 |
|Iran |160.76|24 |
|Italy |156.79|21 |
|Pakistan |152.78|22 |
|Saudi Arabia |146.81|19 |
|Venezuela |145.79|21 |
|Colombia |144.78|22 |
+------------------+------+------+
only showing top 25 rows

PySpark Job 8: Streaming Query with Static Join and Sliding Window

The PySpark script, 08_streaming_read_kafka_join_window.py, performs the same streaming query and computations over sliding event-time windows as the previous script, 05_streaming_read_kafka_console_window.py. However, instead of totaling sales and orders by country, the script totals sales and orders by sales region. A sales region is composed of multiple countries in the same geographical area. The PySpark script reads in a static list of sales regions and countries from Amazon S3, sales_regions.csv.

country,region
Afghanistan,Asia & Pacific
Aland Islands,Europe
Albania,Europe
Algeria,Arab States
American Samoa,Asia & Pacific
Andorra,Europe
Angola,Africa
Anguilla,Latin America
Antarctica,Asia & Pacific

The script then performs a join operation between the results of the streaming query and the static list of regions, joining on country. Using the join, the streaming sales data from Kafka is enriched with the sales region. Any sales record whose country does not have an assigned sales region is categorized as ‘Unassigned.’

.join(df_regions, on=["country"], how="leftOuter") \
.na.fill("Unassigned") \
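
Conceptually, the left outer join followed by the fill is a lookup with a default value. The sketch below shows the same idea in plain Python; the function name enrich and the sample records are hypothetical, and the region mapping uses only a few of the rows shown above.

```python
# A few rows from the static regions list, keyed by country
regions = {
    "Afghanistan": "Asia & Pacific",
    "Albania": "Europe",
    "Algeria": "Arab States",
}


def enrich(sale, regions):
    """Attach a sales region, defaulting to 'Unassigned' when none matches."""
    return {**sale, "region": regions.get(sale["country"], "Unassigned")}


sales = [
    {"country": "Albania", "amount": 4.99},
    {"country": "Atlantis", "amount": 2.99},  # no region on file
]

for sale in sales:
    print(enrich(sale, regions))
```

One difference worth keeping in mind: when na.fill is called with a string and no subset, Spark fills nulls in every string column, not only region, so null city or district values would also become "Unassigned" in the joined data.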

Sales and orders are then aggregated by sales region, and the top 24 results (eight regions across three windows) are output to the console every minute.

# Purpose: Streaming read from Kafka topic, join with static data,
#          and aggregate in windows by sales region to the console every minute
#          Show 24 = 8 regions x 3 windows
# Author: Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming.region.53"
regions_data = "sales_regions.csv"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-join") \
        .getOrCreate()

    # Static lookup data is read once and cached for reuse in each micro-batch
    df_regions = read_from_csv(spark, params)
    df_regions.cache()

    df_sales = read_from_kafka(spark, params)
    summarize_sales(df_sales, df_regions)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def read_from_csv(spark, params):
    schema = StructType([
        StructField("country", StringType(), False),
        StructField("region", StringType(), False)
    ])

    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{regions_data}",
             schema=schema, header=True, sep=",")

    return df_sales


def summarize_sales(df_sales, df_regions):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .join(df_regions, on=["country"], how="leftOuter") \
        .na.fill("Unassigned") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy("region", F.window("timestamp", "10 minutes", "5 minutes")) \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("window").desc(), F.col("sum(amount)").desc()) \
        .select(F.col("region").alias("sales_region"),
                F.format_number(F.col("sum(amount)"), 2).alias("sales"),
                F.col("count(amount)").alias("orders"),
                # The grouping column is a struct named "window" holding the
                # start and end timestamps; format each one for display
                F.date_format(F.col("window.start"), "yyyy-MM-dd HH:mm").alias("start"),
                F.date_format(F.col("window.end"), "yyyy-MM-dd HH:mm").alias("end")) \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_regional_sales") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 24) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

To run the job, repeat the previous process of renaming the topic (e.g., pagila.sales.spark.streaming.region), then running the initial sales data job, this script, and finally, concurrent with this script, the incremental sales data job. Below, we see a later micro-batch output to the console from the Spark job. We see three sets of sales results, by sales region, from three different ten-minute sliding event-time windows with a five-minute overlap.

-------------------------------------------
Batch: 20
-------------------------------------------
+--------------+--------+------+----------------+----------------+
|sales_region  |sales   |orders|start           |end             |
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|936.66  |134   |2021-09-08 21:35|2021-09-08 21:45|
|Europe        |537.28  |72    |2021-09-08 21:35|2021-09-08 21:45|
|Latin America |399.41  |59    |2021-09-08 21:35|2021-09-08 21:45|
|North America |176.72  |28    |2021-09-08 21:35|2021-09-08 21:45|
|Middle east   |101.85  |15    |2021-09-08 21:35|2021-09-08 21:45|
|Africa        |99.86   |14    |2021-09-08 21:35|2021-09-08 21:45|
|Unassigned    |50.92   |8     |2021-09-08 21:35|2021-09-08 21:45|
|Arab States   |36.96   |4     |2021-09-08 21:35|2021-09-08 21:45|
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|2,271.78|322   |2021-09-08 21:30|2021-09-08 21:40|
|Europe        |1,199.38|162   |2021-09-08 21:30|2021-09-08 21:40|
|Latin America |1,122.40|160   |2021-09-08 21:30|2021-09-08 21:40|
|North America |390.38  |62    |2021-09-08 21:30|2021-09-08 21:40|
|Africa        |325.54  |46    |2021-09-08 21:30|2021-09-08 21:40|
|Middle east   |212.69  |31    |2021-09-08 21:30|2021-09-08 21:40|
|Unassigned    |118.83  |17    |2021-09-08 21:30|2021-09-08 21:40|
|Arab States   |82.89   |11    |2021-09-08 21:30|2021-09-08 21:40|
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|2,667.23|377   |2021-09-08 21:25|2021-09-08 21:35|
|Europe        |1,416.03|197   |2021-09-08 21:25|2021-09-08 21:35|
|Latin America |1,197.28|172   |2021-09-08 21:25|2021-09-08 21:35|
|Africa        |475.35  |65    |2021-09-08 21:25|2021-09-08 21:35|
|North America |435.37  |63    |2021-09-08 21:25|2021-09-08 21:35|
|Middle east   |272.62  |38    |2021-09-08 21:25|2021-09-08 21:35|
|Unassigned    |172.75  |25    |2021-09-08 21:25|2021-09-08 21:35|
|Arab States   |127.83  |17    |2021-09-08 21:25|2021-09-08 21:35|
+--------------+--------+------+----------------+----------------+

PySpark Job 9: Static Join with Grouped Aggregations

As a comparison, we can exclude the sliding event-time window operations from the previous streaming query script, 08_streaming_read_kafka_join_window.py, to obtain the current, total, all-time sales by sales region. See the script, 09_streaming_read_kafka_join.py, in the project repository for details.

-------------------------------------------
Batch: 20
-------------------------------------------
+--------------+--------+------+
|sales_region |sales |orders|
+--------------+--------+------+
|Asia & Pacific|5,780.88|812 |
|Europe |3,081.74|426 |
|Latin America |2,545.34|366 |
|Africa |1,029.59|141 |
|North America |997.57 |143 |
|Middle east |541.23 |77 |
|Unassigned |352.47 |53 |
|Arab States |244.68 |32 |
+--------------+--------+------+

Conclusion

In this post, we learned how to get started with Spark Structured Streaming on Amazon EMR. First, we explored how to run jobs written in Python with PySpark on Amazon EMR as Steps and directly from the EMR cluster’s master node. Next, we discovered how to produce and consume messages with Apache Kafka on Amazon MSK, using batch and streaming queries. Finally, we learned about aggregations over a sliding event-time window compared to grouped aggregations and how Structured Streaming queries are processed in micro-batches.

In a subsequent post, we will learn how to use Apache Avro and the Apicurio Registry with PySpark on Amazon EMR to read and write Apache Avro format messages to Amazon MSK.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Hydrating a Data Lake using Log-based Change Data Capture (CDC) with Debezium, Apicurio, and Kafka Connect on AWS

Import data from Amazon RDS into Amazon S3 using Amazon MSK, Apache Kafka Connect, Debezium, Apicurio Registry, and Amazon EKS

Introduction

In the last post, Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS, we utilized Kafka Connect to export data from an Amazon RDS for PostgreSQL relational database and import the data into a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 was converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect. To improve data freshness, as data was added or updated in the PostgreSQL database, Kafka Connect automatically detected those changes and streamed them into the data lake using query-based Change Data Capture (CDC).

This follow-up post will examine log-based CDC as a marked improvement over query-based CDC to continuously stream changes from the PostgreSQL database to the data lake. We will perform log-based CDC using Debezium’s Kafka Connect Source Connector for PostgreSQL rather than Confluent’s Kafka Connect JDBC Source connector, which was used in the previous post for query-based CDC. We will store messages as Apache Avro in Kafka running on Amazon Managed Streaming for Apache Kafka (Amazon MSK). Avro message schemas will be stored in Apicurio Registry. The schema registry will run alongside Kafka Connect on Amazon Elastic Kubernetes Service (Amazon EKS).

High-level architecture for this post’s demonstration

Change Data Capture

According to Gunnar Morling, Principal Software Engineer at Red Hat, who works on the Debezium and Hibernate projects, and well-known industry speaker, there are two types of Change Data Capture — Query-based and Log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

Joker 2021: Change data capture pipelines with Debezium and Kafka Streams (image: YouTube)

You can find another excellent explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.

Query-based vs. Log-based CDC

To demonstrate the high-level differences between query-based and log-based CDC, let’s examine the results of a simple SQL UPDATE statement captured with both CDC methods.

UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;

Here is how that change is represented as a JSON message payload using the query-based CDC method described in the previous post.

{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}

Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.

{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}
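
To make the structure of the log-based message more concrete, the following sketch pulls a few fields out of a Debezium-style change event using only the Python standard library. The function name summarize_change is hypothetical, and the sample is an abridged version of the message above rather than a complete event.

```python
import json

# Debezium operation codes; "r" marks rows read during the initial snapshot
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize_change(message):
    """Extract the table, operation, log position, and new row state."""
    event = json.loads(message)
    source = event["source"]
    return {
        "table": f'{source["schema"]}.{source["table"]}',
        "operation": OPERATIONS.get(event["op"], "unknown"),
        "lsn": source["lsn"],
        "row": event.get("after"),
    }


sample = json.dumps({
    "after": {"address_id": 105, "address2": "Apartment #1234"},
    "source": {"schema": "public", "table": "address", "lsn": 1090317720624},
    "op": "u",
    "ts_ms": 1628815418815,
})

print(summarize_change(sample))
```

None of this metadata (the operation type, the source table, or the position in the write-ahead log) is available in the query-based message, which only carries the current state of the row.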

Avro and Schema Registry

Apache Avro is a compact, fast, binary data format, according to the documentation. Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic scripting languages since data, together with its schema, is fully self-describing.

We can decouple the data from its schema by using schema registries like the Confluent Schema Registry or Apicurio Registry. According to Apicurio, in a messaging and event streaming architecture, data published to topics and queues must often be serialized or validated using a schema (e.g., Apache Avro, JSON Schema, or Google Protocol Buffers). Of course, schemas can be packaged in each application. Still, it is often a better architectural pattern to register schemas in an external system [schema registry] and then reference them from each application.

Using Debezium’s PostgreSQL source connector, we will store changes from the PostgreSQL database’s write-ahead log (WAL) as Avro in Kafka, running on Amazon MSK. The message’s schema will be stored separately in Apicurio Registry as opposed to with the message, thus reducing the size of the messages in Kafka and allowing for schema validation and schema evolution.

Apicurio Registry showing versions of the pagila.public.film schema
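
The size argument for decoupling schemas from messages can be shown with a toy comparison. The sketch below is only an illustration: it uses JSON for both variants, whereas real serializers write the record as Avro binary and carry a compact schema identifier. The schema, the REGISTRY dictionary, and the function names are all hypothetical stand-ins.

```python
import json

# Hypothetical Avro schema for a subset of the address columns
ADDRESS_SCHEMA = {
    "type": "record",
    "name": "Address",
    "namespace": "pagila.public",
    "fields": [
        {"name": "address_id", "type": "int"},
        {"name": "address", "type": "string"},
        {"name": "address2", "type": ["null", "string"], "default": None},
        {"name": "district", "type": "string"},
    ],
}

# Stand-in for a registry: each schema is stored once and looked up by ID
REGISTRY = {1: ADDRESS_SCHEMA}

record = {
    "address_id": 105,
    "address": "733 Mandaluyong Place",
    "address2": "Apartment #1234",
    "district": "Asir",
}


def with_embedded_schema(record, schema):
    """Every message carries its own copy of the schema."""
    return json.dumps({"schema": schema, "payload": record}).encode("utf-8")


def with_schema_reference(record, schema_id):
    """Every message carries only a reference to the registered schema."""
    return json.dumps({"schema_id": schema_id, "payload": record}).encode("utf-8")


embedded = with_embedded_schema(record, ADDRESS_SCHEMA)
referenced = with_schema_reference(record, 1)
print(len(embedded), "bytes with schema embedded")
print(len(referenced), "bytes with schema referenced")
```

A consumer of the referenced form resolves the identifier against the registry to obtain the schema, so the schema is transferred once rather than with every message.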

Debezium

Debezium, according to their website, continuously monitors your databases and lets any of your applications stream every row-level change in the same order they were committed to the database. Event streams can be used to purge caches, update search indexes, generate derived views and data, and keep other data sources in sync. Debezium is a set of distributed services that capture row-level changes in your databases. Debezium records all row-level changes committed to each database table in a transaction log. Then, each application reads the transaction logs they are interested in, and they see all of the events in the same order in which they occurred. Debezium is built on top of Apache Kafka and integrates with Kafka Connect.

The latest version of Debezium includes support for monitoring MySQL database servers, MongoDB replica sets or sharded clusters, PostgreSQL servers, and SQL Server databases. We will be using Debezium’s PostgreSQL connector to capture row-level changes in the Pagila PostgreSQL database. According to Debezium’s documentation, the first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content committed to the database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.

Prerequisites

Similar to the previous post, this post will focus on data movement, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:

  1. Amazon RDS for PostgreSQL instance (data source);
  2. Amazon S3 bucket (data sink);
  3. Amazon MSK cluster;
  4. Amazon EKS cluster;
  5. Connectivity between the Amazon RDS instance and Amazon MSK cluster;
  6. Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
  7. Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
  8. IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below).

As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.

Source Code

All source code for this post and the previous post, including the Kafka Connect and connector configuration files and the Helm charts, is open-sourced and located on GitHub in the garystafford/kafka-connect-msk-demo repository.

Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK.

Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (known as IRSA) allows EKS to access MSK and S3 using IAM (see more details below).

Sample PostgreSQL Database

For this post, we will continue to use PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytical query costs.

Pagila database schema diagram

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.

Last Updated Trigger

Each table in the Pagila database has a last_update field. A simplistic way to detect changes in the Pagila database is to use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC, as demonstrated in the previous post. As changes are made to records in these tables, an existing database function and a trigger on each table ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.

CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
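
The way query-based CDC relies on last_update can be sketched as a simple polling query. The example below is a minimal illustration that uses an in-memory SQLite database as a stand-in for PostgreSQL; the function name poll_changes is hypothetical, and the trigger is simulated by setting last_update directly in the UPDATE statement.

```python
import sqlite3


def poll_changes(conn, since):
    """Return rows modified after the `since` timestamp (query-based CDC)."""
    cursor = conn.execute(
        "SELECT address_id, address2, last_update FROM address "
        "WHERE last_update > ? ORDER BY last_update",
        (since,),
    )
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE address (address_id INTEGER PRIMARY KEY, "
    "address2 TEXT, last_update TEXT)"
)
conn.executemany(
    "INSERT INTO address VALUES (?, ?, ?)",
    [(104, None, "2021-08-12T00:00:00"), (105, None, "2021-08-12T00:00:00")],
)

# Simulate the UPDATE and the trigger refreshing last_update
conn.execute(
    "UPDATE address SET address2 = ?, last_update = ? WHERE address_id = ?",
    ("Apartment #1234", "2021-08-13T00:43:38", 105),
)

changes = poll_changes(conn, since="2021-08-12T00:00:00")
print(changes)
```

A poller like this only sees rows that still exist and only their latest state, so deletes and intermediate updates between polls go unnoticed. Reading the write-ahead log, as Debezium does, avoids both limitations.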

Kafka Connect and Schema Registry

There are several options for deploying and managing Kafka Connect, the Kafka management APIs and command-line tools, and the Apicurio Registry. I prefer deploying a containerized solution to Kubernetes on Amazon EKS. Some popular containerized Kafka options include Strimzi, Confluent for Kubernetes (CFK), and Debezium. Another option is building your own Docker Image using the official Apache Kafka binaries. I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries for this post. I then installed the necessary Confluent and Debezium connectors and their associated Java dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf container, building your own image will teach you how Kafka, Kafka Connect, and Debezium work, in my opinion.

With regard to the schema registry, both Confluent and Apicurio offer containerized solutions. Apicurio has three versions of their registry, each with a different storage mechanism: in-memory, SQL, and Kafka. Since we already have an existing Amazon RDS PostgreSQL instance as part of the demonstration, I chose the Apicurio SQL-based registry Docker Image for this post, apicurio/apicurio-registry-sql:2.0.1.Final.

If you choose to use the same Kafka Connect and Apicurio solution I used in this post, a Helm Chart is included in the post’s GitHub repository, kafka-connect-msk-v2. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS. The pod comprises both the Kafka Connect and Apicurio Registry containers. The deployment is intended for demonstration purposes and is not designed for use in Production.

apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-msk
spec:
  type: NodePort
  selector:
    app: kafka-connect-msk
  ports:
    - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.1.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent
        - image: apicurio/apicurio-registry-sql:2.0.1.Final
          name: apicurio-registry-mem
          imagePullPolicy: IfNotPresent
          env:
            - name: REGISTRY_DATASOURCE_URL
              value: jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/apicurio-registry
            - name: REGISTRY_DATASOURCE_USERNAME
              value: apicurio_registry
            - name: REGISTRY_DATASOURCE_PASSWORD
              value: 1L0v3Kafka!

Before deploying the chart, create a new PostgreSQL database, user, and grants on your RDS instance for the Apicurio Registry to use for storage:

CREATE DATABASE "apicurio-registry";
CREATE USER apicurio_registry WITH PASSWORD '1L0v3Kafka!';

GRANT CONNECT, CREATE ON DATABASE "apicurio-registry" to apicurio_registry;

Update the Helm chart’s values.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName) and your RDS URL (registryDatasourceUrl). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on Amazon MSK and to your S3 bucket from Kafka Connect running on Amazon EKS.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": [
        "arn:aws:kafka:us-east-1:111222333444:cluster/*/*",
        "arn:aws:kafka:us-east-1:111222333444:group/*/*/*",
        "arn:aws:kafka:us-east-1:111222333444:transactional-id/*/*/*",
        "arn:aws:kafka:us-east-1:111222333444:topic/*/*/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListAllMyBuckets"
      ],
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*"
    }
  ]
}

Once the variables are updated, use the following command to deploy the Helm chart:

helm install kafka-connect-msk-v2 ./kafka-connect-msk-v2 \
  --namespace kafka --create-namespace

Confirm the chart was installed successfully by checking the pod’s status:

kubectl get pods -n kafka -l app=kafka-connect-msk
View of the pod running both containers successfully with no errors

If you have any issues with either container while deploying, review the individual container’s logs:

export KAFKA_CONTAINER=$(
  kubectl get pods -n kafka -l app=kafka-connect-msk | \
  awk 'FNR == 2 {print $1}')
kubectl logs $KAFKA_CONTAINER -n kafka -c kafka-connect-msk
kubectl logs $KAFKA_CONTAINER -n kafka -c apicurio-registry-mem

Kafka Connect

Get a shell to the running Kafka Connect container using the kubectl exec command:

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -c kafka-connect-msk -- bash
Interacting with Kafka Connect container running on EKS

Confirm Access to Registry from Kafka Connect

If the Helm Chart was deployed successfully, you should now observe 11 new tables in the public schema of the new apicurio-registry database. Below, we see the new database and tables, as shown in pgAdmin.

Confirm the registry is running and accessible from the Kafka Connect container by calling the registry’s system/info REST API endpoint:

curl -s http://localhost:8080/apis/registry/v2/system/info | jq
Calling Apicurio Registry’s REST API from Kafka Connect container

The Apicurio Registry’s Service targets TCP port 8080. The Service is exposed on the Kubernetes worker node’s external IP address at a static port, the NodePort. To get the NodePort of the service, use the following command:

kubectl describe services kafka-connect-msk -n kafka

To access the Apicurio Registry’s web-based UI, add the NodePort to the Security Group of the EKS nodes with the source being your IP address, a /32 CIDR block.

To get the external IP address (EXTERNAL-IP) of any Amazon EKS worker nodes, use the following command:

kubectl get nodes -o wide

Use the <NodeIP>:<NodePort> combination to access the UI from your web browser, for example, http://54.237.41.128:30433. The registry will be empty at this point in the demonstration.

Apicurio Registry UI

Configure Bootstrap Brokers

Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone or distributed modes. Since we will be using Kafka Connect’s distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.

Kafka Connect and the schema registry will run on Amazon EKS, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:

# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')

Update the config/connect-distributed.properties file.

# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:

export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
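
A broker list pasted from the console or a properties file can easily pick up stray whitespace, and an unquoted $BBROKERS that contains a space would be split into two arguments by the shell. The following Python sketch, which is not part of the original demonstration, normalizes such a list before it is exported. The function name normalize_brokers and the example hostnames are hypothetical.

```python
def normalize_brokers(raw):
    """Return a comma-delimited broker list with no embedded whitespace."""
    brokers = [b.strip() for b in raw.split(",") if b.strip()]
    for broker in brokers:
        # Each entry is expected to look like host:port (for example, port 9098).
        host, sep, port = broker.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {broker!r}")
    return ",".join(brokers)


# Hypothetical input with a stray space after the second comma.
raw = "b-1.example.com:9098,b-2.example.com:9098, b-3.example.com:9098"
print(normalize_brokers(raw))
```

The cleaned string can then be used as the value of BBROKERS and of the bootstrap.servers property.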

Confirm Access to Amazon MSK from Kafka Connect

To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties

You can also try listing the existing Kafka consumer groups:

bin/kafka-consumer-groups.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties

If either of these fails, you likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.

Once configured, start Kafka Connect as a background process.

Kafka Connect

bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &

To confirm Kafka Connect starts properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.

tail -f logs/connect.log
Kafka Connect log showing Kafka Connect starting as a background process

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, shown below. Use the kill command along with the PID to stop Kafka Connect if necessary.

Kafka Connect running as a background process

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, when it starts up. The topics are defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-
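
As a rough cross-check, the internal topic names can be read from the properties file and compared with the output of the topic listing. The Python sketch below is illustrative only. The helper name internal_topics, the inline sample properties, and the listed_topics set are assumptions; in practice you would read config/connect-distributed.properties and the real command output instead.

```python
def internal_topics(properties_text):
    """Collect the *.storage.topic values from Kafka Connect worker properties."""
    topics = set()
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key.strip().endswith(".storage.topic"):
            topics.add(value.strip())
    return topics


# Abbreviated copy of the worker properties shown earlier in this post.
sample_properties = """
group.id=connect-cluster
offset.storage.topic=connect-offsets
#offset.storage.partitions=25
config.storage.topic=connect-configs
status.storage.topic=connect-status
"""

# Hypothetical output of the kafka-topics.sh --list command, one topic per entry.
listed_topics = {"connect-configs", "connect-offsets", "connect-status"}

missing = internal_topics(sample_properties) - listed_topics
print("missing internal topics:", sorted(missing))
```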

Kafka Connect Connectors

This post demonstrates the use of a set of Kafka Connect source and sink connectors. The source connector is based on the Debezium Source Connector for PostgreSQL and the Apicurio Registry. The sink connector is based on the Confluent Amazon S3 Sink connector and the Apicurio Registry.

Connector Source

Create or modify the file, config/debezium_avro_source_connector_postgresql_05.json. Update lines 3–6, as shown below, to reflect your RDS instance connection details.

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "your-pagila-database-url.us-east-1.rds.amazonaws.com",
"database.port": "5432",
"database.user": "your-username",
"database.password": "your-password",
"database.dbname": "pagila",
"database.server.name": "pagila",
"table.include.list": "public.actor,public.film,public.film_actor,public.category,public.film_category,public.language",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}

The source connector exports existing data and ongoing changes from six related tables within the Pagila database’s public schema: actor, film, film_actor, category, film_category, and language. Data will be imported into a corresponding set of six new Kafka topics: pagila.public.actor, pagila.public.film, and so forth (see line 9, above).

Schema diagram showing six tables to be exported

Data from the tables is stored in Apache Avro format in Kafka, and the schemas are stored separately in the Apicurio Registry (lines 11–18, above).
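
The topic names follow from two of the connector properties: the value of database.server.name is prefixed to each schema-qualified table in table.include.list. A minimal Python sketch of that mapping, assuming only the two properties shown in the configuration above (the function name expected_topics is hypothetical):

```python
import json

# Only the two properties that determine the topic names are reproduced here.
connector_config = json.loads("""
{
  "database.server.name": "pagila",
  "table.include.list": "public.actor,public.film,public.film_actor,public.category,public.film_category,public.language"
}
""")


def expected_topics(config):
    """Derive <server name>.<schema>.<table> topic names from the connector config."""
    prefix = config["database.server.name"]
    tables = [t.strip() for t in config["table.include.list"].split(",")]
    return [f"{prefix}.{table}" for table in tables]


for topic in expected_topics(connector_config):
    print(topic)
```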

Connector Sink

Create or modify the file, config/s3_sink_connector_05_debezium_avro.json. Update line 7, as shown below, to reflect your Amazon S3 bucket’s name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "pagila.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "your-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 300,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "gzip",
"schema.compatibility": "NONE",
"behavior.on.null.values": "ignore",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}

The sink connector flushes new data to S3 every 300 records or 60 seconds from the six Kafka topics (lines 4–5, 9–10, above). The schema for the data being written to S3 is extracted from the Apicurio Registry (lines 17–24, above).

The sink connector optimizes the raw data imported into S3 for downstream processing by writing GZIP-compressed Apache Parquet files to Amazon S3. Using Parquet’s columnar file format and file compression should help optimize ELT against the raw data once in S3 (lines 13–14, above).

Deploy Connectors

Deploy the source and sink connectors using the Kafka Connect REST Interface:

curl -s -d @"config/debezium_avro_source_connector_postgresql_05.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/debezium_avro_source_connector_postgresql_05/config | jq
curl -s -d @"config/s3_sink_connector_05_debezium_avro.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_05_debezium_avro/config | jq
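
The same PUT requests can be issued from Python with only the standard library. This is a minimal sketch under the assumption that Kafka Connect is listening on localhost:8083, as in the curl commands above. The helper names build_put_request and deploy are hypothetical, and building the request is kept separate from sending it so the request can be inspected first.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint used above


def build_put_request(name, config):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def deploy(name, config_path):
    """Read a connector config file and send it to Kafka Connect."""
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)
    request = build_put_request(name, config)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)
```

For example, deploy("s3_sink_connector_05_debezium_avro", "config/s3_sink_connector_05_debezium_avro.json") would send the sink connector configuration when run from the Kafka installation directory inside the container.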

Confirming the Deployment

Use the following commands to confirm the new set of connectors are deployed and running correctly.

curl -s -X GET http://localhost:8083/connectors | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/debezium_avro_source_connector_postgresql_05/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_05_debezium_avro/status | jq
Kafka Connect source and sink connectors running successfully
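
When checking several connectors, it can help to reduce each status response to a single yes or no. The Python sketch below assumes the response shape returned by the status endpoint (a connector object and a list of tasks, each with a state). The sample payload is illustrative, with made-up worker_id values, and the function name is_healthy is hypothetical.

```python
def is_healthy(status):
    """Return True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks is treated as unhealthy in this sketch.
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


# Illustrative payload only; real responses come from /connectors/<name>/status.
sample_status = {
    "name": "s3_sink_connector_05_debezium_avro",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}],
    "type": "sink",
}

print(is_healthy(sample_status))
```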

The items stored in Apicurio Registry, such as event schemas and API designs, are known as registry artifacts. If we re-visit the Apicurio Registry’s UI, we should observe 12 artifacts — a ‘key’ and ‘value’ artifact for each of the six tables we exported from the Pagila database.
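
The count of 12 can be reproduced with a few lines of Python. This sketch assumes the registry's topic-based naming, in which each artifact ID is the topic name followed by -key or -value. That naming is an assumption about the converter's default behavior and should be confirmed against the IDs shown in the registry UI. The function name is hypothetical.

```python
TABLES = ["actor", "film", "film_actor", "category", "film_category", "language"]


def expected_artifact_ids(tables, prefix="pagila.public"):
    """List one key and one value artifact ID per exported table (assumed naming)."""
    return [
        f"{prefix}.{table}-{part}"
        for table in tables
        for part in ("key", "value")
    ]


artifact_ids = expected_artifact_ids(TABLES)
print(len(artifact_ids))
for artifact_id in artifact_ids:
    print(artifact_id)
```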

Examining the Amazon S3 bucket, you should note six sets of S3 objects within the /topics/ object key prefix organized by topic name.

Amazon S3 bucket showing results of Kafka Connect S3 sink connector, organized by topic names

Within each topic name key, there should be a set of GZIP-compressed Parquet files.

Amazon S3 bucket showing GZIP-compressed Apache Parquet-format files
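
The object keys written by the sink connector appear to follow a predictable pattern: a topics prefix, the topic name, a partition folder, and a file name built from the topic, partition, and zero-padded starting offset. The Python sketch below reconstructs that pattern. It is inferred from the example key used with S3 Select in this section rather than taken from the connector's documentation, so treat the layout, the defaults, and the function name s3_object_key as assumptions.

```python
def s3_object_key(topic, partition, start_offset,
                  topics_dir="topics", extension=".gz.parquet"):
    """Build an object key in the layout observed in the sink bucket."""
    return (
        f"{topics_dir}/{topic}/partition={partition}/"
        f"{topic}+{partition}+{start_offset:010d}{extension}"
    )


print(s3_object_key("pagila.public.film", 0, 0))
```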

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the AWS CLI s3api select-object-content command:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.film/partition=0/pagila.public.film+0+0000000000.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the metadata-rich structure of the log-based CDC messages as compared to the query-based messages we observed in the previous post:

{
"after": {
"special_features": [
"Deleted Scenes",
"Behind the Scenes"
],
"rental_duration": 6,
"rental_rate": 0.99,
"release_year": 2006,
"length": 86,
"replacement_cost": 20.99,
"rating": "PG",
"description": "A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies",
"language_id": 1,
"title": "ACADEMY DINOSAUR",
"original_language_id": null,
"last_update": "2017-09-10T17:46:03.905795Z",
"film_id": 1
},
"source": {
"schema": "public",
"sequence": "[null,\"1177089474560\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177089474560,
"name": "pagila",
"txId": 18422,
"version": "1.6.1.Final",
"ts_ms": 1629340334432,
"snapshot": "true",
"db": "pagila",
"table": "film"
},
"op": "r",
"ts_ms": 1629340334434
}

Database Changes with Log-based CDC

What happens when we change data within the tables that Debezium and Kafka Connect are monitoring? To answer this question, let’s make a few DML changes to the Pagila database, including inserts, updates, and deletes:

INSERT INTO public.category (name)
VALUES ('Techno Thriller');
UPDATE public.film
SET release_year = 2021,
rental_rate = 2.99
WHERE film_id = 1;
UPDATE public.film
SET rental_duration = 3
WHERE film_id = 2;
UPDATE public.film_category
SET category_id = (
SELECT DISTINCT category_id
FROM public.category
WHERE name = 'Techno Thriller')
WHERE film_id = 3;
UPDATE public.actor
SET first_name = upper('Kate'),
last_name = upper('Winslet')
WHERE actor_id = 6;
DELETE
FROM public.film_actor
WHERE film_id = 375;

To see how these changes propagate, first, examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The Kafka Connect source connector detects changes, which are then exported from PostgreSQL to Kafka. The sink connector then writes these changes to Amazon S3.

Kafka Connect log showing changes to Pagila database being exported/imported

We can view the S3 bucket, which should now have new Parquet files corresponding to our changes. For example, below is the change event for the update we made to two fields of the film record with a film_id of 1. Note the operation is an update ("op": "u") and the presence of the updated data in the after block.

{
"after": {
"special_features": [
"Deleted Scenes",
"Behind the Scenes"
],
"rental_duration": 6,
"rental_rate": 2.99,
"release_year": 2021,
"length": 86,
"replacement_cost": 20.99,
"rating": "PG",
"description": "A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies",
"language_id": 1,
"title": "ACADEMY DINOSAUR",
"original_language_id": null,
"last_update": "2021-08-19T03:19:57.073053Z",
"film_id": 1
},
"source": {
"schema": "public",
"sequence": "[\"1177693455424\",\"1177693455424\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177693471392,
"name": "pagila",
"txId": 18445,
"version": "1.6.1.Final",
"ts_ms": 1629343197100,
"snapshot": "false",
"db": "pagila",
"table": "film"
},
"op": "u",
"ts_ms": 1629343197389
}

In another example, we see the delete made in the film_actor table, to the record with the film_id of 375. Note the operation is a delete ("op": "d") and the presence of the before block but no after block.

{
"before": {
"last_update": "1970-01-01T00:00:00Z",
"actor_id": 5,
"film_id": 375
},
"source": {
"schema": "public",
"sequence": "[\"1177693516520\",\"1177693516520\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177693516520,
"name": "pagila",
"txId": 18449,
"version": "1.6.1.Final",
"ts_ms": 1629343198400,
"snapshot": "false",
"db": "pagila",
"table": "film_actor"
},
"op": "d",
"ts_ms": 1629343198426
}
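
A consumer of these messages typically branches on the op field and reads the row from the before or after block accordingly. The Python sketch below shows one way to do that for the operations seen in this post ("r" for snapshot reads, "u" for updates, "d" for deletes) plus "c" for inserts. The function name summarize and the abbreviated sample event are hypothetical.

```python
OPERATIONS = {
    "r": "read (snapshot)",
    "c": "create",
    "u": "update",
    "d": "delete",
}


def summarize(event):
    """Describe a Debezium change event using its op code and row image."""
    op = event["op"]
    # Deletes carry the row in "before"; other operations carry it in "after".
    row = event.get("before") if op == "d" else event.get("after")
    table = event["source"]["table"]
    return f"{OPERATIONS.get(op, 'unknown')} on {table}: {row}"


# Abbreviated version of the delete event shown above.
delete_event = {
    "before": {"actor_id": 5, "film_id": 375},
    "source": {"table": "film_actor"},
    "op": "d",
}
print(summarize(delete_event))
```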

Debezium Event Flattening SMT

The challenge with the Debezium message structure shown above in S3 is the verbosity of the payload and the nested nature of the data. As a result, developing SQL queries against such records would be difficult. For example, given the message structure shown above, even the simplest query in Amazon Athena becomes significantly more complex:

SELECT after.actor_id, after.first_name, after.last_name, after.last_update
FROM
    (SELECT *,
            ROW_NUMBER()
                OVER (PARTITION BY after.actor_id
                      ORDER BY after.last_update DESC) AS row_num
     FROM "pagila_kafka_connect"."pagila_public_actor") AS x
WHERE x.row_num = 1
ORDER BY after.actor_id;

To specifically address the needs of different consumers, Debezium offers the event flattening single message transformation (SMT). The event flattening transformation is a Kafka Connect SMT. We covered Kafka Connect SMTs in the previous post. Using the event flattening SMT, we can shape the message received by Kafka to be more attuned to the specific consumers of our data lake. To implement the event flattening SMT, modify and redeploy the source connector, adding additional configuration (lines 19–23, below).

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "your-pagila-database-url.us-east-1.rds.amazonaws.com",
"database.port": "5432",
"database.user": "your-username",
"database.password": "your-password",
"database.dbname": "pagila",
"database.server.name": "pagila",
"table.include.list": "public.actor,public.film,public.film_actor,public.category,public.film_category,public.language",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
}

We will include the op, db, table, schema, lsn, and source.ts_ms metadata fields, along with the actual record data, in the transformed message. This means we have chosen to exclude all other fields from the messages. The transform will flatten the message’s nested structure.

Making this change to the message structure by adding the transformation results in new versions of the message’s schemas automatically being added to the Apicurio Registry by the source connector:

Apicurio Registry showing revised versions of the pagila.public.film schema

As a result of the event flattening SMT by the source connector, our message structure is significantly simplified:

{
"actor_id": 7,
"first_name": "BOB",
"last_name": "MOSTEL",
"last_update": "2021-08-19T21:01:55.090858Z",
"__op": "u",
"__db": "pagila",
"__schema": "public",
"__table": "actor",
"__lsn": 1191920555344,
"__source_ts_ms": 1629406915091,
"__deleted": "false"
}

Note the new __deleted field, which results from lines 21–22 of the source connector configuration, shown above. Debezium keeps tombstone records for DELETE operations in the event stream and adds __deleted, set to true or false. Below, we see an example of two DELETE operations on the film_actor table.

{
"actor_id": 52,
"film_id": 376,
"last_update": "1970-01-01T00:00:00Z",
"__op": "d",
"__db": "pagila",
"__schema": "public",
"__table": "film_actor",
"__lsn": 1192390296016,
"__source_ts_ms": 1629408869556,
"__deleted": "true"
}
{
"actor_id": 60,
"film_id": 376,
"last_update": "1970-01-01T00:00:00Z",
"__op": "d",
"__db": "pagila",
"__schema": "public",
"__table": "film_actor",
"__lsn": 1192390298976,
"__source_ts_ms": 1629408869556,
"__deleted": "true"
}
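
To make the reshaping concrete, the following Python sketch approximates what the flattened output looks like relative to the original envelope. It is an illustration of the resulting message shape under the configuration used in this post, not Debezium's implementation, and the function name flatten is hypothetical.

```python
def flatten(event):
    """Approximate the flattened record produced for one change event."""
    deleted = event["op"] == "d"
    # For deletes, the remaining row image is in "before"; otherwise use "after".
    state = event.get("before") if deleted else event.get("after")
    flat = dict(state or {})
    source = event["source"]
    flat["__op"] = event["op"]
    for field in ("db", "schema", "table", "lsn"):
        flat[f"__{field}"] = source[field]
    flat["__source_ts_ms"] = source["ts_ms"]
    flat["__deleted"] = "true" if deleted else "false"
    return flat


# Abbreviated version of the film_actor delete event shown earlier in this post.
delete_event = {
    "before": {"last_update": "1970-01-01T00:00:00Z", "actor_id": 5, "film_id": 375},
    "source": {
        "schema": "public",
        "lsn": 1177693516520,
        "ts_ms": 1629343198400,
        "db": "pagila",
        "table": "film_actor",
    },
    "op": "d",
}
print(flatten(delete_event))
```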

Viewing Data in the Data Lake

A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.

AWS Glue Data Catalog (metastore) showing six new tables

With the data crawled and cataloged in Glue, let’s perform some additional changes to the Pagila database’s film table and several related tables.

UPDATE public.film
SET release_year = 2019,
rental_rate = 3.99
WHERE film_id = 1;

UPDATE public.film
SET rental_duration = 4
WHERE film_id = 2;

UPDATE public.film
SET rental_duration = 7
WHERE film_id = 2;
INSERT INTO public.category (name)
VALUES ('Steampunk');
UPDATE public.film_category
SET category_id = (
SELECT DISTINCT category_id
FROM public.category
WHERE name = 'Steampunk')
WHERE film_id = 3;
UPDATE public.film
SET release_year = 2017,
rental_rate = 3.99
WHERE film_id = 4;
UPDATE public.film_actor
SET film_id = 100
WHERE film_id = 5;

UPDATE public.film_category
SET film_id = 100
WHERE film_id = 5;

UPDATE public.inventory
SET film_id = 100
WHERE film_id = 5;

DELETE
FROM public.film
WHERE film_id = 5;

We should be able to observe these database changes almost immediately by executing a query with Amazon Athena. Based on the connector configurations, Kafka Connect propagates the changes from PostgreSQL to Kafka to S3 within seconds. A typical query in Athena will return all of the original records as well as any updates or deletes we made, as duplicate records (records with identical film_id primary keys).

SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp
FROM "pagila_kafka_connect"."pagila_public_film"
ORDER BY film_id, timestamp
Amazon Athena showing SQL query and the result set with duplicate records

Note the original records as well as each change we made earlier. The timestamp field, derived from the __source_ts_ms metadata field, represents the server time at which the transaction was committed, according to Debezium. Also, note the records with a film_id of 5 in the query results: this is the record we deleted from the film table. The field values are (mostly) null in the latest record, except for any fields with default values in the Pagila table definition. If a field has a default value (e.g., rental_duration smallint default 3 not null or rental_rate numeric(4,2) default 4.99 not null), that value ends up in the deleted record when using the event flattening SMT. This has no negative impact other than adding size to the tombstone record (it is unclear whether this is expected behavior with Debezium or an artifact of the WAL entry).

film_id  title             release_year  rental_rate  rental_duration  timestamp
1        ACADEMY DINOSAUR  2021          2.99         6                2021-08-20 01:43:37
1        ACADEMY DINOSAUR  2019          3.99         6                2021-08-20 02:41:32
2        ACE GOLDFINGER    2006          4.99         3                2021-08-20 02:49:17
2        ACE GOLDFINGER    2006          4.99         4                2021-08-20 02:49:33
2        ACE GOLDFINGER    2006          4.99         7                2021-08-20 02:49:33
3        ADAPTATION HOLES  2006          2.99         7                2021-08-20 01:43:37
4        AFFAIR PREJUDICE  2006          2.99         5                2021-08-20 01:43:37
4        AFFAIR PREJUDICE  2017          3.99         5                2021-08-20 02:55:23
5        AFRICAN EGG       2006          2.99         6                2021-08-20 01:43:37
5                                        4.99         3                2021-08-20 03:00:49
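
The Athena expression converts __source_ts_ms, an epoch value in milliseconds, into a readable timestamp. A minimal Python equivalent, offered only as an illustration, is shown below using the __source_ts_ms value from the flattened actor message earlier in this post. The function name is hypothetical, and this sketch formats the hour on a 24-hour clock in UTC.

```python
from datetime import datetime, timezone


def source_ts_to_utc(ts_ms):
    """Convert epoch milliseconds to a 'YYYY-MM-DD HH:MM:SS' string in UTC."""
    seconds = ts_ms // 1000  # drop the millisecond part, as the query does
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(source_ts_to_utc(1629406915091))  # 2021-08-19 21:01:55
```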

To view only the most current data and ignore deleted records, we can use the ROW_NUMBER() function and add a predicate to check the value of the __deleted field:

SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY film_id
ORDER BY __source_ts_ms DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_film") AS x
WHERE x.row_num = 1
AND __deleted != 'true'
ORDER BY film_id
Amazon Athena showing SQL query and the result set with the latest records

Now we only see the latest records, including the removal of any deleted records. Although this method is effective for a single set of records, the query is far too intricate to apply to complex joins and aggregations, in my opinion.

film_id  title             release_year  rental_rate  rental_duration  timestamp
1        ACADEMY DINOSAUR  2019          3.99         6                2021-08-20 02:41:32
2        ACE GOLDFINGER    2006          4.99         7                2021-08-20 02:49:33
3        ADAPTATION HOLES  2006          2.99         7                2021-08-20 01:43:37
4        AFFAIR PREJUDICE  2017          3.99         5                2021-08-20 02:55:23
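
The logic of the ROW_NUMBER() query can be restated in a few lines of Python, which may help when reasoning about what it keeps and what it drops. This is a sketch of the same idea (latest record per key, then discard deletes), not a replacement for the Athena query. The function name latest_records and the small sample rows, with made-up timestamps, are hypothetical.

```python
def latest_records(rows, key="film_id", order_by="__source_ts_ms"):
    """Keep the most recent row per key, then drop rows flagged as deleted."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        # On equal timestamps, the row that appears later in the input wins.
        if current is None or row[order_by] >= current[order_by]:
            latest[row[key]] = row
    return [row for _, row in sorted(latest.items()) if row["__deleted"] != "true"]


# Illustrative rows only; the timestamps are not taken from the real data.
rows = [
    {"film_id": 1, "rental_rate": 2.99, "__source_ts_ms": 1000, "__deleted": "false"},
    {"film_id": 1, "rental_rate": 3.99, "__source_ts_ms": 2000, "__deleted": "false"},
    {"film_id": 5, "rental_rate": 2.99, "__source_ts_ms": 1000, "__deleted": "false"},
    {"film_id": 5, "rental_rate": 4.99, "__source_ts_ms": 3000, "__deleted": "true"},
]

for row in latest_records(rows):
    print(row)
```

As in the SQL version, the delete filter is applied after the latest row is chosen, so a deleted film disappears entirely instead of falling back to an older version of the record.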

Data Movement

Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena’s CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores data files created by the CTAS statement in a specified location in Amazon S3 and creates a new AWS Glue Data Catalog table to store the result set’s schema and metadata information. CTAS supports several file formats and storage options.

High-level architecture for this post’s demonstration

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by the movie rating, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.

CREATE TABLE pagila_kafka_connect.pagila_public_film_refined
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['rating'],
external_location='s3://my-s3-table/refined/film/'
) AS
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp, rating
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY film_id
ORDER BY __source_ts_ms DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_film") AS x
WHERE x.row_num = 1
AND __deleted = 'false'
ORDER BY film_id
Amazon Athena showing CTAS statement and the resulting new table to the left

Examining the Amazon S3 bucket again, you should observe a new set of S3 objects within the /refined/film/ key path, partitioned by rating.

Amazon S3 bucket showing results of CTAS statement

We should also see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS statement. We can perform additional queries on the refined dataset.

SELECT *
FROM "pagila_kafka_connect"."pagila_public_film_refined"
ORDER BY film_id
Amazon Athena showing query results from the refined film data

CRUD Operations in the Data Lake

To fully take advantage of CDC and maximize the freshness of data in the data lake, we would need to also adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level upserts and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.
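
To illustrate what record-level upserts and deletes mean in practice, the Python sketch below applies flattened CDC events to an in-memory table keyed by primary key. It is a conceptual model only and does not reflect how Hudi, Iceberg, or Delta Lake implement these operations. The function name apply_changes, the use of __lsn for ordering, and the sample events are assumptions made for this example.

```python
def apply_changes(table, events, key):
    """Apply flattened CDC events to a dict of rows, in log order."""
    for event in sorted(events, key=lambda e: e["__lsn"]):
        if event["__deleted"] == "true":
            # Delete: remove the row if it is present.
            table.pop(event[key], None)
        else:
            # Upsert: insert or overwrite the row, dropping the metadata fields.
            row = {k: v for k, v in event.items() if not k.startswith("__")}
            table[event[key]] = row
    return table


# Illustrative starting state and events; the LSN values are made up.
films = {
    1: {"film_id": 1, "rental_rate": 2.99},
    5: {"film_id": 5, "rental_rate": 2.99},
}
events = [
    {"film_id": 5, "rental_rate": 4.99, "__lsn": 30, "__deleted": "true"},
    {"film_id": 1, "rental_rate": 3.99, "__lsn": 10, "__deleted": "false"},
    {"film_id": 6, "rental_rate": 0.99, "__lsn": 20, "__deleted": "false"},
]

print(apply_changes(films, events, key="film_id"))
```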

Conclusion

This post explored how log-based CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon MSK, Amazon EKS, Apache Kafka Connect, Debezium, Apache Avro, and Apicurio Registry. In a subsequent post, we will learn how data lake file formats like Apache Hudi, Apache Iceberg, and Delta Lake, along with Apache Spark Structured Streaming, can help us actively manage the data in our data lake.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS

Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect

Introduction

A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.

One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.

A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.

In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.

Best of all, to maintain the freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream them into the data lake. This process is commonly referred to as Change Data Capture (CDC).

High-level architecture for this post’s demonstration

Change Data Capture

According to Gunnar Morling, a Principal Software Engineer at Red Hat who works on the Debezium and Hibernate projects and a well-known industry speaker, there are two types of Change Data Capture: query-based and log-based CDC. Gunnar detailed the differences between the two types in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

Joker 2021: Change data capture pipelines with Debezium and Kafka Streams (image: YouTube)

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.

Query-based vs. Log-based CDC

To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.

UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;

Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.

{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}

Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.

{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}
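
To make the structural difference concrete, here is a minimal Python sketch of a consumer that accepts either message shape and returns the current row state. The function name extract_row and the abbreviated payloads are illustrative assumptions, not part of either connector.

```python
import json
from typing import Any, Dict, Optional


def extract_row(message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the current row state from either CDC message shape."""
    # Log-based (Debezium-style) envelope: the row image sits under "after",
    # next to metadata such as "op" and "source". For deletes, "after" is null.
    if "after" in message and "op" in message:
        return message["after"]
    # Query-based message: the payload is already the row itself.
    return message


# Abbreviated versions of the two payloads shown above.
query_based = json.loads('{"address_id": 105, "address2": "Apartment #1234"}')
log_based = json.loads(
    '{"after": {"address_id": 105, "address2": "Apartment #1234"},'
    ' "op": "u", "ts_ms": 1628815418815}'
)

print(extract_row(query_based) == extract_row(log_based))
```

Both calls yield the same row, but only the log-based message also carries the operation type and source metadata.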

In an upcoming post, we will explore Debezium along with Apache Avro and a schema registry to build a log-based CDC solution using PostgreSQL’s write-ahead log (WAL). In this post, we will examine query-based CDC using the ‘update timestamp’ technique.

Kafka Connect Connectors

In this post, we will use source and sink connectors from Confluent. Confluent is the undisputed leader in providing enterprise-grade managed Kafka through their Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:

  • Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
  • Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in Avro, Parquet, JSON, or Raw Bytes format.

Prerequisites

This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:

  1. Amazon RDS for PostgreSQL instance (data source);
  2. Amazon S3 bucket (data sink);
  3. Amazon MSK cluster;
  4. Amazon EKS cluster;
  5. Connectivity between the Amazon RDS instance and Amazon MSK cluster;
  6. Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
  7. Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
  8. IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below).

As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.

Source Code

All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.

Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.

Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).

Sample PostgreSQL Database

There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorites, albeit a bit dated, is PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytics costs.

Pagila database schema diagram

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.

Last Updated Trigger

Each table in the Pagila database has a last_update field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC.

As changes are made to records in these tables, an existing database function and a trigger on each table will ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.

CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
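
If a table is missing its trigger, the same statement can be generated per table rather than written by hand. The following is a minimal sketch in Python that only builds the SQL text. The helper name trigger_statements is hypothetical, and the table list assumes the three tables used later in this post.

```python
from typing import Iterable, List

# Same shape as the CREATE TRIGGER statement shown above, parameterized by table.
TRIGGER_TEMPLATE = """CREATE TRIGGER update_last_update_column_{table}
BEFORE UPDATE
ON {table}
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();"""


def trigger_statements(tables: Iterable[str]) -> List[str]:
    """Build one CREATE TRIGGER statement per table name."""
    statements = []
    for table in tables:
        # Only accept plain identifiers, since the name is formatted into SQL.
        if not table.isidentifier():
            raise ValueError(f"unexpected table name: {table!r}")
        statements.append(TRIGGER_TEMPLATE.format(table=table))
    return statements


for statement in trigger_statements(["address", "city", "country"]):
    print(statement, end="\n\n")
```

The generated statements can then be run with psql or any PostgreSQL client.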

Kubernetes-based Kafka Connect

There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.

If you choose to use the same Kafka Connect Image used in this post, a Helm Chart is included in the post’s GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.0.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent

Before deploying the chart, update the values.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on MSK and to S3 from Kafka Connect running on EKS.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": [
        "arn:aws:kafka:us-east-1:111222333444:cluster/*/*",
        "arn:aws:kafka:us-east-1:111222333444:group/*/*/*",
        "arn:aws:kafka:us-east-1:111222333444:transactional-id/*/*/*",
        "arn:aws:kafka:us-east-1:111222333444:topic/*/*/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListAllMyBuckets"
      ],
      "Resource": "arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:AbortMultipartUpload",
        "s3:ListMultipartUploadParts",
        "s3:ListBucketMultipartUploads"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*"
    }
  ]
}
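
When adapting the S3 portion of the policy to your own bucket, keep in mind that S3 bucket and object ARNs omit the Region and account ID (arn:aws:s3:::bucket-name). The sketch below builds the three S3 statements for a given bucket. The function name s3_sink_statements is hypothetical, and the action lists simply mirror the policy above.

```python
import json
from typing import Any, Dict, List


def s3_sink_statements(bucket: str) -> List[Dict[str, Any]]:
    """Build the S3 statements of the policy for one bucket."""
    bucket_arn = f"arn:aws:s3:::{bucket}"  # no Region or account ID in S3 ARNs
    return [
        {
            "Effect": "Allow",
            "Action": ["s3:ListAllMyBuckets"],
            "Resource": "arn:aws:s3:::*",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
            "Resource": bucket_arn,  # bucket-level actions
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketMultipartUploads",
            ],
            "Resource": f"{bucket_arn}/*",  # object-level actions
        },
    ]


print(json.dumps(s3_sink_statements("your-s3-bucket"), indent=2))
```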

Once the Service Account variable is updated, use the following command to deploy the Helm chart, with the NAMESPACE environment variable set to the target namespace (kafka in this post):

helm install kafka-connect-msk ./kafka-connect-msk \
--namespace $NAMESPACE --create-namespace

To get a shell to the running Kafka Connect container, use the following kubectl exec command:

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Interacting with Kafka Connect container running on EKS

Configure Bootstrap Brokers

Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect’s distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.

Kafka Connect will run within the pod’s container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:

# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')

Update the config/connect-distributed.properties file.

# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:

export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
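
A stray space after a comma is easy to introduce when copying the broker list, and an unquoted $BBROKERS containing a space will be split by the shell into separate arguments. As a small safeguard, the following Python sketch normalizes a pasted list before you export it. The helper name normalize_brokers and the shortened hostnames are illustrative only.

```python
def normalize_brokers(raw: str) -> str:
    """Strip whitespace around each broker and drop empty entries."""
    brokers = [broker.strip() for broker in raw.split(",")]
    return ",".join(broker for broker in brokers if broker)


pasted = "b-1.example.com:9098,b-2.example.com:9098, b-3.example.com:9098"
print(normalize_brokers(pasted))
```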

Confirm Access to Amazon MSK from Kafka Connect

To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties

You can also try listing the existing Kafka consumer groups:

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server $BBROKERS \
  --command-config config/client-iam.properties

If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.

Kafka Connect

I recommend starting Kafka Connect as a background process using either method shown below.

bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &

To confirm Kafka Connect started properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.

tail -f logs/connect.log

Kafka Connect log showing Kafka Connect starting as a background process

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill command along with the PID to stop Kafka Connect if necessary.

Kafka Connect running as a background process

If configured properly, the first time Kafka Connect starts up, it will create three new topics, referred to as Kafka Connect internal topics, as defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-

Kafka Connect Connectors

This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.

Connector Source #1

Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json. Modify lines 3–5, as shown below, to reflect your RDS instance’s JDBC connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.public.",
"poll.interval.ms": 5000,
"mode": "timestamp",
"catalog.pattern": "public",
"table.whitelist": "address, city, country",
"timestamp.column.name": "last_update"
}

This first Kafka Connect source connector uses Confluent’s Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila’s public schema: address, city, and country. We will write that data to a series of topics, arbitrarily prefixed with the database name and schema: pagila.public. (note the trailing period). The source connector will create the three new topics automatically: pagila.public.address, pagila.public.city, and pagila.public.country.

Note the connector’s mode property value is set to timestamp, and the last_update field is referenced in the timestamp.column.name property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (poll.interval.ms property), looking for changes that are newer than the most recently exported last_update value. This is accomplished by the source connector, using a parameterized query, such as:

SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC
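
To see how this style of polling behaves, here is a simplified simulation in Python using an in-memory SQLite table in place of PostgreSQL. It is a sketch of the general timestamp technique, not the connector’s actual implementation: the poll_changes function, the reduced table, and the hand-written timestamps are all assumptions made for illustration.

```python
import sqlite3


def poll_changes(conn, last_seen, upper_bound):
    """Return rows changed after last_seen and before upper_bound, oldest first."""
    cursor = conn.execute(
        "SELECT address_id, address2, last_update FROM address "
        "WHERE last_update > ? AND last_update < ? "
        "ORDER BY last_update ASC",
        (last_seen, upper_bound),
    )
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE address (address_id INTEGER PRIMARY KEY, address2 TEXT, last_update TEXT)"
)
conn.executemany(
    "INSERT INTO address VALUES (?, ?, ?)",
    [(104, "", "2017-02-15T09:45:30"), (105, "", "2017-02-15T09:45:30")],
)

# First poll: with no stored offset, every row is newer and gets exported.
offset = "1970-01-01T00:00:00"
rows = poll_changes(conn, offset, "2021-08-13T00:00:00")
offset = max(row[2] for row in rows)  # remember the newest timestamp seen

# An UPDATE bumps last_update (the trigger's job in PostgreSQL; done by hand here).
conn.execute(
    "UPDATE address SET address2 = 'Apartment #1234', "
    "last_update = '2021-08-13T00:43:38' WHERE address_id = 105"
)

# Second poll: only the changed row is returned.
changed = poll_changes(conn, offset, "2021-08-13T00:45:00")
print(changed)
```

Note that a row deleted between polls never matches this query, which is one reason query-based CDC messages carry less information than log-based ones.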

Connector Sink #1

Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json. Modify line 7, as shown below, to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "pagila.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "your-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}

This first Kafka Connect sink connector uses Confluent’s Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector) to export data from Kafka topics to Amazon S3 objects in JSON format.

Deploy Connectors #1

Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST method against the /connectors endpoint. However, this then requires a DELETE and an additional POST to update the connector. Using a PUT against the /config endpoint, you can update the connector without first issuing a DELETE.

curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq
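
If you prefer to script deployments, the same PUT can be built with Python’s standard library. This is a minimal sketch that assumes Kafka Connect’s REST interface is listening on localhost:8083, as in the curl commands above. The helper name build_put_request is hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_put_request(name, config, base_url="http://localhost:8083"):
    """Build a PUT request that creates or updates a connector's config."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Abbreviated config for illustration; use the full JSON file in practice.
config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": 1,
}
request = build_put_request("s3_sink_connector_00", config)
print(request.get_method(), request.full_url)

# To send it (requires a reachable Kafka Connect worker):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Because the PUT is idempotent, the same script can be rerun after editing the configuration without deleting the connector first.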

You can confirm the source and sink connectors are deployed and running using the following commands:

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq

Kafka Connect source connector running successfully

Errors preventing the connector from starting correctly will be displayed using the /status endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Kafka Connect sink connector failed to run with errors
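
When checking several connectors, it can help to pull just the failures out of the /status response. The sketch below assumes the usual response shape (a connector object plus a tasks array, each with a state and an optional trace). The function name failed_components and the sample payload, including its worker address and error text, are made up for illustration.

```python
from typing import Any, Dict, List


def failed_components(status: Dict[str, Any]) -> List[str]:
    """Summarize FAILED connector/task entries from a /status response."""

    def first_line(trace: str) -> str:
        # Stack traces are long; the first line usually names the exception.
        return trace.splitlines()[0] if trace else "(no trace)"

    problems = []
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        problems.append("connector: " + first_line(connector.get("trace", "")))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            problems.append(f"task {task.get('id')}: " + first_line(task.get("trace", "")))
    return problems


# Illustrative payload only; not captured from a real cluster.
sample_status = {
    "name": "s3_sink_connector_00",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.0.0.1:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: illustrative error\n\tat ...",
        }
    ],
    "type": "sink",
}

for problem in failed_components(sample_status):
    print(problem)
```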

Confirming Success of Connectors #1

The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of three new Kafka topics that should have been created: pagila.public.address, pagila.public.city, and pagila.public.country.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.

To confirm the sink connector worked, verify the new S3 objects have been created in the data lake’s S3 bucket. Using the AWS CLI v2’s s3api commands, you can view the contents of the target S3 bucket:

aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text

You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.

topics/pagila.public.address/partition=0/pagila.public.address+0+0000000000.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000200.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000300.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000400.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000500.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000600.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000000.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000100.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000200.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000300.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000400.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000500.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000000.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000100.json
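
The object keys follow a predictable pattern: the topic name, the Kafka partition, and the starting offset of the first record in each file. The following Python sketch reproduces the listing above under a few assumptions: a single partition per topic, a flush.size of 100, a final partial file flushed by the scheduled rotation, and row counts of 603, 600, and 109 for the three tables (assumed here because they are consistent with the listing, not read from the database).

```python
def expected_keys(topic, record_count, flush_size=100, partition=0):
    """List the S3 object keys expected for one topic partition."""
    keys = []
    # Each file starts at a multiple of flush_size; the offset is zero-padded to 10 digits.
    for start_offset in range(0, record_count, flush_size):
        keys.append(
            f"topics/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start_offset:010d}.json"
        )
    return keys


# Assumed row counts, chosen to be consistent with the listing above.
assumed_counts = {
    "pagila.public.address": 603,
    "pagila.public.city": 600,
    "pagila.public.country": 109,
}

all_keys = []
for topic, count in assumed_counts.items():
    all_keys.extend(expected_keys(topic, count))

print(len(all_keys))
```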

You could also use the AWS Management Console to view the S3 bucket’s contents.

Amazon S3 bucket showing results of Kafka Connect S3 sink connector, organized by topic names

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

For example, the address table’s data will look similar to the following using the ‘Query with S3 Select’ feature via the console or API:

{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000
}
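
Note that last_update arrives in S3 as a number rather than a date string. Assuming the value is milliseconds since the Unix epoch in UTC, it can be converted back to a readable timestamp with a few lines of Python. The helper name epoch_millis_to_iso is illustrative.

```python
from datetime import datetime, timezone


def epoch_millis_to_iso(millis: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).isoformat()


print(epoch_millis_to_iso(1487151930000))  # 2017-02-15T09:45:30+00:00
```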

Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!

Connector Source #2

Create a new file or modify config/jdbc_source_connector_postgresql_01.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.public.alt.",
"poll.interval.ms": 5000,
"mode": "timestamp",
"timestamp.column.name": "last_update",
"catalog.pattern": "public",
"table.whitelist": "address",
"numeric.mapping": "best_fit",
"transforms": "createKey,extractInt,InsertTopic,InsertSourceDetails",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "address_id",
"validate.non.null": "false",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field": "message_source",
"transforms.InsertSourceDetails.static.value": "pagila"
}

This second Kafka Connect source connector also uses Confluent’s Kafka Connect JDBC Source connector to export data from just the address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is