Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR

Exploring Apache Spark with Apache Kafka using both batch queries and Spark Structured Streaming

Introduction

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. Using Structured Streaming, you can express your streaming computation the same way you would express a batch computation on static data. In this post, we will learn how to use Apache Spark and Spark Structured Streaming with Apache Kafka. Specifically, we will utilize Structured Streaming on Amazon EMR (fka Amazon Elastic MapReduce) with Amazon Managed Streaming for Apache Kafka (Amazon MSK). We will consume from and publish to Kafka using both batch and streaming queries. Spark jobs will be written in Python with PySpark for this post.

High-level AWS architecture for this post’s demonstration

Apache Spark

According to the documentation, Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python (PySpark), and R, and an optimized engine that supports general execution graphs. In addition, Spark supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Apache Spark and PySpark versus Apache Hive and Presto interest over time, according to Google Trends

Spark Structured Streaming

According to the documentation, Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end, exactly-once stream processing without the user having to reason about streaming.

Amazon EMR

According to the documentation, Amazon EMR (fka Amazon Elastic MapReduce) is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hadoop, Hive, HBase, Flink, Hudi, and Presto. Amazon EMR is a fully managed AWS service that makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.

A deployment option for Amazon EMR since December 2020, Amazon EMR on EKS, allows you to run Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). With the EKS deployment option, you can focus on running analytics workloads while Amazon EMR on EKS builds, configures, and manages containers for open-source applications.

If you are new to Amazon EMR for Spark, specifically PySpark, I recommend an earlier two-part series of posts, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

Apache Kafka

According to the documentation, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Amazon MSK

Apache Kafka clusters are challenging to set up, scale, and manage in production. According to the documentation, Amazon MSK is a fully managed AWS service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.

Prerequisites

This post will focus primarily on configuring and running Apache Spark jobs on Amazon EMR. To follow along, you will need the following resources deployed and configured on AWS:

  1. Amazon S3 bucket (holds Spark resources and output);
  2. Amazon MSK cluster (using IAM Access Control);
  3. A container running on Amazon EKS, or an EC2 instance, with the Kafka APIs installed and capable of connecting to Amazon MSK;
  4. Connectivity between the Amazon EKS cluster or EC2 instance and the Amazon MSK cluster;
  5. An Amazon MSK Configuration with auto.create.topics.enable=true; this setting is false by default.

As shown in the architectural diagram above, the demonstration uses three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon EMR, Amazon MSK, and Amazon EKS. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges within your Amazon EMR, Amazon MSK, and Amazon EKS Security Groups. For additional security and cost savings, use a VPC endpoint for private communications between Amazon EMR and Amazon S3.

Source Code

All source code for this post and the two previous posts in the Amazon MSK series, including the Python/PySpark scripts demonstrated here, is open-sourced and located on GitHub.

PySpark Scripts

According to the Apache Spark documentation, PySpark is an interface for Apache Spark in Python. It allows you to write Spark applications using Python API. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.

There are nine Python/PySpark scripts covered in this post:

  1. Initial sales data published to Kafka
    01_seed_sales_kafka.py
  2. Batch query of Kafka
    02_batch_read_kafka.py
  3. Streaming query of Kafka using grouped aggregation
    03_streaming_read_kafka_console.py
  4. Streaming query using sliding event-time window
    04_streaming_read_kafka_console_window.py
  5. Incremental sales data published to Kafka
    05_incremental_sales_kafka.py
  6. Streaming query from/to Kafka using grouped aggregation
    06_streaming_read_kafka_kafka.py
  7. Batch query of streaming query results in Kafka
    07_batch_read_kafka.py
  8. Streaming query using static join and sliding window
    08_streaming_read_kafka_join_window.py
  9. Streaming query using static join and grouped aggregation 
    09_streaming_read_kafka_join.py

Amazon MSK Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For this post, the PySpark scripts use Kafka connection properties specific to IAM Access Control. You can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In a recent post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK: Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.

Language Choice

According to the latest Spark 3.1.2 documentation, Spark runs on Java 8/11, Scala 2.12, Python 3.6+, and R 3.5+. The Spark documentation contains code examples written in all four languages and provides sample code on GitHub for Scala, Java, Python, and R. Spark is written in Scala.

Spark language interest over time, according to Google Trends

There are countless posts and industry opinions on choosing the best language for Spark. Taking no sides, I have selected the language I use most frequently for data analytics: Python, using PySpark. Scala and Python differ in several significant ways: compiled versus interpreted, statically typed versus dynamically typed, and JVM-based versus non-JVM-based. Scala offers support for concurrency and true multi-threading, along with a frequently cited raw performance advantage of up to 10x, while Python offers perceived ease of use, a larger community, and relative maturity.

Preparation

Amazon S3

We will start by gathering and copying the necessary files to your Amazon S3 bucket. The bucket will serve as the location for the Amazon EMR bootstrap script, additional JAR files required by Spark, PySpark scripts, CSV-format data files, and eventual output from the Spark jobs.

There is a small set of additional JAR files required by the Spark jobs we will be running. Download the JARs from Maven Central and GitHub, and place them in the emr_jars project directory. The JARs will include AWS MSK IAM Auth, AWS SDK, Kafka Client, Spark SQL for Kafka, Spark Streaming, and other dependencies.

cd ./pyspark/emr_jars/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar
wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.28/bundle-2.17.28.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.1.2/spark-streaming_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.1.2/spark-tags_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.2/spark-token-provider-kafka-0-10_2.12-3.1.2.jar

Next, update the SPARK_BUCKET environment variable, then upload the JARs and all necessary project files from your copy of the GitHub project repository to your Amazon S3 bucket using the AWS s3 API.

cd ./pyspark/
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"

aws s3 cp emr_jars/ \
"s3://${SPARK_BUCKET}/jars/" --recursive
aws s3 cp pyspark_scripts/ \
"s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp emr_bootstrap/ \
"s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp data/ \
"s3://${SPARK_BUCKET}/spark/" --recursive

Amazon EMR

The GitHub project repository includes a sample AWS CloudFormation template and an associated JSON-format CloudFormation parameters file. The template, stack.yml, accepts several parameters. To match your environment, you will need to update parameter values such as the SSH key, Subnet, and S3 bucket. The template will build a minimally-sized Amazon EMR cluster with one master and two core nodes in an existing VPC. The template can be easily modified to meet your requirements and budget.

aws cloudformation deploy \
--stack-name spark-kafka-demo-dev \
--template-file ./cloudformation/stack.yml \
--parameter-overrides file://cloudformation/dev.json \
--capabilities CAPABILITY_NAMED_IAM

Whether or not you decide to use the CloudFormation template, two essential Spark configuration items in the EMR template are the list of applications to install and the bootstrap script deployment.

Applications:
  - Name: 'Hadoop'
  - Name: 'Spark'
  - Name: 'JupyterEnterpriseGateway'
  - Name: 'Livy'
BootstrapActions:
  - Name: bootstrap-script
    ScriptBootstrapAction:
      Path: !Join [ '', [ 's3://', !Ref ProjectBucket, '/spark/bootstrap_actions.sh' ] ]

Below, we see the EMR bootstrap shell script, bootstrap_actions.sh, deployed and executed on the cluster’s nodes.

#!/bin/bash

# Purpose: EMR bootstrap script
# Author: Gary A. Stafford
# Date: 2021-09-10

# arg passed in by CloudFormation
if [ $# -eq 0 ]
  then
    echo "No arguments supplied"
fi

SPARK_BUCKET=$1

# update yum packages, install jq
sudo yum update -y
sudo yum install -y jq

# jks truststore for connecting to msk
sudo cp /usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/security/cacerts \
  /tmp/kafka.client.truststore.jks

# set region for boto3
aws configure set region \
  "$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"

# install python packages for pyspark scripts
sudo python3 -m pip install boto3 botocore ec2-metadata

# install required jars for spark
sudo aws s3 cp \
  "s3://${SPARK_BUCKET}/jars/" /usr/lib/spark/jars/ \
  --recursive --exclude "*" --include "*.jar"

The script performed several tasks, including deploying the additional JAR files we copied to Amazon S3 earlier.

Amazon EMR cluster bootstrap actions tab

AWS Systems Manager Parameter Store

The PySpark scripts in this demonstration will obtain two parameters from the AWS Systems Manager (AWS SSM) Parameter Store. They include the Amazon MSK bootstrap brokers and the Amazon S3 bucket that contains the Spark assets. Using the Parameter Store ensures that no sensitive or environment-specific configuration is hard-coded into the PySpark scripts. Modify and execute the ssm_params.sh script to create two AWS SSM Parameter Store parameters.

aws ssm put-parameter \
--name /kafka_spark_demo/kafka_servers \
--type String \
--value "<b-1.your-brokers.kafka.us-east-1.amazonaws.com:9098,b-2.your-brokers.kafka.us-east-1.amazonaws.com:9098>" \
--description "Amazon MSK Kafka broker list" \
--overwrite

aws ssm put-parameter \
--name /kafka_spark_demo/kafka_demo_bucket \
--type String \
--value "<your-bucket-111222333444-us-east-1>" \
--description "Amazon S3 bucket" \
--overwrite
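
The PySpark scripts later in this post read these two parameters with boto3. A minimal sketch of that lookup follows. The helper name load_params and the FakeSsm class are hypothetical; FakeSsm only mimics the shape of the get_parameter response so the example runs without AWS credentials. On an EMR node, you would pass boto3.client("ssm") instead.

```python
# Sketch: read the two demo parameters from AWS SSM Parameter Store.
PARAM_NAMES = {
    "kafka_servers": "/kafka_spark_demo/kafka_servers",
    "kafka_demo_bucket": "/kafka_spark_demo/kafka_demo_bucket",
}


def load_params(ssm_client, names=PARAM_NAMES):
    """Return {short_name: value} using the SSM get_parameter call."""
    return {
        key: ssm_client.get_parameter(Name=path)["Parameter"]["Value"]
        for key, path in names.items()
    }


class FakeSsm:
    """Hypothetical stand-in that mimics the get_parameter response shape."""

    def __init__(self, values):
        self._values = values

    def get_parameter(self, Name):
        return {"Parameter": {"Name": Name, "Value": self._values[Name]}}


if __name__ == "__main__":
    fake = FakeSsm({
        "/kafka_spark_demo/kafka_servers": "b-1.example:9098,b-2.example:9098",
        "/kafka_spark_demo/kafka_demo_bucket": "example-bucket",
    })
    print(load_params(fake))
```

Passing the client in as an argument, rather than creating it inside the function, keeps the lookup easy to test outside of AWS.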

Spark Submit Options with Amazon EMR

Amazon EMR provides multiple options to run Spark jobs. The recommended method for PySpark scripts is to use Amazon EMR Steps from the EMR console or AWS CLI to submit work to Spark installed on an EMR cluster. In the console and CLI, you do this using a Spark application step, which runs the spark-submit script as a step on your behalf. With the API, you use a Step to invoke spark-submit using command-runner.jar. Alternatively, you can SSH into the EMR cluster’s master node and run spark-submit. We will employ both techniques to run the PySpark jobs.
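
To make the API option more concrete, here is a rough sketch of a Step definition that invokes spark-submit through command-runner.jar. The function name spark_step and the example bucket are placeholders of my own, not part of the project; the dictionary follows the shape that the EMR AddJobFlowSteps API accepts.

```python
# Sketch: build an EMR Step that runs spark-submit via command-runner.jar.
def spark_step(name, script_uri, action_on_failure="CONTINUE"):
    """Return a Step dict in the shape accepted by the EMR AddJobFlowSteps API."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", script_uri],
        },
    }


if __name__ == "__main__":
    step = spark_step(
        "kafka-seed-sales",
        "s3a://example-bucket/spark/01_seed_sales_kafka.py",  # placeholder bucket
    )
    print(step)
    # With boto3 installed and credentials configured, the step could be submitted with:
    # boto3.client("emr").add_job_flow_steps(JobFlowId="<your-cluster-id>", Steps=[step])
```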

Securely Accessing Amazon MSK from Spark

Each of the PySpark scripts demonstrated in this post uses a common pattern for accessing Amazon MSK from Amazon EMR using IAM Authentication. Whether producing or consuming messages from Kafka, the same security-related options are used to configure Spark. The details behind each option are outlined in the Security section of the Spark Structured Streaming + Kafka Integration Guide and the Configure clients for IAM access control section of the Amazon MSK IAM access control documentation.
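
As a sketch of that common pattern, the security-related options can be collected in one dictionary and merged with the read- or write-specific options. The option keys and values are the ones used by the scripts in this post; the helper name msk_iam_options and the example broker address are my own placeholders.

```python
# Sketch: the IAM-related Kafka options shared by every script in this post.
def msk_iam_options(bootstrap_servers,
                    truststore="/tmp/kafka.client.truststore.jks"):
    """Return the Spark Kafka options needed for Amazon MSK IAM Access Control."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.ssl.truststore.location": truststore,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


if __name__ == "__main__":
    # Reader options: shared security settings plus the topic subscription.
    options_read = {
        **msk_iam_options("b-1.example:9098"),  # placeholder broker
        "subscribe": "pagila.sales.spark.streaming",
        "startingOffsets": "earliest",
    }
    for key, value in options_read.items():
        print(f"{key} = {value}")
```

The resulting dictionary is what gets passed to .options(**options_read) or .options(**options_write) in the scripts that follow.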

Data Source and Analysis Objective

For this post, we will continue to use data from PostgreSQL’s sample Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less than ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytical query costs.

Sample rows from sales_seed.csv (pipe-delimited):

payment_id|customer_id|amount|payment_date|city|district|country
16940|130|5.99|2021-05-08 21:21:56.996577 +00:00|guas Lindas de Gois|Gois|Brazil
16406|459|5.99|2021-05-08 21:22:59.996577 +00:00|Qomsheh|Esfahan|Iran
16315|408|6.99|2021-05-08 21:32:05.996577 +00:00|Jaffna|Northern|Sri Lanka
16185|333|7.99|2021-05-08 21:33:07.996577 +00:00|Baku|Baki|Azerbaijan
17097|222|9.99|2021-05-08 21:33:47.996577 +00:00|Jaroslavl|Jaroslavl|Russian Federation
16579|549|3.99|2021-05-08 21:36:33.996577 +00:00|Santiago de Compostela|Galicia|Spain
16050|269|4.99|2021-05-08 21:40:19.996577 +00:00|Salinas|California|United States
17126|239|7.99|2021-05-08 22:00:12.996577 +00:00|Ciomas|West Java|Indonesia
16933|126|7.99|2021-05-08 22:29:06.996577 +00:00|Po|So Paulo|Brazil
16297|399|8.99|2021-05-08 22:30:47.996577 +00:00|Okara|Punjab|Pakistan

According to mastersindatascience.org, data analytics is “…the process of analyzing raw data to find trends and answer questions…” Using Spark, we can analyze the movie rental sales data as a batch or in near-real-time using Structured Streaming to answer different questions. For example, using batch computations on static data, we could answer the question, how do the current total all-time sales for France compare to the rest of Europe? Or, what were the total sales for India during August? Using streaming computations, we can answer questions like, what are the sales volumes for the United States during this current four-hour marketing promotional period? Or, are sales to North America beginning to slow as the Olympics are aired during prime time?


Batch Queries

Before exploring the more advanced topic of streaming computations with Spark Structured Streaming, let’s first use a simple batch query and a batch computation to consume messages from the Kafka topic, perform a basic aggregation, and write the output to both the console and Amazon S3.

PySpark Job 1: Initial Sales Data

Kafka supports Protocol Buffers, JSON Schema, and Avro. However, to keep things simple in this first post, we will use JSON. We will seed a new Kafka topic with an initial batch of 250 JSON-format messages. This first batch of messages represents previous online movie rental sale transaction records. We will use these sales transactions for both batch and streaming queries.

The PySpark script, 01_seed_sales_kafka.py, and the seed data file, sales_seed.csv, are both read from Amazon S3 by Spark, running on Amazon EMR. The Amazon S3 bucket name and the Amazon MSK broker list are pulled from AWS SSM Parameter Store using the parameters created earlier. The Kafka topic that stores the sales data, pagila.sales.spark.streaming, is created automatically the first time the script runs, because auto.create.topics.enable is set to true in the Amazon MSK Configuration.

# Purpose: Batch write initial sales data from S3 to a new Kafka topic
# Author: Gary A. Stafford
# Date: 2021-09-22

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType
from pyspark.sql.window import Window

sales_data = "sales_seed.csv"
topic_output = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-seed-sales") \
        .getOrCreate()

    df_sales = read_from_csv(spark, params)

    write_to_kafka(params, df_sales)


def read_from_csv(spark, params):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{sales_data}",
             schema=schema, header=True, sep="|")

    df_sales = update_payment_date(df_sales)

    return df_sales


def write_to_kafka(params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # key: payment_id as a string; value: the entire row serialized as JSON
    df_sales \
        .selectExpr("CAST(payment_id AS STRING) AS key",
                    "to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()


def update_payment_date(df):
    """Update existing payment date to a current timestamp for streaming simulation"""

    record_count = 250

    window = Window.orderBy("payment_id")

    # each row gets an epoch timestamp one second apart, ending at the current time
    df = df \
        .drop("payment_date") \
        .withColumn("index", F.row_number().over(window)) \
        .withColumn("payment_date",
                    (F.unix_timestamp(F.current_timestamp()) -
                     (record_count - F.col("index"))).cast(IntegerType())) \
        .drop("index")

    return df


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()
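
The update_payment_date function and the selectExpr call in the script are easier to follow with a small example. The following plain-Python sketch mirrors that logic without Spark: it spaces the payment dates one second apart, ending at the current time, and then shapes each row into a Kafka key and a JSON value. The function names and the three sample rows are illustrative assumptions, and the JSON produced by json.dumps is only an analogue of Spark’s to_json output.

```python
import json


def simulate_payment_dates(rows, now_epoch, record_count=250):
    """Assign epoch-second payment dates one second apart, newest last."""
    ordered = sorted(rows, key=lambda r: r["payment_id"])
    updated_rows = []
    for index, row in enumerate(ordered, start=1):
        # drop the original payment_date, then add the simulated one
        updated = {k: v for k, v in row.items() if k != "payment_date"}
        updated["payment_date"] = now_epoch - (record_count - index)
        updated_rows.append(updated)
    return updated_rows


def to_kafka_message(row):
    """Key is the payment_id as a string; value is the whole row as JSON."""
    return str(row["payment_id"]), json.dumps(row, separators=(",", ":"))


if __name__ == "__main__":
    sample = [
        {"payment_id": 16406, "amount": 5.99, "payment_date": "old", "country": "Iran"},
        {"payment_id": 16050, "amount": 4.99, "payment_date": "old", "country": "United States"},
        {"payment_id": 16940, "amount": 5.99, "payment_date": "old", "country": "Brazil"},
    ]
    for row in simulate_payment_dates(sample, now_epoch=1_000, record_count=3):
        print(to_kafka_message(row))
```

With record_count equal to the number of rows, the row with the highest payment_id receives the current time and each earlier row is one second older.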

Update the two environment variables, then submit your first Spark job as an Amazon EMR Step using the AWS CLI and the emr API:

export CLUSTER_ID="<your-cluster-id>"
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"

aws emr add-steps \
--cluster-id ${CLUSTER_ID} \
--steps """Type=Spark,Name='kafka-seed-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/01_seed_sales_kafka.py]"""

Successfully adding a Step (Spark job) to the Amazon EMR cluster

From the Amazon EMR console, we should observe that the Spark job has completed successfully in about 30–90 seconds.

Amazon EMR Step (Spark job) completed successfully

The Kafka Consumer API allows applications to read streams of data from topics in the Kafka cluster. Using the Kafka Consumer API, from within a Kubernetes container running on Amazon EKS or an EC2 instance, we can observe that the new Kafka topic has been successfully created and that messages (initial sales data) have been published to the new Kafka topic.

export BBROKERS="b-1.your-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.kafka.us-east-1.amazonaws.com:9098, ..."
bin/kafka-console-consumer.sh \
--topic pagila.sales.spark.streaming \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.timestamp=true \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties

Initial sales data as messages in Kafka topic

PySpark Job 2: Batch Query of Amazon MSK Topic

The PySpark script, 02_batch_read_kafka.py, performs a batch query of the initial 250 messages in the Kafka topic. When run, the PySpark script parses the JSON-format messages, then aggregates the data by both total sales and order count, by country, and finally, sorts by total sales.

window = Window.partitionBy("country").orderBy("amount")
window_agg = Window.partitionBy("country")
.withColumn("row", F.row_number().over(window)) \
.withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
.withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
.where(F.col("row") == 1).drop("row") \
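
The window functions above compute, for every country, an order count and a sales total, and then keep a single row per country. The same result can be described as a grouped aggregation. Here is a minimal plain-Python sketch of that computation, using a few (country, amount) pairs taken from the sample rows shown earlier; the function name sales_by_country is my own and is not part of the project.

```python
from collections import defaultdict


def sales_by_country(rows):
    """Return (country, total_sales, orders) tuples sorted by total sales, descending."""
    totals = defaultdict(lambda: [0.0, 0])
    for country, amount in rows:
        totals[country][0] += amount   # running sales total
        totals[country][1] += 1        # running order count
    summary = [(c, round(s, 2), n) for c, (s, n) in totals.items()]
    return sorted(summary, key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    sample = [
        ("Brazil", 5.99), ("Iran", 5.99), ("Sri Lanka", 6.99),
        ("Azerbaijan", 7.99), ("Brazil", 7.99),
    ]
    for row in sales_by_country(sample):
        print(row)
```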

The results are written to both the console as stdout and to Amazon S3 in CSV format.

# Purpose: Batch read Kafka topic, aggregate sales and orders by country,
# and output to console and Amazon S3 as CSV
# Author: Gary A. Stafford
# Date: 2021-09-22

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window

topic_input = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-batch-sales") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(params, df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(params, df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # parse the JSON message value, aggregate by country, keep one row per country
    df_output = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .where(F.col("row") == 1).drop("row") \
        .select("country", (F.format_number(F.col("sales"), 2)).alias("sales"), "orders") \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)

    df_output \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()

    # mode() must be set before csv(): csv() performs the write and returns None
    df_output \
        .write \
        .mode("overwrite") \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark_output/sales_by_country",
             header=True, sep="|")


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Again, submit this job as an Amazon EMR Step using the AWS CLI and the emr API:

aws emr add-steps \
--cluster-id ${CLUSTER_ID} \
--steps """Type=Spark,Name='kafka-batch-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/02_batch_read_kafka.py]"""

To view the console output, click on ‘View logs’ in the Amazon EMR console, then click on the stdout logfile, as shown below.

Logs from successful Amazon EMR Step (Spark job)

The stdout logfile should contain the top 25 total sales and order counts, by country, based on the initial 250 sales records.

+------------------+------+------+
|country           |sales |orders|
+------------------+------+------+
|India             |138.80|20    |
|China             |133.80|20    |
|Mexico            |106.86|14    |
|Japan             |100.86|14    |
|Brazil            |96.87 |13    |
|Russian Federation|94.87 |13    |
|United States     |92.86 |14    |
|Nigeria           |58.93 |7     |
|Philippines       |58.92 |8     |
|South Africa      |46.94 |6     |
|Argentina         |42.93 |7     |
|Germany           |39.96 |4     |
|Indonesia         |38.95 |5     |
|Italy             |35.95 |5     |
|Iran              |33.95 |5     |
|South Korea       |33.94 |6     |
|Poland            |30.97 |3     |
|Pakistan          |25.97 |3     |
|Taiwan            |25.96 |4     |
|Mozambique        |23.97 |3     |
|Ukraine           |23.96 |4     |
|Vietnam           |23.96 |4     |
|Venezuela         |22.97 |3     |
|France            |20.98 |2     |
|Peru              |19.98 |2     |
+------------------+------+------+
only showing top 25 rows

The PySpark script also wrote the same results to Amazon S3 in CSV format.

CSV file written to Amazon S3 as a result of the Spark job

The total sales and order count for 69 countries were computed, sorted, and coalesced into a single CSV file.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5
South Korea|33.94|6
Poland|30.97|3
Pakistan|25.97|3
Taiwan|25.96|4
Mozambique|23.97|3
Vietnam|23.96|4
Ukraine|23.96|4
Venezuela|22.97|3
France|20.98|2
Peru|19.98|2
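
If you want to work with this output outside of Spark, note that the script writes the file with a pipe separator and formats sales with format_number, which can insert thousands separators. A small sketch of reading such a file with Python’s standard csv module follows; the function name read_sales_csv and the inline sample text are illustrative assumptions.

```python
import csv
import io


def read_sales_csv(text):
    """Parse pipe-delimited sales output into a list of typed dicts."""
    reader = csv.DictReader(io.StringIO(text), delimiter="|")
    return [
        {
            "country": row["country"],
            # format_number may insert thousands separators, e.g. "1,234.50"
            "sales": float(row["sales"].replace(",", "")),
            "orders": int(row["orders"]),
        }
        for row in reader
    ]


if __name__ == "__main__":
    sample = "country|sales|orders\nIndia|138.80|20\nChina|133.80|20\n"
    for record in read_sales_csv(sample):
        print(record)
```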

Streaming Queries

To demonstrate streaming queries with Spark Structured Streaming, we will use a combination of two PySpark scripts. The first script, 03_streaming_read_kafka_console.py, will perform a streaming query and computation of messages in the Kafka topic, aggregating the total sales and number of orders. Concurrently, the second PySpark script, 04_incremental_sales_kafka.py, will read additional Pagila sales data from a CSV file located on Amazon S3 and write messages to the Kafka topic at a rate of two messages per second. The first script, 03_streaming_read_kafka_console.py, will stream aggregations in micro-batches of one-minute increments to the console. Spark Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small, batch jobs.

Note that this first script performs grouped aggregations as opposed to aggregations over a sliding event-time window. The aggregated results represent the total, all-time sales at a point in time, based on all the messages currently in the topic when the micro-batch was computed.
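
To illustrate what grouped aggregation means across micro-batches, here is a simplified plain-Python simulation. It keeps a running total per country and emits the entire result after each batch, which is roughly how the all-time totals grow as new messages arrive. This is only a conceptual sketch under my own assumptions, with made-up amounts; it is not how Spark manages streaming state internally.

```python
from collections import defaultdict


def run_micro_batches(batches):
    """Yield the full {country: (sales, orders)} result after each micro-batch."""
    state = defaultdict(lambda: [0.0, 0])
    for batch in batches:
        for country, amount in batch:
            state[country][0] += amount
            state[country][1] += 1
        # emit the whole, updated result table every time, not just the changes
        yield {c: (round(s, 2), n) for c, (s, n) in state.items()}


if __name__ == "__main__":
    batches = [
        [("India", 4.99), ("China", 2.99)],   # first micro-batch: existing messages
        [("India", 1.99)],                    # second micro-batch: new messages
    ]
    for number, result in enumerate(run_micro_batches(batches)):
        print(f"Batch: {number}", result)
```

Because the state carries over between batches, the second result reflects both of India's orders, not only the one that arrived in the second batch.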

To follow along with this part of the demonstration, you can run the two Spark jobs as concurrent steps on the existing Amazon EMR cluster, or create a second EMR cluster, identically configured to the existing cluster, to run the second PySpark script, 04_incremental_sales_kafka.py. Using a second cluster, you can use a minimally-sized single master node cluster with no core nodes to save cost.

PySpark Job 3: Streaming Query to Console

The first PySpark script, 03_streaming_read_kafka_console.py, performs a streaming query of messages in the Kafka topic. The script then aggregates the data by both total sales and order count, by country, and finally, sorts by total sales.

.groupBy("country") \
.agg(F.count("amount"), F.sum("amount")) \
.orderBy(F.col("sum(amount)").desc()) \
.select("country",
(F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
(F.col("count(amount)")).alias("orders")) \

The results are streamed to the console using the processingTime trigger parameter. A trigger defines how often a streaming query should be executed and emit new data. The processingTime parameter sets a trigger that runs a micro-batch query periodically based on the processing time (e.g. ‘5 minutes’ or ‘1 hour’). The trigger is currently set to a minimal processing time of one minute for ease of demonstration.

.trigger(processingTime="1 minute") \
.outputMode("complete") \
.format("console") \
.option("numRows", 25) \

# Purpose: Streaming read from Kafka topic and summarize top 25
# all-time total sales by country to the console every minute
# Author: Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-console") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # parse the JSON message value, aggregate by country, stream results to the console
    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .groupBy("country") \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("sum(amount)").desc()) \
        .select("country",
                (F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
                (F.col("count(amount)")).alias("orders")) \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_to_console") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

For demonstration purposes, we will run the Spark job directly from the master node of the EMR Cluster. This method will allow us to easily view the micro-batches and associated log events as they are output to the console. The console is normally used for testing purposes. Submitting the PySpark script from the cluster’s master node is an alternative to submitting an Amazon EMR Step. Connect to the master node of the Amazon EMR cluster using SSH, as the hadoop user:

export EMR_MASTER=<your-emr-master-dns.compute-1.amazonaws.com>
export EMR_KEY_PATH=path/to/key/<your-ssh-key.pem>
ssh -i ${EMR_KEY_PATH} hadoop@${EMR_MASTER}

Submit the PySpark script, 03_streaming_read_kafka_console.py, to Spark:

export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
spark-submit s3a://${SPARK_BUCKET}/spark/03_streaming_read_kafka_console.py

Before running the second PySpark script, 04_incremental_sales_kafka.py, let the first script run long enough to pick up the existing sales data in the Kafka topic. Within about two minutes, you should see the first micro-batch of aggregated sales results, labeled ‘Batch: 0’ output to the console. This initial micro-batch should contain the aggregated results of the existing 250 messages from Kafka. The streaming query’s first micro-batch results should be identical to the previous batch query results.

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |138.80|20 |
|China |133.80|20 |
|Mexico |106.86|14 |
|Japan |100.86|14 |
|Brazil |96.87 |13 |
|Russian Federation|94.87 |13 |
|United States |92.86 |14 |
|Nigeria |58.93 |7 |
|Philippines |58.92 |8 |
|South Africa |46.94 |6 |
|Argentina |42.93 |7 |
|Germany |39.96 |4 |
|Indonesia |38.95 |5 |
|Italy |35.95 |5 |
|Iran |33.95 |5 |
|South Korea |33.94 |6 |
|Poland |30.97 |3 |
|Pakistan |25.97 |3 |
|Taiwan |25.96 |4 |
|Mozambique |23.97 |3 |
|Ukraine |23.96 |4 |
|Vietnam |23.96 |4 |
|Venezuela |22.97 |3 |
|France |20.98 |2 |
|Peru |19.98 |2 |
+------------------+------+------+
only showing top 25 rows

Immediately below the batch output, there will be a log entry containing information about the batch. In the log entry snippet below, note the starting and ending offsets of the topic for the Spark job’s Kafka consumer group, 0 (null) to 250, representing the initial sales data.

{
"id" : "e0168615-dd39-4025-9811-c001a324ed58",
"runId" : "ed76fe07-032c-42ab-881c-57b44f561a29",
"name" : "streaming_to_console",
"timestamp" : "2021-09-08T17:37:58.116Z",
"batchId" : 0,
"numInputRows" : 250,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 14.104372355430183,
"durationMs" : {
"addBatch" : 12298,
"getBatch" : 39,
"latestOffset" : 4710,
"queryPlanning" : 542,
"triggerExecution" : 17724,
"walCommit" : 33
},
"stateOperators" : [ {
"numRowsTotal" : 136,
"numRowsUpdated" : 136,
"memoryUsedBytes" : 119008,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 61408
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[pagila.sales.spark.streaming]]",
"startOffset" : null,
"endOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 250
}
},
"numInputRows" : 250,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 14.104372355430183
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@13b8bba3",
"numOutputRows" : 68
}
}
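
The relationship between the offsets and the row count in the log entry can be checked with simple arithmetic: the rows read from each partition equal the ending offset minus the starting offset. The following is a minimal sketch in plain Python, not part of the project's scripts. The function name rows_consumed is hypothetical, and treating a null starting offset as offset 0 is an assumption that fits this demonstration, where the query starts from the earliest offset of a new topic.

```python
import json


def rows_consumed(source: dict) -> int:
    """Sum (endOffset - startOffset) over every topic partition of one source."""
    start = source.get("startOffset") or {}
    end = source.get("endOffset") or {}
    total = 0
    for topic, partitions in end.items():
        for partition, end_offset in partitions.items():
            # Assumption: a missing start offset means the batch began at offset 0
            start_offset = start.get(topic, {}).get(partition, 0)
            total += end_offset - start_offset
    return total


# The "sources" entry from the Batch 0 log output, reduced to the offset fields
batch_0 = json.loads("""
{
  "startOffset": null,
  "endOffset": {"pagila.sales.spark.streaming": {"0": 250}},
  "numInputRows": 250
}
""")

print(rows_consumed(batch_0) == batch_0["numInputRows"])
```

The same calculation applies to any later micro-batch, where the starting offset is no longer null.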

PySpark Job 4: Incremental Sales Data

As described earlier, the second PySpark script, 04_incremental_sales_kafka.py, reads 1,800 additional sales records from a second CSV file located on Amazon S3, sales_incremental_large.csv. The script then publishes messages to the Kafka topic at a deliberately throttled rate of two messages per second. Concurrently, the first PySpark job, still running and performing a streaming query, will consume the new Kafka messages and stream aggregated total sales and orders in micro-batches of one-minute increments to the console over a period of about 15 minutes.

# Purpose: Batch write incremental sales data from S3 to a new Kafka topic
#          Use a delay between each message to simulate real-time streaming data
# Author:  Gary A. Stafford
# Date: 2021-09-26

import os
import time

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType

sales_data = "sales_incremental_large.csv"
topic_output = "pagila.sales.spark.streaming"
time_between_messages = 0.5  # 1800 messages * .5 seconds = ~15 minutes

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-incremental-sales") \
        .getOrCreate()

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, params, schema)
    df_sales.cache()

    write_to_kafka(spark, params, df_sales)


def read_from_csv(spark, params, schema):
    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{sales_data}",
             schema=schema, header=True, sep="|")

    return df_sales


def write_to_kafka(spark, params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # Collect the rows to the driver once, rather than once per message
    rows = df_sales.collect()

    for row in rows:
        # Build a single-row DataFrame, stamped with the current time
        df_message = spark.createDataFrame([row], df_sales.schema) \
            .drop("payment_date") \
            .withColumn("payment_date", F.current_timestamp()) \
            .selectExpr("CAST(payment_id AS STRING) AS key",
                        "to_json(struct(*)) AS value")

        # save() returns None, so keep the write separate from the DataFrame
        df_message \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        df_message.show(1)

        time.sleep(time_between_messages)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Submit the second PySpark script as a concurrent Amazon EMR Step to the first EMR cluster, or submit as a step to the second Amazon EMR cluster.

aws emr add-steps \
--cluster-id ${CLUSTER_ID} \
--steps """Type=Spark,Name='kafka-incremental-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/04_incremental_sales_kafka.py]"""

The job sends a total of 1,800 messages to Kafka at a rate of two messages per second for 15 minutes. The total runtime of the job should be approximately 19 minutes, given a few minutes for startup and shutdown. Why run for so long? We want to make sure the job’s runtime will span multiple, overlapping, sliding event-time windows.
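
The timing above follows directly from the script's settings. As a rough check, the sketch below reproduces the arithmetic in plain Python. The variable names other than time_between_messages are illustrative, and the calculation considers only the configured delay, ignoring the time each Kafka write itself takes.

```python
import math

message_count = 1800         # records in sales_incremental_large.csv
time_between_messages = 0.5  # seconds, as set in 04_incremental_sales_kafka.py
trigger_interval = 60        # seconds, the streaming query's one-minute trigger

publish_seconds = message_count * time_between_messages
publish_minutes = publish_seconds / 60
messages_per_second = 1 / time_between_messages

# Minimum number of one-minute triggers that fire while messages are published
min_micro_batches = math.ceil(publish_seconds / trigger_interval)

print(publish_minutes, messages_per_second, min_micro_batches)
```

Because the delay is applied between writes, it is a lower bound on the publishing time. The observed runtime and number of micro-batches will be somewhat higher.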

After about two minutes, return to the terminal output of the first Spark job, 03_streaming_read_kafka_console.py, running on the master node of the first cluster. As long as new messages are consumed every minute, you should see a new micro-batch of aggregated sales results stream to the console. Below we see an example of Batch 5, which reflects additional sales compared to Batch 0, shown previously. The results reflect the current all-time sales by country in real-time as the sales are published to Kafka.

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|China |473.35|65 |
|India |393.44|56 |
|Japan |292.60|40 |
|Mexico |262.64|36 |
|United States |252.65|35 |
|Russian Federation|243.65|35 |
|Brazil |220.69|31 |
|Philippines |191.75|25 |
|Indonesia |142.81|19 |
|South Africa |110.85|15 |
|Nigeria |108.86|14 |
|Argentina |89.86 |14 |
|Germany |85.89 |11 |
|Israel |68.90 |10 |
|Ukraine |65.92 |8 |
|Turkey |58.91 |9 |
|Iran |58.91 |9 |
|Saudi Arabia |56.93 |7 |
|Poland |50.94 |6 |
|Pakistan |50.93 |7 |
|Italy |48.93 |7 |
|French Polynesia |47.94 |6 |
|Peru |45.95 |5 |
|United Kingdom |45.94 |6 |
|Colombia |44.94 |6 |
+------------------+------+------+
only showing top 25 rows
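
To see why each micro-batch shows larger totals than the last, it can help to picture the query's state as a running total per country that is updated by every micro-batch and then output in full. The following plain-Python sketch imitates that behavior. It is only an illustration, not how Spark is implemented; the function name run_micro_batches and the sample amounts are made up.

```python
from collections import defaultdict


def run_micro_batches(batches, top_n=25):
    """Yield (batch_id, ranked running totals) after each micro-batch."""
    totals = defaultdict(lambda: [0.0, 0])  # country -> [sales, orders]
    for batch_id, batch in enumerate(batches):
        for country, amount in batch:
            totals[country][0] += amount
            totals[country][1] += 1
        # Re-emit the whole result table, sorted by sales in descending order
        ranked = sorted(totals.items(), key=lambda kv: kv[1][0], reverse=True)
        yield batch_id, [(c, round(s, 2), n) for c, (s, n) in ranked[:top_n]]


# Two illustrative micro-batches of (country, amount) pairs
batches = [
    [("India", 4.99), ("China", 2.99)],
    [("China", 6.99), ("India", 0.99)],
]

results = list(run_micro_batches(batches))
for batch_id, rows in results:
    print(f"Batch: {batch_id}", rows)
```

Every batch re-emits all countries with their totals to date, which is what the "complete" output mode in the script does.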

If we fast forward to a later micro-batch, sometime after the second incremental sales job is completed, we should see the top 25 aggregated sales by country of 2,050 messages — 250 seed plus 1,800 incremental messages.

-------------------------------------------
Batch: 20
-------------------------------------------
+------------------+--------+------+
|country |sales |orders|
+------------------+--------+------+
|China |1,379.05|195 |
|India |1,338.10|190 |
|United States |915.69 |131 |
|Mexico |855.80 |120 |
|Japan |831.88 |112 |
|Russian Federation|723.95 |105 |
|Brazil |613.12 |88 |
|Philippines |528.27 |73 |
|Indonesia |381.46 |54 |
|Turkey |350.52 |48 |
|Argentina |298.57 |43 |
|Nigeria |294.61 |39 |
|South Africa |279.61 |39 |
|Taiwan |221.67 |33 |
|Germany |199.73 |27 |
|United Kingdom |196.75 |25 |
|Poland |182.77 |23 |
|Spain |170.77 |23 |
|Ukraine |160.79 |21 |
|Iran |160.76 |24 |
|Italy |156.79 |21 |
|Pakistan |152.78 |22 |
|Saudi Arabia |146.81 |19 |
|Venezuela |145.79 |21 |
|Colombia |144.78 |22 |
+------------------+--------+------+
only showing top 25 rows

Compare the informational output below for Batch 20 to that of Batch 0, shown previously. Note the starting offset of the Kafka consumer group on the topic is 1986, and the ending offset is 2050, so this micro-batch consumed the final 64 messages. At this point, all messages have been consumed from the topic and aggregated. If additional messages were published to Kafka while the streaming job was still running, additional micro-batches would continue to be streamed to the console every minute.

"sources" : [ {
"description" : "KafkaV2[Subscribe[pagila.sales.spark.streaming]]",
"startOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 1986
}
},
"endOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 2050
}
},
"numInputRows" : 64,
"inputRowsPerSecond" : 1.0666666666666667,
"processedRowsPerSecond" : 13.772326231977619
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@13b8bba3",
"numOutputRows" : 105
}

PySpark Job 5: Aggregations over Sliding Event-time Window

In the previous example, we analyzed total all-time sales in real-time (e.g., show me the current, total, all-time sales for France compared to the rest of Europe, at regular intervals). This approach contrasts with analyzing sales made during a sliding event-time window (e.g., are the total sales for the United States trending better during this current four-hour marketing promotional period than the previous promotional period). In many cases, real-time sales during a distinct period or event window are probably a more commonly tracked KPI than total all-time sales.

If we add a sliding event-time window to the PySpark script, we can easily observe the total sales and order counts made during the sliding event-time window in real-time.

.withWatermark("timestamp", "10 minutes") \
.groupBy("country",
F.window("timestamp", "10 minutes", "5 minutes")) \
.agg(F.count("amount"), F.sum("amount")) \
.orderBy(F.col("window").desc(),
F.col("sum(amount)").desc()) \

Windowed totals would not include sales (messages) present in the Kafka topic before the streaming query began, nor sales in previous sliding windows. Constructing the correct query always starts with a clear understanding of the question you are trying to answer.
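
To make the window assignment concrete, the sketch below works out, in plain Python, which ten-minute windows sliding every five minutes contain a given event time. It is an illustration rather than Spark's implementation. The function name sliding_windows is hypothetical, timestamps are assumed to be naive UTC values, and windows are assumed to be aligned to the Unix epoch and to include their start time but not their end time.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time,
                    window=timedelta(minutes=10),
                    slide=timedelta(minutes=5)):
    """Return the (start, end) windows containing event_time, newest first."""
    epoch = datetime(1970, 1, 1)
    # Start of the most recent window that begins at or before event_time
    start = event_time - (event_time - epoch) % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers the event
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return windows


for start, end in sliding_windows(datetime(2021, 9, 8, 19, 32, 10)):
    print(start, "to", end)
```

With a ten-minute window and a five-minute slide, every sale belongs to two overlapping windows, so it is counted once in each.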

Below, in the abridged console output of the micro-batch from the script, 05_streaming_read_kafka_console_window.py, we see the results of three ten-minute sliding event-time windows with a five-minute overlap. The sales and order totals represent the volume sold during that window, with this micro-batch falling within the active current window, 19:30 to 19:40 UTC.

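-------------------------------------------
Batch: 14
-------------------------------------------
+------------------+------+------+-------------------+-------------------+
|country |sales |orders|start |end |
+------------------+------+------+-------------------+-------------------+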
|India |286.60|40 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|China |285.61|39 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|United States |205.69|31 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Japan |189.74|26 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Russian Federation |182.74|26 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Philippines |163.77|23 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Mexico |159.76|24 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Brazil |155.77|23 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Argentina |118.84|16 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Indonesia |82.88 |12 |2021-09-08 19:30:00|2021-09-08 19:40:00|
|India |600.13|87 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|China |509.27|73 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|United States |416.42|58 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Japan |329.56|44 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Mexico |311.54|46 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Russian Federation |301.55|45 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Brazil |256.64|36 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Philippines |219.67|33 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Turkey |171.76|24 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Argentina |159.78|22 |2021-09-08 19:25:00|2021-09-08 19:35:00|
|China |353.53|47 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|India |264.62|38 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Japan |191.74|26 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|United States |173.77|23 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Mexico |159.77|23 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Russian Federation |148.78|22 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Philippines |132.83|17 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Brazil |123.82|18 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Indonesia |103.86|14 |2021-09-08 19:20:00|2021-09-08 19:30:00|
|South Africa |63.91 |9 |2021-09-08 19:20:00|2021-09-08 19:30:00|
+------------------+------+------+-------------------+-------------------+

Plotting the total sales over time using sliding event-time windows, we will observe the results do not reflect a running total. Total sales only accumulate within a sliding window.

Cumulative sales within 5-minute sliding event-time windows

Compare these results to the results of the previous script, whose total sales reflect a running total.

Running total of sales (no sliding windows)

PySpark Job 6: Streaming Query from/to Amazon MSK

The PySpark script, 06_streaming_read_kafka_kafka.py, performs the same streaming query and grouped aggregation as the previous script, 03_streaming_read_kafka_console.py. However, instead of outputting results to the console, the results of this job will be written to a new Kafka topic on Amazon MSK.

.format("kafka") \
.options(**options_write) \
.option("checkpointLocation", "/checkpoint/kafka/") \

# Purpose: Streaming read from Kafka topic and aggregate
#          sales and orders by country to Kafka every minute
# Author:  Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming.in"
topic_output = "pagila.sales.spark.streaming.out"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-kafka") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(params, df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .groupBy("country") \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("sum(amount)").desc()) \
        .select(F.sha1("country").alias("id"),
                "country",
                (F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
                (F.col("count(amount)")).alias("orders")) \
        .coalesce(1) \
        .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
        .writeStream \
        .trigger(processingTime="1 minute") \
        .queryName("streaming_to_kafka") \
        .outputMode("complete") \
        .format("kafka") \
        .options(**options_write) \
        .option("checkpointLocation", "/checkpoint/kafka/") \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Repeat the same process used with the previous script. Re-run the seed data script, 01_seed_sales_kafka.py, but update the topic to a new name, such as pagila.sales.spark.streaming.in. Next, run the new script, 06_streaming_read_kafka_kafka.py. Give the script time to start and consume the 250 seed messages from Kafka. Then, update the topic name in the incremental data PySpark script, 04_incremental_sales_kafka.py, and re-run it, either concurrently with the new script on the same cluster or on the second cluster.

When run, the script, 06_streaming_read_kafka_kafka.py, will continuously consume messages from the new pagila.sales.spark.streaming.in topic and publish grouped aggregation results to a new topic, pagila.sales.spark.streaming.out.
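
The shape of the messages written to the output topic can be approximated in plain Python. The sketch below mirrors the script's select and selectExpr steps, using a SHA-1 hash of the country as the message key and a JSON document of the row as the value. It is an approximation for illustration only; the function name to_kafka_message is hypothetical, and the exact JSON formatting produced by Spark may differ.

```python
import hashlib
import json


def to_kafka_message(country, sales, orders):
    """Build an approximate (key, value) pair for one aggregated row."""
    # Same input always yields the same key, so each country has a stable key
    key = hashlib.sha1(country.encode("utf-8")).hexdigest()
    value = json.dumps(
        {"id": key, "country": country, "sales": sales, "orders": orders},
        separators=(",", ":"))
    return key, value


key, value = to_kafka_message("India", "138.80", 20)
print(key)
print(value)
```

Because the query runs in complete output mode, every country is published again with each micro-batch, always under the same key. A downstream consumer therefore needs only the most recent message per country.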

Use the Kafka Consumer API to view new messages as the Spark job publishes them in near real-time to Kafka.

export BBROKERS="b-1.your-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.kafka.us-east-1.amazonaws.com:9098, ..."
bin/kafka-console-consumer.sh \
--topic pagila.sales.spark.streaming.out \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.timestamp=true \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties
Aggregated sales results (messages) being published to Kafka by streaming Spark job

PySpark Job 7: Batch Query of Streaming Results from MSK

When run, the previous script produces Kafka messages containing non-windowed sales aggregations to the Kafka topic every minute. Using the next PySpark script, 07_batch_read_kafka.py, we can consume those aggregated messages using a batch query and display the most recent sales totals to the console. Each country’s most recent all-time sales totals and order counts should be identical to the previous script’s results, representing the aggregation of all 2,050 Kafka messages — 250 seed plus 1,800 incremental messages.

To get the latest total sales by country, we will consume all the messages from the output topic, partition the results by country, keep only the most recent message for each country (the first row when ordered by the Kafka message timestamp in descending order), and finally, display the results sorted by sales in descending order.

window = Window.partitionBy("country") \
.orderBy(F.col("timestamp").desc())
.withColumn("row", F.row_number().over(window)) \
.where(F.col("row") == 1).drop("row") \
.select("country", "sales", "orders") \

# Purpose: Batch read Kafka output topic and display
#          top 25 total sales by country to console
# Author:  Gary A. Stafford
# Date: 2021-09-09

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, TimestampType
from pyspark.sql.window import Window

topic_input = "pagila.sales.spark.streaming.out"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-batch-sales") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(df_sales)


def read_from_kafka(spark, params):
    schema = StructType([
        StructField("country", StringType(), False),
        StructField("sales", StringType(), False),
        StructField("orders", IntegerType(), False),
        StructField("start", TimestampType(), False),
        StructField("end", TimestampType(), True),
    ])

    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    window = Window.partitionBy("country").orderBy(F.col("timestamp").desc())

    df_sales = spark.read \
        .format("kafka") \
        .options(**options_read) \
        .load() \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .withColumn("row", F.row_number().over(window)) \
        .where(F.col("row") == 1).drop("row") \
        .select("country", "sales", "orders") \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)

    return df_sales


def summarize_sales(df_sales):
    df_sales \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Writing the top 25 results to the console, we should see the same results as we saw in the final micro-batch (Batch 20, shown above) of the PySpark script, 03_streaming_read_kafka_console.py.

+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |948.63|190 |
|China |936.67|195 |
|United States |915.69|131 |
|Mexico |855.80|120 |
|Japan |831.88|112 |
|Russian Federation|723.95|105 |
|Brazil |613.12|88 |
|Philippines |528.27|73 |
|Indonesia |381.46|54 |
|Turkey |350.52|48 |
|Argentina |298.57|43 |
|Nigeria |294.61|39 |
|South Africa |279.61|39 |
|Taiwan |221.67|33 |
|Germany |199.73|27 |
|United Kingdom |196.75|25 |
|Poland |182.77|23 |
|Spain |170.77|23 |
|Ukraine |160.79|21 |
|Iran |160.76|24 |
|Italy |156.79|21 |
|Pakistan |152.78|22 |
|Saudi Arabia |146.81|19 |
|Venezuela |145.79|21 |
|Colombia |144.78|22 |
+------------------+------+------+
only showing top 25 rows
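
The two key steps in 07_batch_read_kafka.py, keeping the most recent message per country and sorting the formatted sales strings numerically, can be illustrated without Spark. The following is a minimal sketch in plain Python. The function name latest_per_country is hypothetical, and the integer timestamps are stand-ins for the Kafka message timestamps; the sales and order figures are taken from the Batch 5 and Batch 20 output shown earlier.

```python
def latest_per_country(messages):
    """Keep the newest message per country, sorted by sales (descending)."""
    latest = {}
    for msg in messages:
        current = latest.get(msg["country"])
        if current is None or msg["timestamp"] > current["timestamp"]:
            latest[msg["country"]] = msg
    # Sales are formatted strings such as "1,379.05": strip commas before sorting
    return sorted(latest.values(),
                  key=lambda m: float(m["sales"].replace(",", "")),
                  reverse=True)


messages = [
    {"country": "China", "sales": "473.35", "orders": 65, "timestamp": 1},
    {"country": "India", "sales": "393.44", "orders": 56, "timestamp": 1},
    {"country": "Peru", "sales": "45.95", "orders": 5, "timestamp": 1},
    {"country": "China", "sales": "1,379.05", "orders": 195, "timestamp": 2},
    {"country": "India", "sales": "1,338.10", "orders": 190, "timestamp": 2},
]

result = latest_per_country(messages)
for row in result:
    print(row["country"], row["sales"], row["orders"])
```

Sorting the sales column as text would place "45.95" ahead of "1,379.05", which is why the script removes the thousands separator and casts the value to a float before ordering.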

PySpark Job 8: Streaming Query with Static Join and Sliding Window

The PySpark script, 08_streaming_read_kafka_join_window.py, performs the same streaming query and computations over sliding event-time windows as the previous script, 05_streaming_read_kafka_console_window.py. However, instead of totaling sales and orders by country, the script totals sales and orders by sales region. A sales region is composed of multiple countries in the same geographical area. The PySpark script reads in a static list of sales regions and countries from Amazon S3, sales_regions.csv.

country,region
Afghanistan,Asia & Pacific
Aland Islands,Europe
Albania,Europe
Algeria,Arab States
American Samoa,Asia & Pacific
Andorra,Europe
Angola,Africa
Anguilla,Latin America
Antarctica,Asia & Pacific

The script then performs a join operation between the results of the streaming query and the static list of regions, joining on country. Using the join, the streaming sales data from Kafka is enriched with the sales region. Any sales record whose country does not have an assigned sales region is categorized as ‘Unassigned.’

.join(df_regions, on=["country"], how="leftOuter") \
.na.fill("Unassigned") \
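
The effect of the left outer join followed by the fill can be pictured with a small plain-Python sketch. It is an illustration only, not the script's code. The function name enrich_with_region is hypothetical, the region lookup uses a few rows from the sales_regions.csv excerpt above, and the sales records, including the made-up country "Atlantis", are invented for the example.

```python
from collections import Counter

# A few rows from the sales_regions.csv excerpt
regions = {
    "Albania": "Europe",
    "Algeria": "Arab States",
    "Angola": "Africa",
}


def enrich_with_region(sale, regions, default="Unassigned"):
    """Left outer join of one sale to the region lookup, filling any gap."""
    return {**sale, "region": regions.get(sale["country"], default)}


# Illustrative sales; "Atlantis" has no entry in the region lookup
sales = [
    {"country": "Albania", "amount": 4.99},
    {"country": "Angola", "amount": 2.99},
    {"country": "Atlantis", "amount": 0.99},
]

enriched = [enrich_with_region(s, regions) for s in sales]
orders_by_region = Counter(s["region"] for s in enriched)
print(orders_by_region)
```

Every sale is kept, whether or not its country has a region. Note that in the PySpark script, na.fill("Unassigned") without a subset replaces nulls in every string column, not only in the region column.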

Sales and orders are then aggregated by sales region, and the top 24 rows (eight regions across three windows) are output to the console every minute.

# Purpose: Streaming read from Kafka topic, join with static data,
#          and aggregate in windows by sales region to the console every minute
#          Show 24 = 8 regions x 3 windows
# Author:  Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming.region.53"
regions_data = "sales_regions.csv"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-join") \
        .getOrCreate()

    df_regions = read_from_csv(spark, params)
    df_regions.cache()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(df_sales, df_regions)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def read_from_csv(spark, params):
    schema = StructType([
        StructField("country", StringType(), False),
        StructField("region", StringType(), False)
    ])

    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{regions_data}",
             schema=schema, header=True, sep=",")

    return df_sales


def summarize_sales(df_sales, df_regions):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # The "window" column produced by F.window() is a struct
    # with "start" and "end" timestamp fields
    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .join(df_regions, on=["country"], how="leftOuter") \
        .na.fill("Unassigned") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy("region", F.window("timestamp", "10 minutes", "5 minutes")) \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("window").desc(), F.col("sum(amount)").desc()) \
        .select(F.col("region").alias("sales_region"),
                F.format_number(F.col("sum(amount)"), 2).alias("sales"),
                F.col("count(amount)").alias("orders"),
                F.date_format("window.start", "yyyy-MM-dd HH:mm").alias("start"),
                F.date_format("window.end", "yyyy-MM-dd HH:mm").alias("end")) \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_regional_sales") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 24) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

To run the job, repeat the previous process of renaming the topic (e.g., pagila.sales.spark.streaming.region), then running the initial sales data job, this script, and finally, concurrent with this script, the incremental sales data job. Below, we see a later micro-batch output to the console from the Spark job. We see three sets of sales results, by sales region, from three different ten-minute sliding event-time windows with a five-minute overlap.

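-------------------------------------------
Batch: 20
-------------------------------------------
+--------------+--------+------+----------------+----------------+
|sales_region |sales |orders|start |end |
+--------------+--------+------+----------------+----------------+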
|Asia & Pacific|936.66 |134 |2021-09-08 21:35|2021-09-08 21:45|
|Europe |537.28 |72 |2021-09-08 21:35|2021-09-08 21:45|
|Latin America |399.41 |59 |2021-09-08 21:35|2021-09-08 21:45|
|North America |176.72 |28 |2021-09-08 21:35|2021-09-08 21:45|
|Middle east |101.85 |15 |2021-09-08 21:35|2021-09-08 21:45|
|Africa |99.86 |14 |2021-09-08 21:35|2021-09-08 21:45|
|Unassigned |50.92 |8 |2021-09-08 21:35|2021-09-08 21:45|
|Arab States |36.96 |4 |2021-09-08 21:35|2021-09-08 21:45|
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|2,271.78|322 |2021-09-08 21:30|2021-09-08 21:40|
|Europe |1,199.38|162 |2021-09-08 21:30|2021-09-08 21:40|
|Latin America |1,122.40|160 |2021-09-08 21:30|2021-09-08 21:40|
|North America |390.38 |62 |2021-09-08 21:30|2021-09-08 21:40|
|Africa |325.54 |46 |2021-09-08 21:30|2021-09-08 21:40|
|Middle east |212.69 |31 |2021-09-08 21:30|2021-09-08 21:40|
|Unassigned |118.83 |17 |2021-09-08 21:30|2021-09-08 21:40|
|Arab States |82.89 |11 |2021-09-08 21:30|2021-09-08 21:40|
+--------------+--------+------+----------------+----------------+
|Asia & Pacific|2,667.23|377 |2021-09-08 21:25|2021-09-08 21:35|
|Europe |1,416.03|197 |2021-09-08 21:25|2021-09-08 21:35|
|Latin America |1,197.28|172 |2021-09-08 21:25|2021-09-08 21:35|
|Africa |475.35 |65 |2021-09-08 21:25|2021-09-08 21:35|
|North America |435.37 |63 |2021-09-08 21:25|2021-09-08 21:35|
|Middle east |272.62 |38 |2021-09-08 21:25|2021-09-08 21:35|
|Unassigned |172.75 |25 |2021-09-08 21:25|2021-09-08 21:35|
|Arab States |127.83 |17 |2021-09-08 21:25|2021-09-08 21:35|
+————–+——–+——+—————-+—————-+
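
To see why the same sale can be counted in more than one of the windows above, it may help to work out the window assignment by hand. The following is a minimal, pure-Python sketch, not part of the project's scripts: the function name windows_for and the sample event time are hypothetical, and it assumes half-open windows whose start times are aligned to multiples of the slide interval since the Unix epoch, which is our understanding of Spark's default behavior.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # window length
SLIDE = timedelta(minutes=5)    # a new window starts every five minutes


def windows_for(event_time):
    """Return the (start, end) pair of every sliding window containing event_time.

    Windows are treated as half-open, [start, end), with starts aligned to
    multiples of SLIDE since the Unix epoch (an assumption about alignment).
    """
    epoch = datetime(1970, 1, 1)
    # Start of the latest window that begins at or before the event.
    start = event_time - (event_time - epoch) % SLIDE
    result = []
    # Walk backward one slide at a time while the window still covers the event.
    while start + WINDOW > event_time:
        result.append((start, start + WINDOW))
        start -= SLIDE
    return sorted(result)


# Hypothetical sale with an event time of 21:37 on 2021-09-08.
for start, end in windows_for(datetime(2021, 9, 8, 21, 37)):
    print(f"{start:%H:%M} to {end:%H:%M}")
# prints:
# 21:30 to 21:40
# 21:35 to 21:45
```

With a ten-minute window and a five-minute slide, each event falls into two windows, which is why per-window totals from overlapping windows should not be added together.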

PySpark Script 9: Static Join with Grouped Aggregations

As a comparison, we can exclude the sliding event-time window operations from the previous streaming query script, 08_streaming_read_kafka_join_window.py, to obtain the current, total, all-time sales by sales region. See the script, 09_streaming_read_kafka_join.py, in the project repository for details.

-------------------------------------------
Batch: 20
-------------------------------------------
+--------------+--------+------+
|sales_region  |sales   |orders|
+--------------+--------+------+
|Asia & Pacific|5,780.88|812   |
|Europe        |3,081.74|426   |
|Latin America |2,545.34|366   |
|Africa        |1,029.59|141   |
|North America |997.57  |143   |
|Middle east   |541.23  |77    |
|Unassigned    |352.47  |53    |
|Arab States   |244.68  |32    |
+--------------+--------+------+
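
Conceptually, a grouped aggregation without a window keeps one running total per sales region and updates it as each micro-batch arrives. The following pure-Python sketch illustrates that idea only; it is not how Spark implements state, the function update_totals and the sample rows are hypothetical, and it assumes the query emits the full, updated result table with every micro-batch, as the console output above suggests.

```python
from collections import defaultdict
from decimal import Decimal


def update_totals(totals, micro_batch):
    """Fold one micro-batch of (sales_region, amount) rows into the running totals."""
    for region, amount in micro_batch:
        sales, orders = totals[region]
        totals[region] = (sales + Decimal(amount), orders + 1)
    return totals


# Hypothetical micro-batches; the regions and amounts are made up for illustration.
batches = [
    [("Europe", "4.99"), ("Africa", "2.99")],
    [("Europe", "0.99")],
]

# Each region maps to a (total sales, order count) pair.
totals = defaultdict(lambda: (Decimal("0"), 0))
for batch_id, batch in enumerate(batches):
    update_totals(totals, batch)
    print(f"Batch: {batch_id}")
    # Print the whole result table after each batch, highest sales first.
    ranked = sorted(totals.items(), key=lambda kv: kv[1][0], reverse=True)
    for region, (sales, orders) in ranked:
        print(f"{region:<14} {sales:>8} {orders:>4}")
```

Because no window bounds the aggregation, the totals only ever grow, which matches the all-time figures shown above.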

Conclusion

In this post, we learned how to get started with Spark Structured Streaming on Amazon EMR. First, we explored how to run jobs written in Python with PySpark on Amazon EMR as Steps and directly from the EMR cluster’s master node. Next, we discovered how to produce and consume messages with Apache Kafka on Amazon MSK, using batch and streaming queries. Finally, we compared aggregations over a sliding event-time window with grouped aggregations and saw how Structured Streaming queries are processed in micro-batches.

In a subsequent post, we will learn how to use Apache Avro and the Apicurio Registry with PySpark on Amazon EMR to read and write Apache Avro format messages to Amazon MSK.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
