Serverless Analytics on AWS: Getting Started with Amazon EMR Serverless and Amazon MSK Serverless

Utilizing the recently released Amazon EMR Serverless and Amazon MSK Serverless for batch and streaming analytics with Apache Spark and Apache Kafka

Introduction

Amazon EMR Serverless

AWS recently announced the general availability (GA) of Amazon EMR Serverless on June 1, 2022. EMR Serverless is a new serverless deployment option in Amazon EMR, in addition to EMR on EC2, EMR on EKS, and EMR on AWS Outposts. EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. According to AWS, with EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.

Amazon MSK Serverless

Similarly, on April 28, 2022, AWS announced the general availability of Amazon MSK Serverless. According to AWS, Amazon MSK Serverless is a cluster type for Amazon MSK that makes it easy to run Apache Kafka without managing and scaling cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources, so you can use Apache Kafka on demand and only pay for the data you stream and retain.

Serverless Analytics

In this post, we will learn how to use these two new, powerful, cost-effective, and easy-to-operate serverless technologies to perform batch and streaming analytics. The PySpark examples used in this post are similar to those featured in two earlier posts, which used the non-serverless alternatives, Amazon EMR on EC2 and Amazon MSK: “Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR” and “Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on AWS using Amazon MSK and EMR”.

Source Code

All the source code demonstrated in this post is open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/emr-msk-serverless-demo.git

Architecture

The post’s high-level architecture consists of an Amazon EMR Serverless Application, Amazon MSK Serverless Cluster, and Amazon EC2 Kafka client instance. To support these three resources, we will need two Amazon Virtual Private Clouds (VPCs), a minimum of three subnets, an AWS Internet Gateway (IGW) or equivalent, an Amazon S3 Bucket, multiple AWS Identity and Access Management (IAM) Roles and Policies, Security Groups, and Route Tables, and a VPC Gateway Endpoint for S3. All resources are constrained to a single AWS account and a single AWS Region, us-east-1.

High-level AWS serverless analytics architecture used in this post

Prerequisites

As a prerequisite for this post, you will need to create the following resources:

  1. (1) Amazon EMR Serverless Application;
  2. (1) Amazon MSK Serverless Cluster;
  3. (1) Amazon S3 Bucket;
  4. (1) VPC Endpoint for S3;
  5. (3) Apache Kafka topics;
  6. PySpark applications, related JAR dependencies, and sample data files uploaded to Amazon S3 Bucket;

Let’s walk through each of these prerequisites.

Amazon EMR Serverless Application

Before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon EMR Serverless, especially, What is Amazon EMR Serverless? Create a new EMR Serverless Application by following the AWS documentation, Getting started with Amazon EMR Serverless. The creation of the EMR Serverless Application includes the following resources:

  1. Amazon S3 bucket for storage of Spark resources;
  2. Amazon VPC with at least two private subnets and associated Security Group(s);
  3. EMR Serverless runtime AWS IAM Role and associated IAM Policy;
  4. Amazon EMR Serverless Application;

For this post, use the latest version of EMR available in the EMR Studio Serverless Application console, the newly released version 6.7.0, to create a Spark application.

EMR Studio Serverless Application creation console

Keep the default pre-initialized capacity, application limits, and application behavior settings.

EMR Studio Serverless Application creation console

Since we are connecting to MSK Serverless from EMR Serverless, we need to configure VPC access. Select the new VPC and at least two private subnets in different Availability Zones (AZs).

EMR Studio Serverless Application creation console

According to the documentation, the subnets selected for EMR Serverless must be private subnets. The associated route tables for the subnets should not contain direct routes to the Internet.

Error resulting from trying to associate a public subnet with EMR Serverless
EMR Studio Serverless Application details console showing new Application

Amazon MSK Serverless Cluster

Similarly, before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon MSK Serverless, especially MSK Serverless. Create a new MSK Serverless Cluster by following the AWS documentation, Getting started using MSK Serverless clusters. The creation of the MSK Serverless Cluster includes the following resources:

  1. AWS IAM Role and associated IAM Policy for the Amazon EC2 Kafka client instance;
  2. VPC with at least one public subnet and associated Security Group(s);
  3. Amazon EC2 instance used as Apache Kafka client, provisioned in the public subnet of the above VPC;
  4. Amazon MSK Serverless Cluster;
Amazon MSK Serverless Create cluster console

Associate the new MSK Serverless Cluster with the EMR Serverless Application’s VPC and two private subnets. Also, associate the cluster with the EC2-based Kafka client instance’s VPC and its public subnet.

Amazon MSK Serverless Create cluster console — VPC 1
Amazon MSK Serverless Create cluster console — VPC 2

According to the AWS documentation, Amazon MSK does not support all AZs. For example, trying to use a subnet in us-east-1e threw an error. If this happens, choose an alternative AZ.

Error resulting from using an unsupported AZ
Successfully created Amazon MSK Serverless Cluster

VPC Endpoint for S3

To access the Spark resources in Amazon S3 from EMR Serverless running in the two private subnets, we need a VPC Endpoint for S3, specifically a Gateway Endpoint, which sends traffic to Amazon S3 or DynamoDB using private IP addresses. A gateway endpoint for Amazon S3 enables you to use private IP addresses to access Amazon S3 without exposure to the public Internet. EMR Serverless does not require public IP addresses, and you don’t need an internet gateway (IGW), a NAT device, or a virtual private gateway in your VPC to connect to S3.

VPC Endpoint for S3 associated with route table for private subnets

Create the VPC Endpoint for S3 (Gateway Endpoint) and add the route table for the two EMR Serverless private subnets. You can add additional routes to that route table, such as VPC peering connections to data sources such as Amazon Redshift or Amazon RDS. However, do not add routes that provide direct Internet access.

Route table for private subnets showing VPC Endpoint to S3 route (first route shown)

Kafka Topics and Sample Messages

Once the MSK Serverless Cluster and EC2-based Kafka client instance are provisioned and running, create the three required Kafka topics using the EC2-based Kafka client instance. I recommend using AWS Systems Manager Session Manager to connect to the client instance as the ec2-user user. Session Manager provides secure and auditable node management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys. Alternatively, you can SSH into the client instance.

Before creating the topics, use a utility like telnet to confirm connectivity between the Kafka client and the MSK Serverless Cluster. Verifying connectivity will save you a lot of frustration with potential security and networking issues.

sudo yum install telnet -y
telnet <your_bootstrap_server_host> 9098
# > Trying 192.168.XX.XX…
# > Connected to boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com.
# > Escape character is '^]'.
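If telnet is not available on the client instance, the same reachability check can be sketched with Python's standard library. This is only a minimal sketch: the helper name can_connect and the 5-second timeout are my assumptions, not part of the post's project. A successful TCP connection confirms network reachability only. It says nothing about IAM authentication.

```python
import socket
import sys


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False


if __name__ == "__main__" and len(sys.argv) == 3:
    # Hypothetical usage: python3 check_kafka.py <your_bootstrap_server_host> 9098
    host, port = sys.argv[1], int(sys.argv[2])
    print(f"{host}:{port} reachable: {can_connect(host, port)}")
```

A result of False usually points to the same Security Group, subnet, or routing issues that a hanging telnet session would.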

With MSK Serverless Cluster connectivity confirmed, create the three Kafka topics: topicA, topicB, and topicC. I am using the default partitioning and replication settings from the AWS Getting Started Tutorial.

cd kafka_2.12-2.8.1

# *** CHANGE ME ***
export BOOTSTRAP_SERVER=<your_bootstrap_server> # e.g., boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com:9098

bin/kafka-topics.sh --create --topic topicA \
    --partitions 6 \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

bin/kafka-topics.sh --create --topic topicB \
    --partitions 6 \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

bin/kafka-topics.sh --create --topic topicC \
    --partitions 6 \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

# list topics to confirm creation
bin/kafka-topics.sh --list \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

To create some quick sample data, we will copy and paste 250 messages from a file included in the GitHub project, sample_data/sales_messages.txt, into topicA. The messages are simple mock sales transactions.

{"payment_id":16940,"customer_id":130,"amount":5.99,"payment_date":"2021-05-08 21:21:56.996577 +00:00","city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}
{"payment_id":16406,"customer_id":459,"amount":5.99,"payment_date":"2021-05-08 21:22:59.996577 +00:00","city":"Qomsheh","district":"Esfahan","country":"Iran"}
{"payment_id":16315,"customer_id":408,"amount":6.99,"payment_date":"2021-05-08 21:32:05.996577 +00:00","city":"Jaffna","district":"Northern","country":"Sri Lanka"}
{"payment_id":16185,"customer_id":333,"amount":7.99,"payment_date":"2021-05-08 21:33:07.996577 +00:00","city":"Baku","district":"Baki","country":"Azerbaijan"}
{"payment_id":17097,"customer_id":222,"amount":9.99,"payment_date":"2021-05-08 21:33:47.996577 +00:00","city":"Jaroslavl","district":"Jaroslavl","country":"Russian Federation"}
{"payment_id":16579,"customer_id":549,"amount":3.99,"payment_date":"2021-05-08 21:36:33.996577 +00:00","city":"Santiago de Compostela","district":"Galicia","country":"Spain"}
{"payment_id":16050,"customer_id":269,"amount":4.99,"payment_date":"2021-05-08 21:40:19.996577 +00:00","city":"Salinas","district":"California","country":"United States"}
{"payment_id":17126,"customer_id":239,"amount":7.99,"payment_date":"2021-05-08 22:00:12.996577 +00:00","city":"Ciomas","district":"West Java","country":"Indonesia"}
{"payment_id":16933,"customer_id":126,"amount":7.99,"payment_date":"2021-05-08 22:29:06.996577 +00:00","city":"Po","district":"So Paulo","country":"Brazil"}
{"payment_id":16297,"customer_id":399,"amount":8.99,"payment_date":"2021-05-08 22:30:47.996577 +00:00","city":"Okara","district":"Punjab","country":"Pakistan"}

Use the kafka-console-producer shell script to publish the messages to the Kafka topic. Then use the kafka-console-consumer shell script to validate that the messages reached the topic by consuming a few of them.

bin/kafka-console-producer.sh \
    --topic topicA \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --producer.config config/client.properties

# copy and paste contents of 'sales_messages.txt' and then Ctrl+C to exit

# check for messages in topic
bin/kafka-console-consumer.sh \
    --topic topicA \
    --from-beginning --max-messages 5 \
    --property print.value=true \
    --property print.offset=true \
    --property print.partition=true \
    --property print.timestamp=true \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --consumer.config config/client.properties

The output should look similar to the following example.

Sample message output from Kafka topic

Spark Resources in Amazon S3

To submit and run the five Spark Jobs included in the project, you will need to copy the following resources to your Amazon S3 bucket: (5) Apache Spark jobs, (5) related JAR dependencies, and (2) sample data files.

PySpark Applications
To start, copy the five PySpark applications to a scripts/ subdirectory within your Amazon S3 bucket.

PySpark applications uploaded to the Amazon S3 bucket

Sample Data
Next, copy the two sample data files to a sample_data/ subdirectory within your Amazon S3 bucket. The large file contains 2,000 messages, while the small file contains 600 messages. These two files can be used interchangeably with the post’s final streaming example.

Sample sales data uploaded to the Amazon S3 bucket

PySpark Dependencies
Lastly, the PySpark applications have a handful of JAR dependencies that must be available when the job runs and that are not on the EMR Serverless classpath by default. If you are unsure which JARs are already on the EMR Serverless classpath, you can check the Classpath Entries section of the Spark UI’s Environment tab. Accessing the Spark UI is demonstrated in the first PySpark application example, below.

Spark UI’s Environment tab showing Classpath Entries

It is critical to choose the correct version of each JAR dependency based on the versions of the libraries used with EMR and MSK. Using the wrong version or inconsistent versions, especially of Scala, can result in job failures. Specifically, we are targeting Spark 3.2.1 and Scala 2.12 (EMR v6.7.0: Amazon’s Spark 3.2.1, Scala 2.12.15, Amazon Corretto 8 version of OpenJDK), and Apache Kafka 2.8.1 (MSK Serverless: Kafka 2.8.1).
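As a rough guard against the version mismatches described above, a small check over the JAR file names can help before uploading them. This is a sketch under assumptions: it relies on the Maven-style naming used by Spark artifacts (name_scalaVersion-artifactVersion.jar), it treats every Scala-suffixed JAR as a Spark artifact, and the file names below are illustrative rather than the post's actual dependency list. The helper name mismatched_jars is hypothetical.

```python
import re

# Matches names such as spark-sql-kafka-0-10_2.12-3.2.1.jar
# -> Scala binary version 2.12, artifact version 3.2.1
SCALA_JAR = re.compile(r"^(?P<name>.+)_(?P<scala>\d+\.\d+)-(?P<version>[\d.]+)\.jar$")


def mismatched_jars(jar_names, scala="2.12", spark="3.2.1"):
    """Return Scala-suffixed JAR names whose Scala or artifact version differs from the targets."""
    bad = []
    for jar in jar_names:
        match = SCALA_JAR.match(jar)
        # JARs without a Scala suffix (plain Java libraries) are skipped
        if match and (match["scala"] != scala or match["version"] != spark):
            bad.append(jar)
    return bad


# Illustrative file names only
jars = [
    "spark-sql-kafka-0-10_2.12-3.2.1.jar",
    "spark-sql-kafka-0-10_2.13-3.2.1.jar",
    "kafka-clients-2.8.1.jar",
]
print(mismatched_jars(jars))
# ['spark-sql-kafka-0-10_2.13-3.2.1.jar']
```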

Download the seven JAR files locally, then copy them to a jars/ subdirectory within your Amazon S3 bucket.

Dependency JARs uploaded to the Amazon S3 bucket

PySpark Applications Examples

With the EMR Serverless Application, MSK Serverless Cluster, Kafka topics, and sample data created, and the Spark resources uploaded to Amazon S3, we are ready to explore four different Spark examples.

Example 1: Kafka Batch Aggregation to the Console

The first PySpark application, 01_example_console.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to the console (stdout).

There are no hard-coded values in any of the PySpark application examples. All required environment-specific variables, such as your MSK Serverless bootstrap server (host and port) and Amazon S3 bucket name, will be passed to the running Spark jobs as arguments from the spark-submit command.
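To see how such arguments reach a script, here is a small standalone sketch of the argument parsing used by the examples. The optional argv parameter is my addition so the parser can be exercised locally without Spark. The bootstrap server value is the placeholder host used earlier in this post.

```python
import argparse


def parse_args(argv=None):
    """Parse argument values; argv defaults to the command line when None."""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")
    return parser.parse_args(argv)


# Each list item corresponds to one string passed to the job as an argument
args = parse_args(["--bootstrap_servers=boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com:9098"])
print(args.bootstrap_servers)
print(args.read_topic)  # falls back to the default, topicA
```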

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and writes aggregated messages to the console (stdout)
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("01-example-console") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    # Batch read of the topic, from the earliest to the latest offsets, using IAM authentication
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # 'window' numbers the rows within each country; 'window_agg' aggregates over each country
    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # Keep only row 1 per country, leaving a single summary row for each country
    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
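To make the aggregation in summarize_sales easier to follow, here is the same idea in plain Python, without Spark: count the orders and sum the amounts per country, then sort by total sales in descending order. This is an illustrative sketch only. The function name summarize_sales_local is hypothetical, and the input is three of the sample messages shown earlier in this post.

```python
import json
from collections import defaultdict

# Three of the sample sales messages shown earlier in this post
SAMPLE_MESSAGES = [
    '{"payment_id":16940,"customer_id":130,"amount":5.99,"payment_date":"2021-05-08 21:21:56.996577 +00:00","city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}',
    '{"payment_id":16050,"customer_id":269,"amount":4.99,"payment_date":"2021-05-08 21:40:19.996577 +00:00","city":"Salinas","district":"California","country":"United States"}',
    '{"payment_id":16933,"customer_id":126,"amount":7.99,"payment_date":"2021-05-08 22:29:06.996577 +00:00","city":"Po","district":"So Paulo","country":"Brazil"}',
]


def summarize_sales_local(raw_messages):
    """Return (country, total_sales, order_count) tuples, highest sales first."""
    totals = defaultdict(lambda: [0.0, 0])
    for raw in raw_messages:
        sale = json.loads(raw)
        totals[sale["country"]][0] += sale["amount"]  # running sales total
        totals[sale["country"]][1] += 1               # running order count
    rows = [(country, round(sales, 2), orders)
            for country, (sales, orders) in totals.items()]
    return sorted(rows, key=lambda row: row[1], reverse=True)


print(summarize_sales_local(SAMPLE_MESSAGES))
# [('Brazil', 13.98, 2), ('United States', 4.99, 1)]
```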

To submit your first PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. You will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 01-example-console \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/01_example_console.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'
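Hand-editing the JSON passed to the job-driver option is error-prone, so one option is to generate it. The sketch below builds the same structure shown in the command above and prints it as JSON. The helper name build_job_driver and its parameters are hypothetical, and the placeholders still need to be replaced with your own values.

```python
import json


def build_job_driver(s3_bucket, script, bootstrap_server, extra_args=()):
    """Build the job driver payload for a PySpark script stored under scripts/."""
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{s3_bucket}/scripts/{script}",
            "entryPointArguments": [
                f"--bootstrap_servers={bootstrap_server}",
                *extra_args,
            ],
            "sparkSubmitParameters": f"--conf spark.jars=s3://{s3_bucket}/jars/*.jar",
        }
    }


driver = build_job_driver(
    s3_bucket="<your_s3_bucket>",
    script="01_example_console.py",
    bootstrap_server="<your_bootstrap_server>",
)

# The printed JSON can be passed, single-quoted, as the job driver value of start-job-run
print(json.dumps(driver, indent=2))
```

The extra_args parameter leaves room for jobs that take additional arguments, such as an S3 bucket name.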

Switching to the EMR Serverless Application console, you should see the new Spark job you just submitted in one of several job states.

EMR Studio Serverless Application details console

You can click on the Spark job to get more details. Note the Script arguments and Spark properties passed in from the spark-submit command.

EMR Studio Serverless Application details Job details view

From the Spark job details tab, access the Spark UI, aka Spark Web UI, from a button in the upper right corner of the screen. If you have experience with Spark, you are most likely familiar with the Spark Web UI to monitor and tune Spark jobs.

Spark History Server UI

From the initial screen, the Spark History Server tab, click on the App ID. You can access an enormous amount of Spark-related information about your job and EMR environment from the Spark Web UI.

Spark UI’s Stages tab
Spark UI’s Stages tab showing a Directed acyclic graph (DAG) Visualization
Spark UI’s Environment tab showing environment variables, including versions of Spark, Java, and Scala

The Executors tab will give you access to the Spark job’s output. The output we are most interested in is the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI’s Executors tab

The stderr contains output related to the running Spark job. Below we see an example of Kafka consumer configuration values output to stderr. Several of these values were passed in from the Spark job, including items such as kafka.bootstrap.servers, security.protocol, sasl.mechanism, and sasl.jaas.config.

driver executor’s stderr output to the console

The stdout from the driver executor contains the console output as directed from the Spark job. Below we see the successfully aggregated results of the first Spark job, output to stdout.

+------------------+------+------+
|country           |sales |orders|
+------------------+------+------+
|India             |138.80|20    |
|China             |133.80|20    |
|Mexico            |106.86|14    |
|Japan             |100.86|14    |
|Brazil            |96.87 |13    |
|Russian Federation|94.87 |13    |
|United States     |92.86 |14    |
|Nigeria           |58.93 |7     |
|Philippines       |58.92 |8     |
|South Africa      |46.94 |6     |
|Argentina         |42.93 |7     |
|Germany           |39.96 |4     |
|Indonesia         |38.95 |5     |
|Italy             |35.95 |5     |
|Iran              |33.95 |5     |
|South Korea       |33.94 |6     |
|Poland            |30.97 |3     |
|Pakistan          |25.97 |3     |
|Taiwan            |25.96 |4     |
|Mozambique        |23.97 |3     |
|Ukraine           |23.96 |4     |
|Vietnam           |23.96 |4     |
|Venezuela         |22.97 |3     |
|France            |20.98 |2     |
|Peru              |19.98 |2     |
+------------------+------+------+
only showing top 25 rows

Example 2: Kafka Batch Aggregation to CSV in S3

Although the console is useful for development and debugging, it is typically not used in Production. Instead, Spark typically sends results to S3 as CSV, JSON, Parquet, or Avro formatted files, to Kafka, to a database, or to an API endpoint. The second PySpark application, 02_example_csv_s3.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a CSV file in Amazon S3.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and writes aggregated messages to CSV file in Amazon S3
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("02-example-csv-s3") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # Same aggregation as the first example, written as a pipe-delimited CSV file to S3
    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .csv(path=f"s3a://{args.s3_bucket}/output/", header=True, sep="|")


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your second PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first example, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 02-example-csv-s3 \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/02_example_csv_s3.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>",
                "--s3_bucket=<your_s3_bucket>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

If successful, the Spark job should create a single CSV file in the designated Amazon S3 key (directory path) and an empty _SUCCESS indicator file. The presence of an empty _SUCCESS file signifies that the write operation completed normally.

Amazon S3 bucket showing CSV file output by Spark job
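If you list the bucket's object keys from a script rather than the console, the success check described above can be sketched as follows. The helper name check_output is hypothetical, the part-file pattern reflects Spark's usual part-NNNNN naming, and the keys in the example are made up for illustration.

```python
import re

# Spark typically names its output files part-00000-<id>.csv
PART_FILE = re.compile(r"part-\d{5}-.*\.csv$")


def check_output(keys, prefix="output/"):
    """Return (succeeded, csv_keys) for the object keys found under prefix."""
    under_prefix = [key for key in keys if key.startswith(prefix)]
    succeeded = f"{prefix}_SUCCESS" in under_prefix
    csv_keys = [key for key in under_prefix if PART_FILE.search(key)]
    return succeeded, csv_keys


# Made-up object keys for illustration
keys = [
    "output/_SUCCESS",
    "output/part-00000-1234abcd-c000.csv",
    "scripts/02_example_csv_s3.py",
]
print(check_output(keys))
# (True, ['output/part-00000-1234abcd-c000.csv'])
```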

Below we see the expected pipe-delimited output from the second Spark job.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5

Example 3: Kafka Batch Aggregation to Kafka

The third PySpark application, 03_example_kafka.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a second Kafka topic, topicB. This job now has both read and write options.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and writes aggregated messages to topicB
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("03-example-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "topic":
            args.write_topic,
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # Same aggregation as before; each summary row is serialized to JSON as the Kafka message value
    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .select(F.to_json(F.struct("*"))).toDF("value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")
    parser.add_argument("--write_topic", default="topicB", required=False, help="Kafka topic to write to")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your next PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first two examples, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 03-example-kafka \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/03_example_kafka.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

Once the job completes, you can confirm the results by returning to your EC2-based Kafka client. Use the same kafka-console-consumer command you used previously to show messages from topicB.

bin/kafka-console-consumer.sh \
    --topic topicB \
    --from-beginning --max-messages 10 \
    --property print.value=true \
    --property print.offset=true \
    --property print.partition=true \
    --property print.timestamp=true \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --consumer.config config/client.properties

If the Spark job and the Kafka client command worked successfully, you should see aggregated messages similar to the example output below. Note we are not using keys with the Kafka messages, only values for these simple examples.

Aggregated messages from Kafka topic

Example 4: Spark Structured Streaming

For our final example, we will switch from batch to streaming: from read to readStream and from write to writeStream. Before continuing, I suggest reading the Structured Streaming Programming Guide.

In this example, we will demonstrate how to continuously measure a common business metric: real-time sales volumes. Imagine you sell products globally and want to understand the relationship between the time of day and buying patterns in different geographic regions in real time. For any given window of time (this 15-minute period, this hour, this day, or this week), you want to know the current sales volumes by country. You are not reviewing previous sales periods or examining running sales totals, but real-time sales during a sliding time window.
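To make the idea of a sliding time window concrete, here is a small plain-Python sketch that computes which windows a single sale falls into. The 10-minute window length and 5-minute slide are illustrative assumptions, not values taken from the post's code. The function name sliding_windows and the choice to align window starts to the Unix epoch are also mine.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, duration, slide):
    """Return the [start, end) windows of length duration, one starting every slide, that contain event_time."""
    # Latest window start at or before the event, aligned to the slide interval
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    while start + duration > event_time:
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


# A sale at 12:07 UTC belongs to two overlapping 10-minute windows
sale_time = datetime(2022, 7, 27, 12, 7, tzinfo=timezone.utc)
for start, end in sliding_windows(sale_time, timedelta(minutes=10), timedelta(minutes=5)):
    print(f"{start:%H:%M} to {end:%H:%M}")
# 12:00 to 12:10
# 12:05 to 12:15
```

Because the windows overlap, each sale is counted in more than one window, which is why a sliding window shows a moving view of recent sales rather than a running total.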

We will use two PySpark jobs running concurrently to simulate this metric. The first application, 04_stream_sales_to_kafka.py, simulates streaming data by continuously writing messages to topicC — 2,000 messages with a 0.5-second delay between messages. In my tests, the job ran for ~28–29 minutes.
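A quick back-of-the-envelope check of that runtime: the delay alone accounts for only part of it. The sketch below uses the figures from the paragraph above and takes 28.5 minutes as the midpoint of the observed range. Attributing the remainder to per-message work (reading the row, writing to Kafka) and job startup is my assumption, not a measurement.

```python
messages = 2_000
delay_seconds = 0.5
observed_minutes = 28.5  # midpoint of the ~28-29 minutes observed

# Time spent purely in the delay between messages
sleep_minutes = messages * delay_seconds / 60

# Average time per message that is not explained by the delay
overhead_per_message = (observed_minutes * 60 - messages * delay_seconds) / messages

print(f"delay only: {sleep_minutes:.1f} min")
print(f"implied overhead per message: {overhead_per_message:.3f} s")
# delay only: 16.7 min
# implied overhead per message: 0.355 s
```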

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Write messages from a CSV file to Kafka topicC
#          to simulate real-time streaming sales data
# Author:  Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse
import time

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("04-stream-sales-to-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, schema, args)
    df_sales.cache()

    write_to_kafka(spark, df_sales, args)


def read_from_csv(spark, schema, args):
    df_sales = spark.read \
        .csv(path=f"s3a://{args.s3_bucket}/sample_data/{args.sample_data_file}",
             schema=schema, header=True, sep="|")

    return df_sales


def write_to_kafka(spark, df_sales, args):
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "topic":
            args.write_topic,
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    sales_count = df_sales.count()

    # publish one row at a time, pausing between messages to simulate a stream
    for r in range(0, sales_count):
        row = df_sales.collect()[r]
        df_message = spark.createDataFrame([row], df_sales.schema)

        # replace the original payment_date with the current timestamp
        df_message = df_message \
            .drop("payment_date") \
            .withColumn("payment_date", F.current_timestamp())

        df_message \
            .selectExpr("CAST(payment_id AS STRING) AS key",
                        "to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        df_message.show(1)

        time.sleep(args.message_delay)


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--write_topic", default="topicC", required=False, help="Kafka topic to write to")
    parser.add_argument("--sample_data_file", default="sales_incremental_large.csv", required=False, help="data file")
    # type=float so a delay passed on the command line is not left as a string
    parser.add_argument("--message_delay", default=0.5, type=float, required=False, help="message publishing delay")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
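As a rough sanity check on the runtime quoted above, the arithmetic below separates the time the job spends sleeping from everything else. It is only a back-of-the-envelope sketch: the 2,000 messages, the 0.5-second delay, and the ~28–29 minute runtime come from this post, while treating the remainder as per-message overhead (creating the single-row DataFrame, writing it to Kafka, and calling show()) is an assumption rather than something that was measured.

```python
# Back-of-the-envelope estimate of where the job's runtime goes.
MESSAGES = 2000               # messages written to topicC (from the post)
DELAY_SECONDS = 0.5           # default --message_delay (from the post)
OBSERVED_MINUTES = (28, 29)   # observed runtime range (from the post)

# Time spent purely in time.sleep()
sleep_seconds = MESSAGES * DELAY_SECONDS
sleep_minutes = sleep_seconds / 60

# Assumption: everything that is not sleep is per-message overhead.
overhead_per_message = [
    (minutes * 60 - sleep_seconds) / MESSAGES for minutes in OBSERVED_MINUTES
]

print(f"time spent sleeping: {sleep_minutes:.1f} minutes")
for minutes, overhead in zip(OBSERVED_MINUTES, overhead_per_message):
    print(f"{minutes} min total: about {overhead:.2f} s of overhead per message")
```

Under these assumptions, roughly 17 of the ~28–29 minutes are spent sleeping, and each message carries about a third of a second of additional work.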

Simultaneously, the PySpark application, 05_streaming_kafka.py, continuously consumes the sales transaction messages from the same topic, topicC. Then, Spark aggregates messages over a sliding event-time window and writes the results to the console.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads stream of messages from Kafka topicC and
#          writes stream of aggregations over sliding event-time window to console (stdout)
# References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Author:  Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("05-streaming-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy("country",
                 F.window("timestamp", "10 minutes", "5 minutes")) \
        .agg(F.sum("amount"), F.count("amount")) \
        .orderBy(F.col("window").desc(),
                 F.col("sum(amount)").desc()) \
        .select("country",
                F.format_number("sum(amount)", 2).alias("sales"),
                F.format_number("count(amount)", 0).alias("orders"),
                "window.start", "window.end") \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_to_console") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 10) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicC", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
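The call F.window("timestamp", "10 minutes", "5 minutes") in summarize_sales defines a 10-minute window that slides every 5 minutes, so each message is counted in two overlapping windows. The plain-Python sketch below illustrates that assignment without Spark. The sliding_windows function and the sample timestamp are hypothetical, introduced only for illustration; the sketch assumes windows are half-open intervals aligned to the Unix epoch, which is Spark's default when no start offset is given.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time,
                    window=timedelta(minutes=10),
                    slide=timedelta(minutes=5)):
    """Return every half-open [start, end) window that contains event_time."""
    # Latest window start at or before the event, aligned to the epoch.
    latest_start = event_time - ((event_time - EPOCH) % slide)

    windows = []
    start = latest_start
    # Walk backwards one slide at a time while the window still covers the event.
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


# A hypothetical sale recorded at 14:19:30 lands in two windows.
for start, end in sliding_windows(datetime(2022, 7, 25, 14, 19, 30)):
    print(start, "->", end)
```

This is why, in the console output later in this section, the same country can appear once per overlapping window with the same sales and order counts.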

To submit the two PySpark jobs to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Again, you will need four values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

# run 04 and 05 simultaneously
aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 04-stream-sales-to-kafka \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/04_stream_sales_to_kafka.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>",
                "--s3_bucket=<your_s3_bucket>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 05-streaming-kafka \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/05_streaming_kafka.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'
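If you prefer to submit the jobs from Python, the same request can be sketched with the AWS SDK for Python (boto3), whose emr-serverless client exposes start_job_run. The helper names build_job_driver and submit_job are hypothetical, and the sketch assumes the same S3 layout (scripts/ and jars/ prefixes) used in the CLI commands above; it is not part of the post's source code.

```python
def build_job_driver(s3_bucket, script_name, arguments):
    """Build the jobDriver structure for a PySpark script stored in S3."""
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{s3_bucket}/scripts/{script_name}",
            "entryPointArguments": list(arguments),
            "sparkSubmitParameters":
                f"--conf spark.jars=s3://{s3_bucket}/jars/*.jar",
        }
    }


def submit_job(client, application_id, execution_role_arn, name, job_driver):
    """Submit one job run and return its job run ID."""
    response = client.start_job_run(
        applicationId=application_id,
        executionRoleArn=execution_role_arn,
        name=name,
        jobDriver=job_driver,
    )
    return response["jobRunId"]


# Example usage (requires boto3 and AWS credentials; values are placeholders):
#   import boto3
#   client = boto3.client("emr-serverless")
#   driver = build_job_driver(
#       "<your_s3_bucket>", "05_streaming_kafka.py",
#       ["--bootstrap_servers=<your_bootstrap_server>"])
#   submit_job(client, "<your_application_id>",
#              "<your_execution_role_arn>", "05-streaming-kafka", driver)
```

Passing the client in as an argument keeps the helpers easy to exercise without AWS access.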

Switching to the EMR Serverless Application console, you should see both Spark jobs you just submitted in one of several job states.

EMR Studio Serverless Application details console

Using the Spark UI again, we can review the output from the second job, 05_streaming_kafka.py.

Spark UI’s Jobs tab

With Spark Structured Streaming jobs, we have an extra tab in the Spark UI, Structured Streaming. This tab displays all running jobs with their latest micro-batch number, the aggregate rate of data arriving, and the aggregate rate at which Spark is processing data. Unfortunately, with EMR Serverless, AWS doesn’t appear to allow access to the detailed streaming query statistics via the Run ID, which greatly reduces the tab’s value. You receive a 502 error when clicking on the Run ID hyperlink.

Spark UI’s Structured Streaming tab

The output we are most interested in, again, is contained in the driver’s stderr and stdout logs (first row of the second table, shown below).

Spark UI’s Executors tab

Below we see sample output from stderr. The output shows the results of a micro-batch. According to the Apache Spark documentation, internally, by default, Structured Streaming queries are processed using a micro-batch processing engine. The engine processes data streams as a series of small batch jobs, achieving end-to-end latencies as low as 100ms and exactly-once fault-tolerance guarantees.

22/07/25 14:29:04 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/.10.b017bcdc-f142-4b28-8891-d3f2d471b740.tmp to file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/10
22/07/25 14:29:04 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "bec9640f-ac16-4d00-bd4a-ed8d29b5f768",
"runId" : "a2e00c3a-924c-4728-b25a-56ee855da7da",
"name" : "streaming_to_console",
"timestamp" : "2022-07-25T14:29:00.000Z",
"batchId" : 10,
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"durationMs" : {
"addBatch" : 4359,
"getBatch" : 1,
"latestOffset" : 5,
"queryPlanning" : 37,
"triggerExecution" : 4458,
"walCommit" : 27
},
"eventTime" : {
"avg" : "2022-07-25T14:28:29.947Z",
"max" : "2022-07-25T14:28:59.290Z",
"min" : "2022-07-25T14:28:00.559Z",
"watermark" : "2022-07-25T14:17:59.737Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 476,
"numRowsUpdated" : 132,
"allUpdatesTimeMs" : 319,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 14173,
"memoryUsedBytes" : 262776,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 400,
"numStateStoreInstances" : 400,
"customMetrics" : {
"loadedMapCacheHitCount" : 15600,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 154208
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topicC]]",
"startOffset" : {
"topicC" : {
"2" : 101,
"5" : 96,
"4" : 88,
"1" : 84,
"3" : 112,
"0" : 100
}
},
"endOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"latestOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@52e344bf",
"numOutputRows" : 238
}
}
Example of a Spark Structured Streaming MicroBatch output
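The headline numbers in the progress report above can be cross-checked with a little arithmetic. The sketch below copies the Kafka offsets, the triggerExecution duration, and the one-minute trigger interval from this post. The interpretation of the two rates (rows divided by the trigger interval, and rows divided by the trigger execution time) is my reading of the figures rather than something stated in the log, although it does reproduce the logged values.

```python
# Offsets per Kafka partition, copied from the progress report above.
start_offsets = {"0": 100, "1": 84, "2": 101, "3": 112, "4": 88, "5": 96}
end_offsets = {"0": 110, "1": 98, "2": 114, "3": 124, "4": 98, "5": 110}

# Rows read in this micro-batch = sum of offset advances across partitions.
rows_per_partition = {
    partition: end_offsets[partition] - start_offsets[partition]
    for partition in start_offsets
}
num_input_rows = sum(rows_per_partition.values())

TRIGGER_INTERVAL_SECONDS = 60          # trigger(processingTime="1 minute")
TRIGGER_EXECUTION_SECONDS = 4458 / 1000  # durationMs.triggerExecution

# Assumed interpretation of the two reported rates.
input_rows_per_second = num_input_rows / TRIGGER_INTERVAL_SECONDS
processed_rows_per_second = num_input_rows / TRIGGER_EXECUTION_SECONDS

print("numInputRows:", num_input_rows)
print("inputRowsPerSecond:", input_rows_per_second)
print("processedRowsPerSecond:", processed_rows_per_second)
```

The offsets sum to the 73 input rows reported for the batch, which is also in line with the producer's pace of roughly one message every 0.8 seconds.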

The console output (stdout) from the same streaming query is shown below. We see the initial micro-batch results, starting with the first micro-batch, which is empty because no messages had yet been streamed to topicC.

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+------+-----+---+
|country|sales|orders|start|end|
+-------+-----+------+-----+---+
+-------+-----+------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+------+-------------------+-------------------+
|country   |sales|orders|start              |end                |
+----------+-----+------+-------------------+-------------------+
|Azerbaijan|7.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Sri Lanka |6.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Iran      |5.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Brazil    |5.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Azerbaijan|7.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Sri Lanka |6.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Brazil    |5.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Iran      |5.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
+----------+-----+------+-------------------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------+-----+------+-------------------+-------------------+
|country           |sales|orders|start              |end                |
+------------------+-----+------+-------------------+-------------------+
|Russian Federation|43.94|6     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|China             |37.94|6     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Mexico            |34.96|4     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|India             |33.95|5     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United States     |26.96|4     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Philippines       |22.97|3     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Nigeria           |22.97|3     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Iran              |14.98|2     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Vietnam           |13.98|2     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United Kingdom    |11.99|1     |2022-07-25 14:20:00|2022-07-25 14:30:00|
+------------------+-----+------+-------------------+-------------------+
only showing top 10 rows
Example of a Spark Structured Streaming MicroBatch results to console

If you are familiar with Spark Structured Streaming, you are likely aware that these Spark jobs run continuously. In other words, the streaming jobs will not stop; they continually await more streaming data.

EMR Studio Serverless Application details console

The first job, 04_stream_sales_to_kafka.py, will run for ~28–29 minutes and stop with a status of Success. However, the second job, 05_streaming_kafka.py, the Spark Structured Streaming job, must be manually canceled.

EMR Studio Serverless Application details console
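Canceling the streaming job can also be scripted. The sketch below uses the boto3 emr-serverless client's list_job_runs and cancel_job_run operations to cancel whatever is still running in an application. The function name cancel_active_job_runs is hypothetical, and for brevity the sketch only looks at the first page of results, which is an assumption that holds for a small demo like this one.

```python
def cancel_active_job_runs(client, application_id, states=("RUNNING",)):
    """Cancel job runs in the given states and return the IDs that were canceled.

    Only the first page of list_job_runs results is examined (no pagination).
    """
    response = client.list_job_runs(
        applicationId=application_id,
        states=list(states),
    )

    canceled = []
    for job_run in response["jobRuns"]:
        client.cancel_job_run(
            applicationId=application_id,
            jobRunId=job_run["id"],
        )
        canceled.append(job_run["id"])
    return canceled


# Example usage (requires boto3 and AWS credentials; value is a placeholder):
#   import boto3
#   client = boto3.client("emr-serverless")
#   cancel_active_job_runs(client, "<your_application_id>")
```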

Cleaning Up

You can delete your resources from the AWS Management Console or AWS CLI. However, to delete your Amazon S3 bucket, all objects (including all object versions and delete markers) in the bucket must be deleted before the bucket itself can be deleted.

# delete application, cluster, and ec2 client
aws kafka delete-cluster --cluster-arn <your_msk_serverless_cluster_arn>
aws emr-serverless delete-application --application-id <your_application_id>
aws ec2 terminate-instances --instance-ids <your_ec2_instance_id>
# all objects (including all object versions and delete markers) in the bucket
# must be deleted before the bucket itself can be deleted.
aws s3api delete-bucket --bucket <your_s3_bucket>
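Emptying a bucket that may have versioning enabled is tedious from the CLI, so here is a minimal sketch using the boto3 S3 resource API instead. The function name empty_and_delete_bucket is hypothetical. The sketch relies on the bucket's object_versions collection, whose delete() action removes every object version and delete marker, and it assumes you really do want everything in the bucket gone.

```python
def empty_and_delete_bucket(bucket):
    """Remove all object versions and delete markers, then delete the bucket.

    `bucket` is expected to be a boto3 S3 Bucket resource.
    """
    # Deletes every version of every object, including delete markers.
    bucket.object_versions.delete()
    # The bucket is now empty and can be removed.
    bucket.delete()


# Example usage (requires boto3 and AWS credentials; value is a placeholder):
#   import boto3
#   bucket = boto3.resource("s3").Bucket("<your_s3_bucket>")
#   empty_and_delete_bucket(bucket)
```

This operation is irreversible, so double-check the bucket name before running it.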

Conclusion

In this post, we discovered how easy it is to adopt a serverless approach to analytics on AWS. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with open source frameworks such as Apache Spark and Apache Hive. With MSK Serverless, you can use Apache Kafka on demand and pay only for the data you stream and retain. In addition, MSK Serverless automatically provisions and scales compute and storage resources. Given suitable analytics use cases, EMR Serverless with MSK Serverless will likely save you time, effort, and expense.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.
