Earning the AWS Certified Machine Learning — Specialty (MLS-C01) Certification

Introduction

Recently, I earned the AWS Certified Machine Learning — Specialty (MLS-C01) Certification, my ninth AWS certification. Since a few colleagues asked me about my preparation, I thought I would share it with the community, without divulging any details of the exam, of course.

Prerequisite Experience

Several AWS certifications can be earned with minimal to no hands-on AWS experience, but excellent short-term memorization skills. Although you will have technically earned the certification, you will certainly not be competent to practice the particular discipline. Certification does not equal qualification.

In my opinion, the AWS Certified Machine Learning — Specialty certification exam is not one of those where simple memorization of study materials, alone, will guarantee a passing score. If you lack practical experience in data science, machine learning, basic statistics, or data analytics on AWS, you will be challenged to pass this exam, no matter how much you cram.

Consider Data Analytics Certification First

To prepare for the Machine Learning — Specialty exam, I would strongly suggest first earning the AWS Certified Data Analytics — Specialty certification. According to the Machine Learning — Specialty exam’s content outline, “Domain 1: Data Engineering”, accounts for 20% of the exam’s score. Understanding the AWS Analytics services and how they integrate to form the most efficient data pipelines to feed your Machine Learning model training is a requirement for this portion of the exam’s questions. Preparing for the Data Analytics — Specialty certification will provide this adjacent domain knowledge:

  • Amazon Athena
  • Amazon EMR (pka Amazon Elastic MapReduce)
  • Amazon IAM
  • Amazon Kinesis Data Analytics, Data Firehose, Data Streams, Video Streams
  • Amazon Redshift
  • Amazon S3
  • Amazon VPC
  • AWS Data Pipeline
  • AWS Glue Crawlers, Jobs, Data Catalog
  • AWS Lambda
  • AWS Step Functions
My copious notes taken while preparing for the exam.
Take copious notes and review them right before taking the exam

Study Materials

In my case, certification success was a result of practical experience, coursework, completing and reviewing the results of several practice exams, and taking lots of notes. The following is a list of the study materials I found most impactful:

Documentation

I reviewed the Amazon SageMaker and other AWS fully-managed AI/ML service documentation for my preparation.

Carefully review the Choose an Algorithm section of the Amazon SageMaker Developer Guide. According to the exam’s content outline, “Domain 3: Modeling” accounts for 36% of the exam’s score. Understand 1) recommended use cases for each of SageMaker’s built-in algorithms, 2) the algorithm’s required hyperparameters, and 3) the prescribed model evaluation metrics and tuning techniques. Built-in SageMaker algorithms most commonly covered in most training materials include:

  • Tabular
    • XGBoost (eXtreme Gradient Boosting)
    • Linear Learner
    • K-Nearest Neighbors (KNN)
    • Factorization Machines
    • Object2Vec
  • Vision
    • Image Classification
    • Object Detection
    • Semantic Segmentation
  • Clustering
    • K-Means
  • Time-Series Forecast
    • DeepAR
  • Text Classification & Embedding
    • BlazingText
  • Text Transformation
    • Sequence-to-Sequence (Seq2Seq)
  • Text Topic Modeling
    • Neural Topic Modeling (NTM)
    • Latent Dirichlet Allocation (LDA)
  • Dimensionality Reduction
    • Principal Component Analysis (PCA)
  • Anomaly Detection
    • Random Cut Forest (RCF)
    • IP Insights

AWS also uses Read the Docs. SageMaker’s Algorithm section is especially helpful with respect to preparing for the Machine Learning — Specialty exam: image processing, text processing, time-series processing, supervised learning, unsupervised learning, and feature engineering algorithms.

Along with algorithms, review SageMaker’s Deploy Models for Inference documentation. According to the exam’s content outline, “Domain 4: Machine Learning Implementation and Operations” accounts for 20% of the exam’s score. Understand SageMaker’s options for model serving, model versioning, deployment strategies, and endpoint monitoring.

Review the AWS fully managed AI/ML services Developer Guide documentation for the following services:

  • Amazon Augmented AI
  • Amazon CodeGuru
  • Amazon Comprehend
  • Amazon Forecast
  • Amazon Fraud Detector
  • Amazon Kendra
  • Amazon Lex
  • Amazon Personalize
  • Amazon Polly
  • Amazon Rekognition
  • Amazon Textract
  • Amazon Transcribe
  • Amazon Translate

Understand the use cases for each of these services and most critically, how these managed services can be combined to create more complex AI/ML solutions. For example, building a near-real-time speech-to-speech translator with Amazon Transcribe, Amazon Translate, and Amazon Polly.

Online Courses

For my preparation, I completed three Udemy courses. Most of these online courses regularly go on sale and be purchased for $25 or less:

Udemy courses recommended in post.
Recommended Udemy online courses

Books

For my preparation, I read or re-read three books, two from Packt and one from O’Reilly:

  • AWS Certified Machine Learning Specialty: MLS-C01 Certification Guide, by Somanath Nanda and Weslley Moura (Packt Publishing). I recommend this one if you only have time to read a single book.
  • Practical Statistics for Data Scientists, 2nd Edition, by Peter Bruce, Andrew Bruce, Peter Gedeck (O’Reilly Media). According to the University of San Diego, “Statistics (or statistical analysis) is core to every machine learning algorithm.” This book covers many of the core statistical concepts behind Machine Learning, covered on the exam:
    • BLEU
    • Classification metrics: Precision-Recall Curve, ROC Curve, AUC
    • Confusion Matrix: TP, FP, TN, FN, Accuracy, Precision, Recall (Sensitivity), Specificity, F1
    • Correlated variables, Multicollinearity
    • Distributions: Normal (Gaussian or “bell curve”), Bernoulli, Binomial, Poisson
    • Elbow Method
    • Ensemble Learning: Bagging, Boosting
    • Euclidean Distance
    • K-Fold Cross-Validation
    • L1/L2 Regularization (lasso, alpha, ridge, lambda)
    • Overfitting, Underfitting, High Bias, High Variance, Bias-Variance Tradeoff
    • Plots: Histograms, Boxplots, Scatterplots
    • Regression metrics: MAE, MSE, RMSE, R-squared, Adjusted R-squared
    • Residuals
    • SMOTE
    • Standard Deviation, Three-Sigma/Empirical/68–95–99.7 Rule
    • Z-score
  • Python Machine Learning — Third Edition, by Sebastian Raschka and Vahid Mirjalili (Packt Publishing). Note that this book dives much deeper into the low-level statistical underpinnings of machine learning than is required for the exam, based on the exam outline. Again, don’t get caught up in the nitty-gritty details of Python; focus on the higher-level machine learning principles.

Scheduling the Exam

One last tip regarding when to take your exam. I have taken 15 AWS exams between nine AWS certifications and several recertifications. Although the Certified Machine Learning — Specialty exam is difficult, I found changing the time I sat the exam, greatly reduced my stress level. In the past, I took time off on a workday to complete exams, either in person or at home using online proctoring. I was preparing for the exam while frequently being interrupted by work-related items. For this exam, I chose to use online proctoring and took my exam at 6:00 AM on a Sunday morning. Up early, fresh, and full of energy, with no work- or family-related interruptions, no lawnmowers, dogs barking, or garbage trucks rumbling by, and no Internet bandwidth issues. I was done by 9:00 AM and eating breakfast with the family.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

Leave a comment

Exploring Popular Open-source Stream Processing Technologies: Part 2 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.

This two-part post series and forthcoming video explore four popular open-source software (OSS) stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute the open-source projects for your preferred SaaS, CSP, or COSS service offerings.

Part Two

We will continue our exploration in part two of this two-part post, covering Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration to visualize the real-time results of our stream processing pipelines as a dashboard.

Demonstration #3: Apache Flink

In the third demonstration of four, we will examine Apache Flink. For this part of the post, we will also use the third of the three GitHub repository projects, flink-kafka-demo. The project contains a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

High-level workflow for Apache Flink demonstration

New Streaming Stack

To get started, we need to replace the first streaming Docker Swarm stack, deployed in part one, with the second streaming Docker Swarm stack. The second stack contains Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).

https://programmaticponderings.wordpress.com/media/601efca17604c3a467a4200e93d7d3ff

The stack will take a few minutes to deploy fully. When complete, there should be ten containers running in the stack.

Viewing the Docker streaming stack’s ten containers

Flink Application

The Flink application has two entry classes. The first class, RunningTotals, performs an identical aggregation function as the previous KStreams demo.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// assumes PLAINTEXT authentication
KafkaSource<Purchase> source = KafkaSource.<Purchase>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setTopics(prop.getProperty("PURCHASES_TOPIC"))
.setGroupId("flink_reduce_demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new PurchaseDeserializationSchema())
.build();
DataStream<Purchase> purchases = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
DataStream<RunningTotal> runningTotals = purchases
.flatMap((FlatMapFunction<Purchase, RunningTotal>) (purchase, out) -> out.collect(
new RunningTotal(
purchase.getTransactionTime(),
purchase.getProductId(),
1,
purchase.getQuantity(),
purchase.getTotalPurchase()
))
).returns(RunningTotal.class)
.keyBy(RunningTotal::getProductId)
.reduce((runningTotal1, runningTotal2) -> {
runningTotal2.setTransactions(runningTotal1.getTransactions() + runningTotal2.getTransactions());
runningTotal2.setQuantities(runningTotal1.getQuantities() + runningTotal2.getQuantities());
runningTotal2.setSales(runningTotal1.getSales().add(runningTotal2.getSales()));
return runningTotal2;
});
KafkaSink<RunningTotal> sink = KafkaSink.<RunningTotal>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(prop.getProperty("RUNNING_TOTALS_TOPIC"))
.setValueSerializationSchema(new RunningTotalSerializationSchema())
.build()
).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
runningTotals.sinkTo(sink);
env.execute("Flink Running Totals Demo");
}

The second class, JoinStreams, joins the stream of data from the demo.purchases topic and the demo.products topic, processing and combining them, in real-time, into an enriched transaction and publishing the results to a new topic, demo.purchases.enriched.

public static void flinkKafkaPipeline(Properties prop) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// assumes PLAINTEXT authentication
KafkaSource<Product> productSource = KafkaSource.<Product>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setTopics(prop.getProperty("PRODUCTS_TOPIC"))
.setGroupId("flink_join_demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new ProductDeserializationSchema())
.build();
DataStream<Product> productsStream = env.fromSource(
productSource, WatermarkStrategy.noWatermarks(), "Kafka Products Source");
tableEnv.createTemporaryView("products", productsStream);
KafkaSource<Purchase> purchasesSource = KafkaSource.<Purchase>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setTopics(prop.getProperty("PURCHASES_TOPIC"))
.setGroupId("flink_join_demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new PurchaseDeserializationSchema())
.build();
DataStream<Purchase> purchasesStream = env.fromSource(
purchasesSource, WatermarkStrategy.noWatermarks(), "Kafka Purchases Source");
tableEnv.createTemporaryView("purchases", purchasesStream);
Table result =
tableEnv.sqlQuery(
"SELECT " +
"purchases.transactionTime, " +
"TO_TIMESTAMP(purchases.transactionTime), " +
"purchases.transactionId, " +
"purchases.productId, " +
"products.category, " +
"products.item, " +
"products.size, " +
"products.cogs, " +
"products.price, " +
"products.containsFruit, " +
"products.containsVeggies, " +
"products.containsNuts, " +
"products.containsCaffeine, " +
"purchases.price, " +
"purchases.quantity, " +
"purchases.isMember, " +
"purchases.memberDiscount, " +
"purchases.addSupplements, " +
"purchases.supplementPrice, " +
"purchases.totalPurchase " +
"FROM " +
"products " +
"JOIN purchases " +
"ON products.productId = purchases.productId"
);
DataStream<PurchaseEnriched> purchasesEnrichedTable = tableEnv.toDataStream(result,
PurchaseEnriched.class);
KafkaSink<PurchaseEnriched> sink = KafkaSink.<PurchaseEnriched>builder()
.setBootstrapServers(prop.getProperty("BOOTSTRAP_SERVERS"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(prop.getProperty("PURCHASES_ENRICHED_TOPIC"))
.setValueSerializationSchema(new PurchaseEnrichedSerializationSchema())
.build()
).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
purchasesEnrichedTable.sinkTo(sink);
env.execute("Flink Streaming Join Demo");
}

The resulting enriched purchases messages look similar to the following:

{
"transaction_time": "2022-09-25 01:58:11.714838",
"transaction_id": "4068565608708439642",
"product_id": "CS08",
"product_category": "Classic Smoothies",
"product_name": "Rockin’ Raspberry",
"product_size": "24 oz.",
"product_cogs": 1.5,
"product_price": 4.99,
"contains_fruit": true,
"contains_veggies": false,
"contains_nuts": false,
"contains_caffeine": false,
"purchase_price": 4.99,
"purchase_quantity": 2,
"is_member": false,
"member_discount": 0,
"add_supplements": false,
"supplement_price": 0,
"total_purchase": 9.98
}
Sample enriched purchase message

Running the Flink Job

To run the Flink application, we must first compile it into an uber JAR.

We can copy the JAR into the Flink container or upload it through the Apache Flink Dashboard, a browser-based UI. For this demonstration, we will upload it through the Apache Flink Dashboard, accessible on port 8081.

The project’s build.gradle file has preset the Main class (Flink’s Entry class) to org.example.JoinStreams. Optionally, to run the Running Totals demo, we could change the build.gradle file and recompile, or simply change Flink’s Entry class to org.example.RunningTotals.

Uploading the JAR to Apache Flink

Before running the Flink job, restart the sales generator in the background (nohup python3 ./producer.py &) to generate a new stream of data. Then start the Flink job.

Apache Flink job running successfully

To confirm the Flink application is running, we can check the contents of the new demo.purchases.enriched topic using the Kafka CLI.

The new demo.purchases.enriched topic populated with messages from Apache Flink

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing messages in the UI for Apache Kafka

Demonstration #4: Apache Pinot

In the fourth and final demonstration, we will explore Apache Pinot. First, we will query the unbounded data streams from Apache Kafka, generated by both the sales generator and the Apache Flink application, using SQL. Then, we build a real-time dashboard in Apache Superset, with Apache Pinot as our datasource.

Creating Tables

According to the Apache Pinot documentation, “a table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot).” There are three types of Pinot tables: Offline, Realtime, and Hybrid. For this demonstration, we will create three Realtime tables. Realtime tables ingest data from streams — in our case, Kafka — and build segments from the consumed data. Further, according to the documentation, “each table in Pinot is associated with a Schema. A schema defines what fields are present in the table along with the data types. The schema is stored in Zookeeper, along with the table configuration.

Below, we see the schema and config for one of the three Realtime tables, purchasesEnriched. Note how the columns are divided into three categories: Dimension, Metric, and DateTime.

{
"schemaName": "purchasesEnriched",
"dimensionFieldSpecs": [
{
"name": "transaction_id",
"dataType": "STRING"
},
{
"name": "product_id",
"dataType": "STRING"
},
{
"name": "product_category",
"dataType": "STRING"
},
{
"name": "product_name",
"dataType": "STRING"
},
{
"name": "product_size",
"dataType": "STRING"
},
{
"name": "product_cogs",
"dataType": "FLOAT"
},
{
"name": "product_price",
"dataType": "FLOAT"
},
{
"name": "contains_fruit",
"dataType": "BOOLEAN"
},
{
"name": "contains_veggies",
"dataType": "BOOLEAN"
},
{
"name": "contains_nuts",
"dataType": "BOOLEAN"
},
{
"name": "contains_caffeine",
"dataType": "BOOLEAN"
},
{
"name": "purchase_price",
"dataType": "FLOAT"
},
{
"name": "is_member",
"dataType": "BOOLEAN"
},
{
"name": "member_discount",
"dataType": "FLOAT"
},
{
"name": "add_supplements",
"dataType": "BOOLEAN"
},
{
"name": "supplement_price",
"dataType": "FLOAT"
}
],
"metricFieldSpecs": [
{
"name": "purchase_quantity",
"dataType": "INT"
},
{
"name": "total_purchase",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [
{
"name": "transaction_time",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSSSSS",
"granularity": "1:MILLISECONDS"
}
]
}
Schema file for purchasesEnriched Realtime table
{
"tableName": "purchasesEnriched",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "transaction_time",
"timeType": "MILLISECONDS",
"schemaName": "purchasesEnriched",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "demo.purchases.enriched",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka:29092",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.rows": "50000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {}
}
Config file for purchasesEnriched Realtime table

To begin, copy the three Pinot Realtime table schemas and configurations from the streaming-sales-generator GitHub project into the Apache Pinot Controller container. Next, use a docker exec command to call the Pinot Command Line Interface’s (CLI) AddTable command to create the three tables: products, purchases, and purchasesEnriched.

# copy pinot table schema and config files to pinot controller
CONTROLLER_CONTAINER=$(docker container ls –filter name=streaming-stack_pinot-controller.1 –format "{{.ID}}")
cd ~/streaming-sales-generator/apache_pinot_examples
docker cp configs_schemas/ ${CONTROLLER_CONTAINER}:/tmp/
# create three tables
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/purchases-config.json \
-schemaFile /tmp/configs_schemas/purchases-schema.json -exec
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/products-config.json \
-schemaFile /tmp/configs_schemas/products-schema.json -exec
docker exec -it ${CONTROLLER_CONTAINER} \
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/configs_schemas/purchases-enriched-config.json \
-schemaFile /tmp/configs_schemas/purchases-enriched-schema.json -exec

To confirm the three tables were created correctly, use the Apache Pinot Data Explorer accessible on port 9000. Use the Tables tab in the Cluster Manager.

Cluster Manager’s Tables tab shows the three Realtime tables and corresponding schemas

We can further inspect and edit the table’s config and schema from the Tables tab in the Cluster Manager.

Realtime table’s editable config and schema

The three tables are configured to read the unbounded stream of data from the corresponding Kafka topics: demo.products, demo.purchases, and demo.purchases.enriched.

Querying with Pinot

We can use Pinot’s Query Console to query the Realtime tables using SQL. According to the documentation, “Pinot provides a SQL interface for querying. It uses the [Apache] Calcite SQL parser to parse queries and uses MYSQL_ANSI dialect.

Schema and query results for the purchases table

With the generator still running, re-query the purchases table in the Query Console (select count(*) from purchases). You should notice the document count increasing each time you re-run the query since new messages are published to the demo.purchases topic by the sales generator.

If you do not observe the count increasing, ensure the sales generator and Flink enrichment job are running.

Query Console showing the purchases table’s document count continuing to increase

Table Joins?

It might seem logical to want to replicate the same multi-stream join we performed with Apache Flink in part three of the demonstration on the demo.products and demo.purchases topics. Further, we might presume to join the products and purchases realtime tables by writing a SQL statement in Pinot’s Query Console. However, according to the documentation, at the time of this post, version 0.11.0 of Pinot did not [currently] support joins or nested subqueries.

This current join limitation is why we created the Realtime table, purchasesEnriched, allowing us to query Flink’s real-time results in the demo.purchases.enriched topic. We will use both Flink and Pinot as part of our stream processing pipeline, taking advantage of each tool’s individual strengths and capabilities.

Note, according to the documentation for the latest release of Pinot on the main branch, “the latest Pinot multi-stage supports inner join, left-outer, semi-join, and nested queries out of the box. It is optimized for in-memory process and latency.” For more information on joins as part of Pinot’s new multi-stage query execution engine, read the documentation, Multi-Stage Query Engine.

Query showing results from the demo.purchases.enriched topic in real-time

Aggregations

We can perform real-time aggregations using Pinot’s rich SQL query interface. For example, like previously with Spark and Flink, we can calculate running totals for the number of items sold and the total sales for each product in real time.

Aggregating running totals for each product

We can do the same with the purchasesEnriched table, which will use the continuous stream of enriched transaction data from our Apache Flink application. With the purchasesEnriched table, we can add the product name and product category for richer results. Each time we run the query, we get real-time results based on the running sales generator and Flink enrichment job.

Aggregating running totals for each product

Query Options and Indexing

Note the reference to the Star-Tree index at the start of the SQL query shown above. Pinot provides several query options, including useStarTree (true by default).

Multiple indexing techniques are available in Pinot, including Forward Index, Inverted Index, Star-tree Index, Bloom Filter, and Range Index, among others. Each has advantages in different query scenarios. According to the documentation, by default, Pinot creates a dictionary-encoded forward index for each column.

SQL Examples

Here are a few examples of SQL queries you can try in Pinot’s Query Console:

products
SELECT
COUNT(product_id) AS product_count,
AVG(price) AS avg_price,
AVG(cogs) AS avg_cogs,
AVG(price) AVG(cogs) AS avg_gross_profit
FROM
products;
purchases
SELECT
product_id,
SUMPRECISION(quantity, 10, 0) AS quantity,
SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
purchases
GROUP BY
product_id
ORDER BY
sales DESC;
purchasesEnriched
SELECT
product_id,
product_name,
product_category,
SUMPRECISION(purchase_quantity, 10, 0) AS quantity,
SUMPRECISION(total_purchase, 10, 2) AS sales
FROM
purchasesEnriched
GROUP BY
product_id,
product_name,
product_category
ORDER BY
sales DESC;

Troubleshooting Pinot

If have issues with creating the tables or querying the real-time data, you can start by reviewing the Apache Pinot logs:

CONTROLLER_CONTAINER=$(docker container ls –filter name=streaming-stack_pinot-controller.1 –format "{{.ID}}")
docker exec -it ${CONTROLLER_CONTAINER} cat logs/pinot-all.log
view raw pinot_logs.sh hosted with ❤ by GitHub

Real-time Dashboards with Apache Superset

To display the real-time stream of data produced results of our Apache Flink stream processing job and made queriable by Apache Pinot, we can use Apache Superset. Superset positions itself as “a modern data exploration and visualization platform.” Superset allows users “to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

According to the documentation, “Superset requires a Python DB-API database driver and a SQLAlchemy dialect to be installed for each datastore you want to connect to.” In the case of Apache Pinot, we can use pinotdb as the Python DB-API and SQLAlchemy dialect for Pinot. Since the existing Superset Docker container does not have pinotdb installed, I have built and published a Docker Image with the driver and deployed it as part of the second streaming stack of containers.

# Custom Superset build to add apache pinot driver
# Gary A. Stafford (2022-09-25)
# Updated: 2022-12-18
FROM apache/superset:66138b0ca0b82a94404e058f0cc55517b2240069
# Switching to root to install the required packages
USER root
# Find which driver you need based on the analytics database:
# https://superset.apache.org/docs/databases/installing-database-drivers
RUN pip install mysqlclient psycopg2-binary pinotdb
# Switching back to using the `superset` user
USER superset
view raw Dockerfile hosted with ❤ by GitHub

First, we much configure the Superset container instance. These instructions are documented as part of the Superset Docker Image repository.

# establish an interactive session with the superset container
SUPERSET_CONTAINER=$(docker container ls –filter name=streaming-stack_superset.1 –format "{{.ID}}")
# initialize superset (see superset documentation)
docker exec -it ${SUPERSET_CONTAINER} \
superset fab create-admin \
–username admin \
–firstname Superset \
–lastname Admin \
–email admin@superset.com \
–password sUp3rS3cREtPa55w0rD1
docker exec -it ${SUPERSET_CONTAINER} superset db upgrade
docker exec -it ${SUPERSET_CONTAINER} superset init

Once the configuration is complete, we can log into the Superset web-browser-based UI accessible on port 8088.

Home page of the Superset web-browser-based UI

Pinot Database Connection and Dataset

Next, to connect to Pinot from Superset, we need to create a Database Connection and a Dataset.

Creating a new database connection to Pinot

The SQLAlchemy URI is shown below. Input the URI, test your connection (‘Test Connection’), make sure it succeeds, then hit ‘Connect’.

pinot+http://pinot-broker:8099/query?controller=http://pinot-controller:9000

Next, create a Dataset that references the purchasesEnriched Pinot table.

Creating a new dataset allowing us access to the purchasesEnriched Pinot table

Modify the dataset’s transaction_time column. Check the is_temporal and Default datetime options. Lastly, define the DateTime format as epoch_ms.

Modifying the dataset’s transaction_time column

Building a Real-time Dashboard

Using the new dataset, which connects Superset to the purchasesEnriched Pinot table, we can construct individual charts to be placed on a dashboard. Build a few charts to include on your dashboard.

Example of a chart whose data source is the new dataset
List of charts that included on the dashboard

Create a new Superset dashboard and add the charts and other elements, such as headlines, dividers, and tabs.

Apache Superset dashboard displaying data from Apache Pinot Realtime table

We can apply a refresh interval to the dashboard to continuously query Pinot and visualize the results in near real-time.

Configuring a refresh interval for the dashboard

Conclusion

In this two-part post series, we were introduced to stream processing. We explored four popular open-source stream processing projects: Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Next, we learned how we could solve similar stream processing and streaming analytics challenges using different streaming technologies. Lastly, we saw how these technologies, such as Kafka, Flink, Pinot, and Superset, could be integrated to create effective stream processing pipelines.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , , , , ,

Leave a comment

Exploring Popular Open-source Stream Processing Technologies: Part 1 of 2

A brief demonstration of Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot with Apache Superset

Introduction

According to TechTarget, “Stream processing is a data management technique that involves ingesting a continuous data stream to quickly analyze, filter, transform or enhance the data in real-time. Once processed, the data is passed off to an application, data store, or another stream processing engine.Confluent, a fully-managed Apache Kafka market leader, defines stream processing as “a software paradigm that ingests, processes, and manages continuous streams of data while they’re still in motion.

Batch vs. Stream Processing

Again, according to Confluent, “Batch processing is when the processing and analysis happens on a set of data that have already been stored over a period of time.” A batch processing example might include daily retail sales data, which is aggregated and tabulated nightly after the stores close. Conversely, “streaming data processing happens as the data flows through a system. This results in analysis and reporting of events as it happens.” To use a similar example, instead of nightly batch processing, the streams of sales data are processed, aggregated, and analyzed continuously throughout the day — sales volume, buying trends, inventory levels, and marketing program performance are tracked in real time.

Bounded vs. Unbounded Data

According to Packt Publishing’s book, Learning Apache Apex, “bounded data is finite; it has a beginning and an end. Unbounded data is an ever-growing, essentially infinite data set.” Batch processing is typically performed on bounded data, whereas stream processing is most often performed on unbounded data.

Stream Processing Technologies

There are many technologies available to perform stream processing. These include proprietary custom software, commercial off-the-shelf (COTS) software, fully-managed service offerings from Software as a Service (or SaaS) providers, Cloud Solution Providers (CSP), Commercial Open Source Software (COSS) companies, and popular open-source projects from the Apache Software Foundation and Linux Foundation.

The following two-part post and forthcoming video will explore four popular open-source software (OSS) stream processing projects, including Apache Spark Structured Streaming, Apache Kafka Streams, Apache Flink, and Apache Pinot. Each of these projects has some equivalent SaaS, CSP, and COSS offerings.

This post uses the open-source projects, making it easier to follow along with the demonstration and keeping costs to a minimum. However, you could easily substitute the open-source projects for your preferred SaaS, CSP, or COSS service offerings.

Apache Spark Structured Streaming

According to the Apache Spark documentation, “Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data.” Further, “Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.” In the post, we will examine both batch and stream processing using a series of Apache Spark Structured Streaming jobs written in PySpark.

Spark Structured Streaming job statistics as seen from the Spark UI

Apache Kafka Streams

According to the Apache Kafka documentation, “Kafka Streams [aka KStreams] is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.” In the post, we will examine a KStreams application written in Java that performs stream processing and incremental aggregation.

Building the KStreams application’s uber JAR in JetBrains IntelliJ IDEA

Apache Flink

According to the Apache Flink documentation, “Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.” Further, “Apache Flink excels at processing unbounded and bounded data sets. Precise control of time and state enables Flink’s runtime to run any kind of application on unbounded streams. Bounded streams are internally processed by algorithms and data structures that are specifically designed for fixed-sized data sets, yielding excellent performance.” In the post, we will examine a Flink application written in Java, which performs stream processing, incremental aggregation, and multi-stream joins.

Apache Flink Dashboard showing Flink pipeline demonstrated in this post

Apache Pinot

According to Apache Pinot’s documentation, “Pinot is a real-time distributed OLAP datastore, purpose-built to provide ultra-low-latency analytics, even at extremely high throughput. It can ingest directly from streaming data sources — such as Apache Kafka and Amazon Kinesis — and make the events available for querying instantly. It can also ingest from batch data sources such as Hadoop HDFS, Amazon S3, Azure ADLS, and Google Cloud Storage.” In the post, we will query the unbounded data streams from Apache Kafka, generated by Apache Flink, using SQL.

Apache Pinot Query Console showing tables demonstrated in this post

Streaming Data Source

We must first find a good unbounded data source to explore or demonstrate these streaming technologies. Ideally, the streaming data source should be complex enough to allow multiple types of analyses and visualize different aspects with Business Intelligence (BI) and dashboarding tools. Additionally, the streaming data source should possess a degree of consistency and predictability while displaying a reasonable level of variability and randomness.

To this end, we will use the open-source Streaming Synthetic Sales Data Generator project, which I have developed and made available on GitHub. This project’s highly-configurable, Python-based, synthetic data generator generates an unbounded stream of product listings, sales transactions, and inventory restocking activities to a series of Apache Kafka topics.

Streaming Synthetic Sales Data Generator publishing messages to Apache Kafka

Source Code

All the source code demonstrated in this post is open source and available on GitHub. There are three separate GitHub projects:

# streaming data generator, Apache Spark and Apache Pinot examples
git clone –depth 1 -b main \
https://github.com/garystafford/streaming-sales-generator.git
# Apache Flink examples
git clone –depth 1 -b main \
https://github.com/garystafford/flink-kafka-demo.git
# Kafka Streams examples
git clone –depth 1 -b main \
https://github.com/garystafford/kstreams-kafka-demo.git

Docker

To make it easier to follow along with the demonstration, we will use Docker Swarm to provision the streaming tools. Alternatively, you could use Kubernetes (e.g., creating a Helm chart) or your preferred CSP or SaaS managed services. Nothing in this demonstration requires you to use a paid service.

The two Docker Swarm stacks are located in the Streaming Synthetic Sales Data Generator project:

  1. Streaming Stack — Part 1: Apache Kafka, Apache Zookeeper, Apache Spark, UI for Apache Kafka, and the KStreams application
  2. Streaming Stack — Part 2: Apache Kafka, Apache Zookeeper, Apache Flink, Apache Pinot, Apache Superset, UI for Apache Kafka, and Project Jupyter (JupyterLab).*

* the Jupyter container can be used as an alternative to the Spark container for running PySpark jobs (follow the same steps as for Spark, below)

Demonstration #1: Apache Spark

In the first of four demonstrations, we will examine two Apache Spark Structured Streaming jobs, written in PySpark, demonstrating both batch processing (spark_batch_kafka.py) and stream processing (spark_streaming_kafka.py). We will read from a single stream of data from a Kafka topic, demo.purchases, and write to the console.

High-level workflow for Apache Spark demonstration

Deploying the Streaming Stack

To get started, deploy the first streaming Docker Swarm stack containing the Apache Kafka, Apache Zookeeper, Apache Spark, UI for Apache Kafka, and the KStreams application containers.

# cd into project
cd streaming-sales-generator/
# initialize swarm stack – 1x only
docker swarm init
# optional: delete previous streaming-stack
docker stack rm streaming-stack
# deploy first streaming-stack
docker stack deploy streaming-stack –compose-file docker/spark-kstreams-stack.yml
# observe the deployment's progress
docker stack services streaming-stack

The stack will take a few minutes to deploy fully. When complete, there should be a total of six containers running in the stack.

Viewing the Docker streaming stack’s six containers

Sales Generator

Before starting the streaming data generator, confirm or modify the configuration/configuration.ini. Three configuration items, in particular, will determine how long the streaming data generator runs and how much data it produces. We will set the timing of transaction events to be generated relatively rapidly for test purposes. We will also set the number of events high enough to give us time to explore the Spark jobs. Using the below settings, the generator should run for an average of approximately 50–60 minutes: (((5 sec + 2 sec)/2)*1000 transactions)/60 sec=~58 min on average. You can run the generator again if necessary or increase the number of transactions.

[SALES]
# minimum sales frequency in seconds (debug with 1, typical min. 120)
min_sale_freq = 2
# maximum sales frequency in seconds (debug with 3, typical max. 300)
max_sale_freq = 5
# number of transactions to generate
number_of_sales = 1000
A code snippet from the project’s configuration.ini file

Start the streaming data generator as a background service:

# install required python packages (1x)
python3 -m pip install kafka-python
cd sales_generator/
# run in foreground
python3 ./producer.py
# better option, run as background process
nohup python3 ./producer.py &
# confirm process is running
ps -u

The streaming data generator will start writing data to three Apache Kafka topics: demo.products, demo.purchases, and demo.inventories. We can view these topics and their messages by logging into the Apache Kafka container and using the Kafka CLI:

# establish an interactive session with the spark container
KAFKA_CONTAINER=$(docker container ls –filter name=streaming-stack_kafka.1 –format "{{.ID}}")
docker exec -it ${KAFKA_CONTAINER} bash
# set environment variables used by jobs
export BOOTSTRAP_SERVERS="localhost:9092"
export TOPIC_PRODUCTS="demo.products"
export TOPIC_PURCHASES="demo.purchases"
export TOPIC_INVENTORIES="demo.inventories"
# list topics
kafka-topics.sh –list –bootstrap-server $BOOTSTRAP_SERVERS
# read topics from beginning
kafka-console-consumer.sh \
–topic $TOPIC_PRODUCTS –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
–topic $TOPIC_PURCHASES –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
–topic $TOPIC_INVENTORIES –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS

Below, we see a few sample messages from the demo.purchases topic:

Consuming messages from Kafka’s demo.purchases topic

Alternatively, you can use the UI for Apache Kafka, accessible on port 9080.

Viewing demo.purchases topic in the UI for Apache Kafka
Viewing messages in the demo.purchases topic using the UI for Apache Kafka

Prepare Spark

Next, prepare the Spark container to run the Spark jobs:

# establish an interactive session with the spark container
SPARK_CONTAINER=$(docker container ls –filter name=streaming-stack_spark.1 –format "{{.ID}}")
docker exec -it -u 0 ${SPARK_CONTAINER} bash
# update and install wget
apt-get update && apt-get install wget vim -y
# install required job dependencies
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.1/kafka-clients-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.1/spark-sql-kafka-0-10_2.12-3.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.1/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
mv *.jar /opt/bitnami/spark/jars/
exit
Preparing the Spark container instance as the root user

Running the Spark Jobs

Next, copy the jobs from the project to the Spark container, then exec back into the container:

# copy jobs to spark container
docker cp apache_spark_examples/ ${SPARK_CONTAINER}:/home/
# establish an interactive session with the spark container
docker exec -it ${SPARK_CONTAINER} bash

Batch Processing with Spark

The first Spark job, spark_batch_kafka.py, aggregates the number of items sold and the total sales for each product, based on existing messages consumed from the demo.purchases topic. We use the PySpark DataFrame class’s read() and write() methods in the first example, reading from Kafka and writing to the console. We could just as easily write the results back to Kafka.

ds_sales = (
df_sales.selectExpr("CAST(value AS STRING)")
.select(F.from_json("value", schema=schema).alias("data"))
.select("data.*")
.withColumn("row", F.row_number().over(window))
.withColumn("quantity", F.sum(F.col("quantity")).over(window_agg))
.withColumn("sales", F.sum(F.col("total_purchase")).over(window_agg))
.filter(F.col("row") == 1)
.drop("row")
.select(
"product_id",
F.format_number("sales", 2).alias("sales"),
F.format_number("quantity", 0).alias("quantity"),
)
.coalesce(1)
.orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)
.write.format("console")
.option("numRows", 25)
.option("truncate", False)
.save()
)
A snippet of batch processing Spark job’s summarize_sales() method

The batch processing job sorts the results and outputs the top 25 items by total sales to the console. The job should run to completion and exit successfully.

Batch results for top 25 items by total sales

To run the batch Spark job, use the following commands:

# set environment variables used by jobs
export BOOTSTRAP_SERVERS="kafka:29092"
export TOPIC_PURCHASES="demo.purchases"
cd /home/apache_spark_examples/
# run batch processing job
spark-submit spark_batch_kafka.py
Run the batch Spark job

Stream Processing with Spark

The stream processing Spark job, spark_streaming_kafka.py, also aggregates the number of items sold and the total sales for each item, based on messages consumed from the demo.purchases topic. However, as shown in the code snippet below, this job continuously aggregates the stream of data from Kafka, displaying the top ten product totals within an arbitrary ten-minute sliding window, with a five-minute overlap, and updates output every minute to the console. We use the PySpark DataFrame class’s readStream() and writeStream() methods as opposed to the batch-oriented read() and write() methods in the first example.

ds_sales = (
df_sales.selectExpr("CAST(value AS STRING)")
.select(F.from_json("value", schema=schema).alias("data"))
.select("data.*")
.withWatermark("transaction_time", "10 minutes")
.groupBy("product_id", F.window("transaction_time", "10 minutes", "5 minutes"))
.agg(F.sum("total_purchase"), F.sum("quantity"))
.orderBy(F.col("window").desc(), F.col("sum(total_purchase)").desc())
.select(
"product_id",
F.format_number("sum(total_purchase)", 2).alias("sales"),
F.format_number("sum(quantity)", 0).alias("drinks"),
"window.start",
"window.end",
)
.coalesce(1)
.writeStream.queryName("streaming_to_console")
.trigger(processingTime="1 minute")
.outputMode("complete")
.format("console")
.option("numRows", 10)
.option("truncate", False)
.start()
)
ds_sales.awaitTermination()
A snippet of stream processing Spark job’s summarize_sales() method

Shorter event-time windows are easier for demonstrations — in Production, hourly, daily, weekly, or monthly windows are more typical for sales analysis.

Micro-batch representing real-time totals for the current ten-minute window

To run the stream processing Spark job, use the following commands:

# run stream processing job
spark-submit spark_streaming_kafka.py
Run the stream processing Spark job

We could just as easily calculate running totals for the stream of sales data versus aggregations over a sliding event-time window (example job included in project).

Micro-batch representing running totals for data stream as opposed to using event-time windows

Be sure to kill the stream processing Spark jobs when you are done, or they will continue to run, awaiting more data.

Demonstration #2: Apache Kafka Streams

Next, we will examine Apache Kafka Streams (aka KStreams). For this part of the post, we will also use the second of the three GitHub repository projects, kstreams-kafka-demo. The project contains a KStreams application written in Java that performs stream processing and incremental aggregation.

High-level workflow for KStreams demonstration

KStreams Application

The KStreams application continuously consumes the stream of messages from the demo.purchases Kafka topic (source) using an instance of the StreamBuilder() class. It then aggregates the number of items sold and the total sales for each item, maintaining running totals, which are then streamed to a new demo.running.totals topic (sink). All of this using an instance of the KafkaStreams() Kafka client class.

private static void kStreamPipeline(Properties props) {
System.out.println("Starting…");
Properties kafkaStreamsProps = new Properties();
kafkaStreamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, props.getProperty("APPLICATION_ID"));
kafkaStreamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("BOOTSTRAP_SERVERS"));
kafkaStreamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.getProperty("AUTO_OFFSET_RESET_CONFIG"));
kafkaStreamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, props.getProperty("COMMIT_INTERVAL_MS_CONFIG"));
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(props.getProperty("INPUT_TOPIC"), Consumed.with(Serdes.Void(), CustomSerdes.Purchase()))
.peek((unused, purchase) -> System.out.println(purchase.toString()))
.flatMap((KeyValueMapper<Void, Purchase, Iterable<KeyValue<String, Total>>>) (unused, purchase) -> {
List<KeyValue<String, Total>> result = new ArrayList<>();
result.add(new KeyValue<>(purchase.getProductId(), new Total(
purchase.getTransactionTime(),
purchase.getProductId(),
1,
purchase.getQuantity(),
purchase.getTotalPurchase()
)));
return result;
})
.groupByKey(Grouped.with(Serdes.String(), CustomSerdes.Total()))
.reduce((total1, total2) -> {
total2.setTransactions(total1.getTransactions() + total2.getTransactions());
total2.setQuantities(total1.getQuantities() + total2.getQuantities());
total2.setSales(total1.getSales().add(total2.getSales()));
return total2;
})
.toStream()
.peek((productId, total) -> System.out.println(total.toString()))
.to(props.getProperty("OUTPUT_TOPIC"), Produced.with(Serdes.String(), CustomSerdes.Total()));
KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamsProps);
streams.start();
System.out.println("Running…");
}
A snippet of KStreams application’s kStreamPipeline() method

Running the Application

We have at least three choices to run the KStreams application for this demonstration: 1) running locally from our IDE, 2) a compiled JAR run locally from the command line, or 3) a compiled JAR copied into a Docker image, which is deployed as part of the Swarm stack. You can choose any of the options.

# set java version (v17 is latest compatible version with kstreams)
JAVA_HOME=~/Library/Java/JavaVirtualMachines/corretto-17.0.5/Contents/Home
$JAVA_HOME/bin/java -version
# compile to uber jar
./gradlew clean shadowJar
# run the streaming application
$JAVA_HOME/bin/java -jar build/libs/kstreams-kafka-demo-1.0.0-all.jar

Compiling and running the KStreams application locally

We will continue to use the same streaming Docker Swarm stack used for the Apache Spark demonstration. I have already compiled a single uber JAR file using OpenJDK 17 and Gradle from the project’s source code. I then created and published a Docker image, which is already part of the running stack.

FROM amazoncorretto:17.0.5
COPY build/libs/kstreams-kafka-demo-1.1.0-all.jar /tmp/kstreams-app.jar
CMD ["java", "-jar", "/tmp/kstreams-app.jar"]
view raw Dockerfile hosted with ❤ by GitHub
Dockerfile used to build KStreams app Docker image

Since we ran the sales generator earlier for the Spark demonstration, there is existing data in the demo.purchases topic. Re-run the sales generator (nohup python3 ./producer.py &) to generate a new stream of data. View the results of the KStreams application, which has been running since the stack was deployed using the Kafka CLI or UI for Apache Kafka:

# terminal 1: establish an interactive session with the kstreams app container
KSTREAMS_CONTAINER=$(docker container ls –filter name=streaming-stack_kstreams.1 –format "{{.ID}}")
docker logs ${KSTREAMS_CONTAINER} –follow
# terminal 2: establish an interactive session with the kafka container
KAFKA_CONTAINER=$(docker container ls –filter name=streaming-stack_kafka.1 –format "{{.ID}}")
docker exec -it ${KAFKA_CONTAINER} bash
# set environment variables used by jobs
export BOOTSTRAP_SERVERS="localhost:9092"
export INPUT_TOPIC="demo.purchases"
export OUTPUT_TOPIC="demo.running.totals"
# read topics from beginning
kafka-console-consumer.sh \
–topic $INPUT_TOPIC –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS
kafka-console-consumer.sh \
–topic $OUTPUT_TOPIC –from-beginning \
–bootstrap-server $BOOTSTRAP_SERVERS

Below, in the top terminal window, we see the output from the KStreams application. Using KStream’s peek() method, the application outputs Purchase and Total instances to the console as they are processed and written to Kafka. In the lower terminal window, we see new messages being published as a continuous stream to output topic, demo.running.totals.

KStreams application performing stream processing and the resulting output stream

Part Two

In part two of this two-part post, we continue our exploration of the four popular open-source stream processing projects. We will cover Apache Flink and Apache Pinot. In addition, we will incorporate Apache Superset into the demonstration, building a real-time dashboard to visualize the results of our stream processing.

Apache Superset dashboard displaying data from Apache Pinot Realtime table

This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , , , ,

Leave a comment

Lakehouse Data Modeling using dbt, Amazon Redshift, Redshift Spectrum, and AWS Glue

Learn how dbt makes it easy to transform data and materialize models in a modern cloud data lakehouse built on AWS

Introduction

Data lakes have grabbed much of the analytics community’s attention in recent years, thanks to an overabundance of VC-backed analytics startups and marketing dollars. Nonetheless, data warehouses, specifically modern cloud data warehouses, continue to gain market share, led by SnowflakeAmazon RedshiftGoogle Cloud BigQuery, and Microsoft’s Azure Synapse Analytics.

Several factors have fostered the renewed interest and appeal of data warehouses, including the data lakehouse architecture. According to Databricks, “a lakehouse is a new, open architecture that combines the best elements of data lakes and data warehouses. Lakehouses are enabled by a new system design: implementing similar data structures and data management features to those in a data warehouse directly on top of low-cost cloud storage in open formats.” Similarly, Snowflake describes a lakehouse as “a data solution concept that combines elements of the data warehouse with those of the data lake. Data lakehouses implement data warehouses’ data structures and management features for data lakes, which are typically more cost-effective for data storage.

dbt

In the following post, we will explore the use of dbt (data build tool), developed by dbt Labs, to transform data in an AWS-based data lakehouse, built with Amazon Redshift, Redshift Spectrum, AWS Glue, and Amazon S3. According to dbt Labs, “dbt enables analytics engineers to transform data in their warehouses by simply writing select statements. dbt handles turning these select statements into tables and views.” Further, “dbt does the T in ELT (Extract, Load, Transform) processes — it doesn’t extract or load data, but it’s extremely good at transforming data that’s already loaded into your warehouse.

This post’s project, displayed in dbt Cloud

Amazon Redshift

According to AWS, “Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes using AWS-designed hardware and machine learning to deliver the best price-performance at any scale.” AWS claims Amazon Redshift is the most widely used cloud data warehouse.

Amazon Redshift Spectrum

According to AWS, “Redshift Spectrum allows you to efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables.” Redshift Spectrum tables define the data structure for the files in Amazon S3. The external tables exist in an external data catalog, which can be AWS Glue, the data catalog that comes with Amazon Athena, or an Apache Hive metastore.

dbt can interact with Amazon Redshift Spectrum to create external tables, refresh external table partitions, and access raw data in an Amazon S3-based data lake from the data warehouse. We will use dbt along with the dbt package, dbt_external_tables, to create the external tables in an AWS Glue data catalog.

Prerequisites

Prerequisites to follow along with this post’s demonstration include:

  • Amazon S3 bucket to store raw data;
  • Amazon Redshift or Amazon Redshift Serverless cluster;
  • AWS IAM Role with permissions to Amazon Redshift, Amazon S3, and AWS Glue;
  • dbt Cloud account;
  • dbt CLI (dbt Core) and dbt Amazon Redshift adapter installed locally;
  • Microsoft Visual Studio Code (VS Code) with dbt extensions installed;

The post’s demonstration uses dbt Cloud, VS Code, and the dbt CLI interchangeably with the project’s GitHub repository as a source. Follow along with the demonstration using any or all of these three dbt options.

Example of this project in VS Code with dbt extensions installed

Cost Warning!

Be careful when creating a new, provisioned Amazon Redshift cluster for this demonstration. The suggested default Production cluster with two ra3.4xlarge on-demand compute nodes and AQUA (Redshift’s Advanced Query Accelerator) enabled is estimated at $4,694/month ($3.26/node/hour). For this demonstration, choose the minimum size provisioned Redshift cluster configuration of one dc2.large on-demand compute node, estimated to cost $180/month ($0.25/node/hour). Be sure to delete the cluster when the demonstration is complete.

Creating a new Amazon Redshift cluster

Amazon Redshift Serverless Option

AWS recently announced the general availability (GA) of Amazon Redshift Serverless on July 12, 2022. Amazon Redshift Serverless allows data analysts, developers, and data scientists to run and scale analytics without having to provision and manage data warehouse clusters. dbt is fully compatible with Amazon Redshift Serverless and is an alternative to provisioned Redshift for this demonstration. According to AWS, Amazon Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). You pay for the workloads you run in RPU-hours on a per-second basis (with a 60-second minimum charge), including queries that access data in open file formats in Amazon S3.

Source Code

All the source code demonstrated in this post is open source and available on GitHub.

Sample Data

This demonstration uses the TICKIT sample database provided by AWS and designed for use with Amazon Redshift. This sample database application tracks sales activity for the fictional online TICKIT website, where users buy and sell tickets for sporting events, shows, and concerts. The database consists of seven tables in a star schema: five dimension tables and two fact tables. A clean copy of the raw TICKIT data, formatted as pipe-delimited text files, is included in this GitHub project. Use the following shell commands to copy the raw data to Amazon S3:

# Purpose: Unzip raw data files and copy to s3
# Author: Gary A. Stafford
# Date: 2022-12-29
# sh ./raw_date_to_s3.sh
# ** REPLACE ME! **
s3_bucket="<your_s3_bucket_name>"
pushd raw_tickit_data/
unzip tickit_data.zip
popd
declare -a TableArray=("category" "date" "event" "listing" "sale" "user" "venue")
for table in "${TableArray[@]}"; do
aws s3 cp ./raw_tickit_data/$table.txt s3://$s3_bucket/raw_tickit_data/$table/
done

Prepare Amazon Redshift for dbt

Create New Database

Create a new Redshift database to use for the demonstration, demo.

create new database
create database demo with owner admin;

Create Database Schemas

Within the new Redshift database,demo, create the external schema, tickit_external, and the corresponding external AWS Glue Data Catalogtickit_dbt, using the CREATE EXTERNAL SCHEMA Redshift SQL command. Make sure to update the command to reflect your IAM Role’s ARN. Next, create the schema that will hold our dbt models, tickit_dbt. Lastly, as a security best practice, drop the default public schema.

switch redshift database connection to new demo database before continuing!
create external tables schema and new glue data catalog
create external schema tickit_external
from data catalog
database 'tickit_dbt'
iam_role 'arn:aws:iam::<your_aws_acccount_id>:role/ClusterPermissionsRole'
create external database if not exists;
create schema tickit_dbt;
drop schema public;

From the AWS Glue console, we should observe a new tickit_dbt AWS Glue Data Catalog. The description shown below was manually added after the catalog was created.

Newly created AWS Glue Data Catalog

Create dbt Database User and Group

As a security best practice, create a separate database dbt user and dbt group. We are assigning a completely arbitrary connection limit of ten. Then, apply the grants to allow the dbt group access to the new database and schemas. Lastly, change the two schema’s owners to the dbt.

create dbt user and group
create user dbt with password 'CHANGE_ME!'
nocreatedb nocreateuser syslog access restricted
connection limit 10;
create dbt group
create group dbt with user dbt;
grants on tickit_external schema
grant usage on schema tickit_external to group dbt;
grant create on schema tickit_external to group dbt;
grant all on all tables in schema tickit_external to group dbt;
grants on tickit_dbt schema
grant usage on schema tickit_dbt to group dbt;
grant create on schema tickit_dbt to group dbt;
grant all on all tables in schema tickit_dbt to group dbt;
reassign schema ownership to dbt
alter schema tickit_dbt owner to dbt;
alter schema tickit_external owner to dbt;

Alternately, we could use an IAM Role with a SAML 2.0-compliant IdP.

Initialize and Configure dbt for Redshift

Next, configure your dbt Cloud account and dbt locally with your Amazon Redshift connection information using the dbt init command. On a Mac, this configuration is stored in the /Users/<your_usernama>/.dbt/profiles.yml file. You will need your Redshift cluster host URL, port, database, username, and password. With your local install of dbt, we can use the dbt debug command to confirm the new configuration.

Confirming configuration using dbt debug command

Project Structure

The GitHub project structure follows many of the best practices outlined in dbt Labs’ Best Practice Guide. Data models in the models directory is organized into the recommended stagingintermediate, and marts subdirectories (aka layers).

Project structure for data models

From a data lineage perspective, in this project, the staging layer’s data models depend on the external tables (AWS Glue/Amazon Redshift Spectrum). The intermediate layer’s data models depend on the staging models. The marts layer’s data models depend on staging and intermediate models.

dbt Cloud’s Lineage Graph showing an example of data model relationships

Install dbt Packages

The GitHub project’s packages.yml contains a few commonly recommended packages. The only one required for this post is the dbt-labs/dbt_external_tables package. Make sure your project is referring to the latest version of the package.

# updated 2022-12-29 (dbt=1.3.1 locally and dbt=1.3.1 in dbt cloud)
packages:
package: dbt-labs/dbt_external_tables
version: 0.8.2
package: dbt-labs/codegen
version: 0.9.0
package: dbt-labs/dbt_utils
version: 1.0.0
package: dbt-labs/redshift
version: 0.8.0
view raw packages.yml hosted with ❤ by GitHub

Use the dbt deps command to install the packages locally.

Installing dbt packages

External Tables

The _tickit__sources.yml file in the models/staging/tickit/external_tables/ model’s subdirectory defines the schema and S3 location for each of the seven external TICKIT database tables: category, date, event, listing, sale, user, and venue. You will need to update this file to reflect the name of your Amazon S3 bucket, in seven places.

version: 2
sources:
name: tickit_external
description: Sales activity for the fictional TICKIT web site, where users buy and sell tickets online for sporting events, shows, and concerts.
database: demo
schema: tickit_external
loader: s3
tables:
name: category
description: dimension table – TICKIT categories
external:
location: "s3://<your_s3_bucket_name>/raw_tickit_data/category/"
row_format: >
serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
'separatorChar'='|'
)
table_properties: "('skip.header.line.count'='1')"
columns:
name: catid
data_type: int
description: primary key
tests:
unique
not_null
name: catgroup
data_type: varchar(20)
name: catname
data_type: varchar(20)
name: catdesc
data_type: varchar(50)

Execute the command, dbt run-operation stage_external_sources, to create the seven external tables in the AWS Glue Data Catalog. This command is part of the dbt_external_tables package we installed earlier. It iterates through all source nodes, creates the tables if missing, and refreshes metadata.

Creating external tables in AWS Glue Data Catalog

If we failed to run the previous SQL statements to set schema ownership to the dbt user, the following error will likely occur.

Typical error resulting from incorrect ownership of the external_table schema

Once the command completes, we should observe seven new tables in the AWS Glue Data Catalog.

Seven new tables in the AWS Glue Data Catalog

Examining one of the AWS Glue data catalog tables, we can observe how the configuration in the _tickit__sources.yml file was used to define the table’s properties and schema. Note the Location field indicates where the underlying data is located in our Amazon S3 bucket.

The ‘category’ table in the AWS Glue Data Catalog

Staging Layer

In their best practices guide, dbt describes the staging layer in the following manner: “you can think of the staging layer as condensing and refining this material into the individual atoms we’ll later build more intricate and useful structures with.” The staging data models are the base tables and views we will use to build more complex aggregations and analytics queries in Redshift. The schema.yml file, also in the models/staging/tickit/ model’s subdirectory, defines seven late-binding views, modeled by dbt, to be created in Amazon Redshift.

version: 2
models:
name: stg_tickit__categories
description: Late binding view of external dimension table – TICKIT category
name: stg_tickit__dates
description: Late binding view of external dimension table – TICKIT date
name: stg_tickit__events
description: Late binding view of external dimension table – TICKIT event
name: stg_tickit__listings
description: Late binding view of external fact table – TICKIT listing
name: stg_tickit__sales
description: Late binding view of external fact table – TICKIT sale
name: stg_tickit__users
description: Late binding view of external dimension table – TICKIT user
name: stg_tickit__venues
description: Late binding view of external dimension table – TICKIT venue
view raw schema.yml hosted with ❤ by GitHub

The staging model’s SQL statements also follow many of dbt’s best practices. Below, we see an example of the stg_tickit__sales model (stg_tickit__sales.sql). This model performs a SELECT from the external sale table in the external_table schema. The model performs column renaming and basic calculations.

{{ config(materialized='view', bind=False) }}
with source as (
select * from {{ source('tickit_external', 'sale') }}
),
renamed as (
select
saleid as sale_id,
listid as list_id,
sellerid as seller_id,
buyerid as buyer_id,
eventid as event_id,
dateid as date_id,
qtysold as qty_sold,
round (pricepaid / qtysold, 2) as ticket_price,
pricepaid as price_paid,
round((commission / pricepaid) * 100, 2) as commission_prcnt,
commission,
(pricepaid commission) as earnings,
saletime as sale_time
from
source
where
sale_id IS NOT NULL
order by
sale_id
)
select * from renamed

The the dbt run command, according to dbt, “executes compiled SQL model files against the current target database. dbt connects to the target database and runs the relevant SQL, required to materialize all data models using the specified materialization strategies.” Instead of using the dbt run command to create all the project’s tables and views at once, for now, we are limiting the command to just the models in the ./models/staging/tickit/ directory using the --select optional argument. Execute the dbt run --select staging command to materialize the seven corresponding staging tables in Amazon Redshift.

The Staging layer’s data models successfully materialized in Redshift

Once the command completes, we should observe seven new views in Amazon Redshift demo database’s tickit_dbt schema with the stg_ prefix.

Amazon Redshift Query Editor v2 Console

Selecting from any of the views should return data.

Amazon Redshift Query Editor v2 Console

Late Binding Views

This demonstration uses late binding views for staging and intermediate layer models. According to dbt, “using late-binding views in a production deployment of dbt can vastly improve the availability of data in the warehouse, especially for models that are materialized as late-binding views and are queried by end-users, since they won’t be dropped when upstream models are updated. Additionally, late binding views can be used with external tables via Redshift Spectrum.

Alternatively, we could define the seven staging models as tables instead of late binding views. Once created as tables, the dependent intermediate and marts views will not require a late-binding reference, as in this project.

Intermediate Layer

In their best practices guide, dbt describes the intermediate layer as “purpose-built transformation steps.” Further, “the best guiding principle is to think about verbs (e.g. pivotedaggregated_to_userjoinedfanned_out_by_quanityfunnel_created, etc.) in the intermediate layer.

The project’s intermediate layer consists of two models related to users. The sample TICKIT database lumps all users into a single table. However, for analytics purposes, different user personas might interest marketing teams, such as buyers, sellers, sellers who also buy, and non-buyers (users who have never purchased tickets). The two models in the project’s intermediate layer filter for buyers and for sellers, resulting in two separate views of user personas.

{{ config(materialized='view', bind=False) }}
with sales as (
select * from {{ ref('stg_tickit__sales') }}
),
users as (
select * from {{ ref('stg_tickit__users') }}
),
first_purchase as (
select min(date(sale_time)) as first_purchase_date, buyer_id
from sales
group by buyer_id
),
final as (
select distinct
u.user_id,
u.username,
cast((u.last_name||', '||u.first_name) as varchar(100)) as full_name,
f.first_purchase_date,
u.city,
u.state,
u.email,
u.phone,
u.like_broadway,
u.like_classical,
u.like_concerts,
u.like_jazz,
u.like_musicals,
u.like_opera,
u.like_rock,
u.like_sports,
u.like_theatre,
u.like_vegas
from
sales as s
join users as u on u.user_id = s.buyer_id
join first_purchase as f on f.buyer_id = s.buyer_id
order by
user_id
)
select * from final

To materialize the intermediate layer’s two data models into views, execute the command, dbt run --select intermediate.

The Intermediate layer’s data models successfully materialized in Redshift

Once the command completes, we should observe a total of nine views in Amazon Redshift demo database’s tickit_dbt schema — seven staging and two intermediate, identified with the int_ prefix.

Amazon Redshift Query Editor v2 Console

Marts Layer

In their best practices guide, dbt describes the marts layer as “business defined entities.” Further, “this is the layer where everything comes together and we start to arrange all of our atoms (staging models) and molecules (intermediate models) into full-fledged cells that have identity and purpose. We sometimes like to call this the entity layer or concept layer, to emphasize that all our marts are meant to represent a specific entity or concept at its unique grain.

The project’s marts layer consists of four data models across marketing and sales. The models are materialized as two dimension tables and two fact tables. Although it is common practice to describe and label these as traditional star schema dimension (dim_) or fact (fct_) tables, in reality, the fact tables in this demonstration are actually flat, de-normalized, wide tables. Wide tables generally have better analytics performance in a modern data warehouse, according to Fivetran and others.

{{ config(materialized='table', sort='sale_id', dist='sale_id') }}
with categories as (
select * from {{ ref('stg_tickit__categories') }}
),
dates as (
select * from {{ ref('stg_tickit__dates') }}
),
events as (
select * from {{ ref('stg_tickit__events') }}
),
listings as (
select * from {{ ref('stg_tickit__listings') }}
),
sales as (
select * from {{ ref('stg_tickit__sales') }}
),
sellers as (
select * from {{ ref('int_sellers_extracted_from_users') }}
),
buyers as (
select * from {{ ref('int_buyers_extracted_from_users') }}
),
event_categories as (
select
e.event_id,
e.event_name,
c.cat_group,
c.cat_name
from events as e
join categories as c on c.cat_id = e.cat_id
),
final as (
select
s.sale_id,
s.sale_time,
d.qtr,
ec.cat_group,
ec.cat_name,
ec.event_name,
b.username as buyer_username,
b.full_name as buyer_name,
b.state as buyer_state,
b.first_purchase_date as buyer_first_purchase_date,
se.username as seller_username,
se.full_name as seller_name,
se.state as seller_state,
se.first_sale_date as seller_first_sale_date,
s.ticket_price,
s.qty_sold,
s.price_paid,
s.commission_prcnt,
s.commission,
s.earnings
from
sales as s
join listings as l on l.list_id = s.list_id
join buyers as b on b.user_id = s.buyer_id
join sellers as se on se.user_id = s.seller_id
join event_categories as ec on ec.event_id = s.event_id
join dates as d on d.date_id = s.date_id
order by
sale_id
)
select * from final
view raw fct_sales.sql hosted with ❤ by GitHub

The marts layer’s models take various dependencies through joins on staging and intermediate models. The data model above, fct_sales, has dependencies on multiple staging and intermediate models.

Lineage graph (dependency graph) of a data model, displayed in dbt Cloud

To materialize the marts layer’s four data models into tables, execute the command, dbt run --select marts.

The Marts layer’s data models successfully materialized in Redshift as tables

Once the command completes, we should observe four tables and nine views in the Redshift demo database’s tickit_dbt schema. Note how the dbt model for fct_sales (shown above), with its Jinja templating and multiple CTEs have been compiled into the resulting table in Redshift, this is the real magic of dbt!

Amazon Redshift Query Editor v2 Console showing fct_sales table

At this point, all of the project’s models have been compiled and created in the Redshift demo database by dbt.

Analyses

The demonstration’s project also contains example analyses. dbt allows us to version control more analytical-oriented SQL files within our dbt project using the analyses functionality of dbt. These analyses do not fit the fairly opinionated dbt model definition. We can compile the analyses SQL file using the dbt compile command, then copy and paste the resulting SQL statements from the target/compiled/ subdirectory into our data warehouse’s query tool of choice.

Analysis shown in dbt Cloud interface
Compiled analysis SQL executed in Amazon Redshift

Project Documentation

Using the dbt docs generate command will automatically generate the project’s documentation website from the SQL and YAML files. Documentations can be generated and displayed from your dbt Cloud account or hosted locally.

Project’s documentation website

Testing

According to dbt, “Tests are assertions you make about your models and other resources in your dbt project (e.g. sources, seeds, and snapshots). When you run dbt test, dbt will tell you if each test in your project passes or fails.” The project contains over 50 tests, split between the _tickit__sources.yml file and individual tests in the test/ directory. Typical dbt tests check for non-null and unique values, values within an expected numeric range, and values from a known list of strings. Any SELECT statement written in SQL can be tested.

name: user
description: dimension table – TICKIT users
columns:
name: userid
data_type: int
description: primary key
tests:
unique
not_null
name: username
data_type: char(8)
tests:
unique
not_null
name: firstname
data_type: varchar(30)
tests:
not_null
Snippet of tests in the _tickit__sources.sql file
all prices paid for tickets should be a positive value of $1 or greater
there are no credits (negative values)
select price_paid
from {{ ref('stg_tickit__sales') }}
group by price_paid
having not (price_paid >= 1)

Execute the project’s tests using the dbt test command. We can execute individual tests using the --select optional argument, for example, dbt test --select assert_all_sale_amounts_are_positive. We can also use the --threads optional argument with most dbt commands, including dbt test, increasing parallelism and reducing execution time. The example below uses 10 threads, the arbitrary maximum configured for the Amazon Redshift dbt user.

Running dbt tests
Successful test run

Jobs

According to dbt, Jobs are a set of dbt commands that you want to run on a schedule. For example, dbt run and dbt test. Jobs can load packages, run tests, materialize models, check source freshness (dbt source freshness), and regenerate documentation. Below, we have created a daily job to test, refresh, and document our project as the data is updated in the data lake.

dbt Cloud’s Job Run Overview

Notifications

According to dbt, Setting up notifications in dbt Cloud will allow you to receive alerts via Email or a chosen Slack channel when a job run succeeds, fails, or is canceled.

dbt Cloud’s Notifications interface for configuring email and Slack

The Slack notifications include run status, timings, and a link to open the job in dbt Cloud. Below, we see a notification regarding our project’s daily job run.

Notification of successful job run in Slack from dbt Cloud

Exposures

Exposures are a recent addition to dbt. Exposures make it possible to define and describe a downstream use of our dbt project, such as in a dashboard, application, or data science pipeline. Below we see an example of an exposure describing a sales dashboard created in Amazon QuickSight.

version: 2
exposures:
name: tickit_sales_summary
type: dashboard
maturity: medium
url: https://us-east-1.quicksight.aws.amazon.com
description: >
TICKIT sales summary dashboard, authored in Amazon QuickSight
depends_on:
ref('fct_sales')
ref('fct_listings')
owner:
name: Gary A. Stafford
email: gary.a.stafford@gmail.com
view raw dashboard.yml hosted with ❤ by GitHub

The exposure YAML file shown above describes the Amazon QuickSight dashboard shown below.

Amazon QuickSight dashboard

Exposures work with dbt’s auto-documentation feature. dbt populates a dedicated page in the auto-generated documentation site with context relevant to data consumers.

Project’s documentation website, showing dashboard exposure
Project’s documentation website, showing dashboard exposure’s lineage graph

Conclusion

In this post, we covered some of the basic functionality of dbt. We learned how dbt enables analysts to work more like software engineers. We also learned how dbt makes it easy to codify data models in SQL, to version control and manage data models as code with git, and collaborate on data models with other data team members.

Topics not explored in this post but critical to most large-scale dbt-managed production environments include advanced Jinja templating and macrosmodel freshnessorchestrationjob schedulingContinuous Integration and GitOpsnotificationsenvironment variables, and incremental models. We will explore these additional dbt capabilities in future posts.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , ,

Leave a comment

Serverless Analytics on AWS: Getting Started with Amazon EMR Serverless and Amazon MSK Serverless

Utilizing the recently released Amazon EMR Serverless and Amazon MSK Serverless for batch and streaming analytics with Apache Spark and Apache Kafka

Introduction

Amazon EMR Serverless

AWS recently announced the general availability (GA) of Amazon EMR Serverless on June 1, 2022. EMR Serverless is a new serverless deployment option in Amazon EMR, in addition to EMR on EC2EMR on EKS, and EMR on AWS Outposts. EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. According to AWS, with EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.

Amazon MSK Serverless

Similarly, on April 28, 2022, AWS announced the general availability of Amazon MSK Serverless. According to AWS, Amazon MSK Serverless is a cluster type for Amazon MSK that makes it easy to run Apache Kafka without managing and scaling cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources, so you can use Apache Kafka on demand and only pay for the data you stream and retain.

Serverless Analytics

In the following post, we will learn how to use these two new, powerful, cost-effective, and easy-to-operate serverless technologies to perform batch and streaming analytics. The PySpark examples used in this post are similar to those featured in two earlier posts, which featured non-serverless alternatives Amazon EMR on EC2 and Amazon MSK: Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR and Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on AWS using Amazon MSK and EMR.

Source Code

All the source code demonstrated in this post is open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/emr-msk-serverless-demo.git

Architecture

The post’s high-level architecture consists of an Amazon EMR Serverless Application, Amazon MSK Serverless Cluster, and Amazon EC2 Kafka client instance. To support these three resources, we will need two Amazon Virtual Private Clouds (VPCs), a minimum of three subnets, an AWS Internet Gateway (IGW) or equivalent, an Amazon S3 Bucket, multiple AWS Identity and Access Management (IAM) Roles and Policies, Security Groups, and Route Tables, and a VPC Gateway Endpoint for S3. All resources are constrained to a single AWS account and a single AWS Region, us-east-1.

High-level AWS serverless analytics architecture used in this post

Prerequisites

As a prerequisite for this post, you will need to create the following resources:

  1. (1) Amazon EMR Serverless Application;
  2. (1) Amazon MSK Serverless Cluster;
  3. (1) Amazon S3 Bucket;
  4. (1) VPC Endpoint for S3;
  5. (3) Apache Kafka topics;
  6. PySpark applications, related JAR dependencies, and sample data files uploaded to Amazon S3 Bucket;

Let’s walk through each of these prerequisites.

Amazon EMR Serverless Application

Before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon EMR Serverless, especially, What is Amazon EMR Serverless? Create a new EMR Serverless Application by following the AWS documentation, Getting started with Amazon EMR Serverless. The creation of the EMR Serverless Application includes the following resources:

  1. Amazon S3 bucket for storage of Spark resources;
  2. Amazon VPC with at least two private subnets and associated Security Group(s);
  3. EMR Serverless runtime AWS IAM Role and associated IAM Policy;
  4. Amazon EMR Serverless Application;

For this post, use the latest version of EMR available in the EMR Studio Serverless Application console, the newly released version 6.7.0, to create a Spark application.

EMR Studio Serverless Application creation console

Keep the default pre-initialized capacity, application limits, and application behavior settings.

EMR Studio Serverless Application creation console

Since we are connecting to MSK Serverless from EMR Serverless, we need to configure VPC access. Select the new VPC and at least two private subnets in different Availability Zones (AZs).

EMR Studio Serverless Application creation console

According to the documentation, the subnets selected for EMR Serverless must be private subnets. The associated route tables for the subnets should not contain direct routes to the Internet.

Error resulting from trying to associate a public subnet with EMR Serverless
EMR Studio Serverless Application details console showing new Application

Amazon MSK Serverless Cluster

Similarly, before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon MSK Serverless, especially MSK Serverless. Create a new MSK Serverless Cluster by following the AWS documentation, Getting started using MSK Serverless clusters. The creation of the MSK Serverless Cluster includes the following resources:

  1. AWS IAM Role and associated IAM Policy for the Amazon EC2 Kafka client instance;
  2. VPC with at least one public subnet and associated Security Group(s);
  3. Amazon EC2 instance used as Apache Kafka client, provisioned in the public subnet of the above VPC;
  4. Amazon MSK Serverless Cluster;
Amazon MSK Serverless Create cluster console

Associate the new MSK Serverless Cluster with the EMR Serverless Application’s VPC and two private subnets. Also, associate the cluster with the EC2-based Kafka client instance’s VPC and its public subnet.

Amazon MSK Serverless Create cluster console — VPC 1
Amazon MSK Serverless Create cluster console — VPC 2

According to the AWS documentation, Amazon MSK does not support all AZs. For example, I tried to use a subnet in us-east-1e threw an error. If this happens, choose an alternative AZ.

Error resulting from using an unsupported AZ
Successfully created Amazon MSK Serverless Cluster

VPC Endpoint for S3

To access the Spark resource in Amazon S3 from EMR Serverless running in the two private subnets, we need a VPC Endpoint for S3. Specifically, a Gateway Endpoint, which sends traffic to Amazon S3 or DynamoDB using private IP addresses. A gateway endpoint for Amazon S3 enables you to use private IP addresses to access Amazon S3 without exposure to the public Internet. EMR Serverless does not require public IP addresses, and you don’t need an internet gateway (IGW), a NAT device, or a virtual private gateway in your VPC to connect to S3.

VPC Endpoint for S3 associated with route table for private subnets

Create the VPC Endpoint for S3 (Gateway Endpoint) and add the route table for the two EMR Serverless private subnets. You can add additional routes to that route table, such as VPC peering connections to data sources such as Amazon Redshift or Amazon RDS. However, do not add routes that provide direct Internet access.

Route table for private subnets showing VPC Endpoint to S3 route (first route shown)

Kafka Topics and Sample Messages

Once the MSK Serverless Cluster and EC2-based Kafka client instance are provisioned and running, create the three required Kafka topics using the EC2-based Kafka client instance. I recommend using AWS Systems Manager Session Manager to connect to the client instance as the ec2-user user. Session Manager provides secure and auditable node management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys. Alternatively, you can SSH into the client instance.

Before creating the topics, use a utility like telnet to confirm connectivity between the Kafka client and the MSK Serverless Cluster. Verifying connectivity will save you a lot of frustration with potential security and networking issues.

sudo yum install telnet -y
telnet <your_bootstrap_server_host> 9098
# > Trying 192.168.XX.XX…
# > Connected to boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com.
# > Escape character is '^]'.

With MSK Serverless Cluster connectivity confirmed, create the three Kafka topics: topicAtopicB, and topicC. I am using the default partitioning and replication settings from the AWS Getting Started Tutorial.

cd kafka_2.12-2.8.1
# *** CHANGE ME ***
export BOOTSTRAP_SERVER=<your_bootstrap_server> # e.g., boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com:9098
bin/kafka-topics.sh –create –topic topicA \
–partitions 6 \
–bootstrap-server $BOOTSTRAP_SERVER \
–command-config config/client.properties
bin/kafka-topics.sh –create –topic topicB \
–partitions 6 \
–bootstrap-server $BOOTSTRAP_SERVER \
–command-config config/client.properties
bin/kafka-topics.sh –create –topic topicC \
–partitions 6 \
–bootstrap-server $BOOTSTRAP_SERVER \
–command-config config/client.properties
# list topics to confirm creation
bin/kafka-topics.sh –list \
–bootstrap-server $BOOTSTRAP_SERVER \
–command-config config/client.properties

To create some quick sample data, we will copy and paste 250 messages from a file included in the GitHub project, sample_data/sales_messages.txt, into topicA. The messages are simple mock sales transactions.

{"payment_id":16940,"customer_id":130,"amount":5.99,"payment_date":"2021-05-08 21:21:56.996577 +00:00","city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}
{"payment_id":16406,"customer_id":459,"amount":5.99,"payment_date":"2021-05-08 21:22:59.996577 +00:00","city":"Qomsheh","district":"Esfahan","country":"Iran"}
{"payment_id":16315,"customer_id":408,"amount":6.99,"payment_date":"2021-05-08 21:32:05.996577 +00:00","city":"Jaffna","district":"Northern","country":"Sri Lanka"}
{"payment_id":16185,"customer_id":333,"amount":7.99,"payment_date":"2021-05-08 21:33:07.996577 +00:00","city":"Baku","district":"Baki","country":"Azerbaijan"}
{"payment_id":17097,"customer_id":222,"amount":9.99,"payment_date":"2021-05-08 21:33:47.996577 +00:00","city":"Jaroslavl","district":"Jaroslavl","country":"Russian Federation"}
{"payment_id":16579,"customer_id":549,"amount":3.99,"payment_date":"2021-05-08 21:36:33.996577 +00:00","city":"Santiago de Compostela","district":"Galicia","country":"Spain"}
{"payment_id":16050,"customer_id":269,"amount":4.99,"payment_date":"2021-05-08 21:40:19.996577 +00:00","city":"Salinas","district":"California","country":"United States"}
{"payment_id":17126,"customer_id":239,"amount":7.99,"payment_date":"2021-05-08 22:00:12.996577 +00:00","city":"Ciomas","district":"West Java","country":"Indonesia"}
{"payment_id":16933,"customer_id":126,"amount":7.99,"payment_date":"2021-05-08 22:29:06.996577 +00:00","city":"Po","district":"So Paulo","country":"Brazil"}
{"payment_id":16297,"customer_id":399,"amount":8.99,"payment_date":"2021-05-08 22:30:47.996577 +00:00","city":"Okara","district":"Punjab","country":"Pakistan"}

Use the kafka-console-producer Shell script to publish the messages to the Kafka topic. Use the kafka-console-consumer Shell script to validate the messages made it to the topic by consuming a few messages.

bin/kafka-console-producer.sh \
–topic topicA \
–bootstrap-server $BOOTSTRAP_SERVER \
–producer.config config/client.properties
# copy and paste contents of 'sales_messages.txt' and then Ctrl+C to exit
# check for messages in topic
bin/kafka-console-consumer.sh \
–topic topicA \
–from-beginning –max-messages 5 \
–property print.value=true \
–property print.offset=true \
–property print.partition=true \
–property print.timestamp=true \
–bootstrap-server $BOOTSTRAP_SERVER \
–consumer.config config/client.properties

The output should look similar to the following example.

Sample message output from Kafka topic

Spark Resources in Amazon S3

To submit and run the five Spark Jobs included in the project, you will need to copy the following resources to your Amazon S3 bucket: (5) Apache Spark jobs, (5) related JAR dependencies, and (2) sample data files.

PySpark Applications
To start, copy the five PySpark applications to a scripts/ subdirectory within your Amazon S3 bucket.

PySpark applications uploaded to the Amazon S3 bucket

Sample Data
Next, copy the two sample data files to a sample_data/ subdirectory within your Amazon S3 bucket. The large file contains 2,000 messages, while the small file contains 600 messages. These two files can be used interchangeably with the post’s final streaming example.

Sample sales data uploaded to the Amazon S3 bucket

PySpark Dependencies
Lastly, the PySpark applications have a handful of JAR dependencies that must be available when the job runs, which are not on the EMR Serverless classpath by default. If you are unsure which JARs are already on the EMR Serverless classpath, you can check the Spark UI’s Environment tab’s Classpath Entries section. Accessing the Spark UI is demonstrated in the first PySpark application example, below.

Spark UI’s Environment tab showing Classpath Entries

It is critical to choose the correct version of each JAR dependency based on the version of libraries used with the EMR and MSK. Using the wrong version or inconsistent versions, especially Scala, can result in job failures. Specifically, we are targeting Spark 3.2.1 and Scala 2.12 (EMR v6.7.0: Amazon’s Spark 3.2.1, Scala 2.12.15, Amazon Corretto 8 version of OpenJDK), and Apache Kafka 2.8.1 (MSK Serverless: Kafka 2.8.1).

Download the seven JAR files locally, then copy them to a jars/ subdirectory within your Amazon S3 bucket.

Dependency JARs uploaded to the Amazon S3 bucket

PySpark Applications Examples

With the EMR Serverless Application, MSK Serverless Cluster, Kafka topics, and sample data created, and the Spark resources uploaded to Amazon S3, we are ready to explore four different Spark examples.

Example 1: Kafka Batch Aggregation to the Console

The first PySpark application, 01_example_console.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to the console (stdout).

There are no hard-coded values in any of the PySpark application examples. All required environment-specific variables, such as your MSK Serverless bootstrap server (host and port) and Amazon S3 bucket name, will be passed to the running Spark jobs as arguments from the spark-submit command.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and write aggregated messages to the console (stdout)
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires –bootstrap_servers argument
import argparse
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType, TimestampType
from pyspark.sql.window import Window
def main():
args = parse_args()
spark = SparkSession \
.builder \
.appName("01-example-console") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
df_sales = read_from_kafka(spark, args)
summarize_sales(df_sales)
def read_from_kafka(spark, args):
options_read = {
"kafka.bootstrap.servers":
args.bootstrap_servers,
"subscribe":
args.read_topic,
"startingOffsets":
"earliest",
"endingOffsets":
"latest",
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
df_sales = spark \
.read \
.format("kafka") \
.options(**options_read) \
.load()
return df_sales
def summarize_sales(df_sales):
schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", TimestampType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])
window = Window.partitionBy("country").orderBy("amount")
window_agg = Window.partitionBy("country")
df_sales \
.selectExpr("CAST(value AS STRING)") \
.select(F.from_json("value", schema=schema).alias("data")) \
.select("data.*") \
.withColumn("row", F.row_number().over(window)) \
.withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
.withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
.filter(F.col("row") == 1).drop("row") \
.select("country",
F.format_number("sales", 2).alias("sales"),
F.format_number("orders", 0).alias("orders")) \
.coalesce(1) \
.orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
.write \
.format("console") \
.option("numRows", 25) \
.option("truncate", False) \
.save()
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description="Arguments required for script.")
parser.add_argument("–bootstrap_servers", required=True, help="Kafka bootstrap servers")
parser.add_argument("–read_topic", default="topicA", required=False, help="Kafka topic to read from")
args = parser.parse_args()
return args
if __name__ == "__main__":
main()

To submit your first PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. You will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 01-example-console \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/01_example_console.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'

Switching to the EMR Serverless Application console, you should see the new Spark job you just submitted in one of several job states.

EMR Studio Serverless Application details console

You can click on the Spark job to get more details. Note the Script arguments and Spark properties passed in from the spark-submit command.

EMR Studio Serverless Application details Job details view

From the Spark job details tab, access the Spark UI, aka Spark Web UI, from a button in the upper right corner of the screen. If you have experience with Spark, you are most likely familiar with the Spark Web UI to monitor and tune Spark jobs.

Spark History Server UI

From the initial screen, the Spark History Server tab, click on the App ID. You can access an enormous amount of Spark-related information about your job and EMR environment from the Spark Web UI.

Spark UI’s Stages tab
Spark UI’s Stages tab showing a Directed acyclic graph (DAG) Visualization
Spark UI’s Environment tab showing environment variables, including versions of Spark, Java, and Scala

The Executors tab will give you access to the Spark job’s output. The output we are most interested in is the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI ‘s Executors tab

The stderr contains output related to the running Spark job. Below we see an example of Kafka consumer configuration values output to stderr. Several of these values were passed in from the Spark job, including items such as kafka.bootstrap.serverssecurity.protocolsasl.mechanism, and sasl.jaas.config.

driver executor’s stderr output to the console

The stdout from the driver executor contains the console output as directed from the Spark job. Below we see the successfully aggregated results of the first Spark job, output to stdout.

+——————+——+——+
|country |sales |orders|
+——————+——+——+
|India |138.80|20 |
|China |133.80|20 |
|Mexico |106.86|14 |
|Japan |100.86|14 |
|Brazil |96.87 |13 |
|Russian Federation|94.87 |13 |
|United States |92.86 |14 |
|Nigeria |58.93 |7 |
|Philippines |58.92 |8 |
|South Africa |46.94 |6 |
|Argentina |42.93 |7 |
|Germany |39.96 |4 |
|Indonesia |38.95 |5 |
|Italy |35.95 |5 |
|Iran |33.95 |5 |
|South Korea |33.94 |6 |
|Poland |30.97 |3 |
|Pakistan |25.97 |3 |
|Taiwan |25.96 |4 |
|Mozambique |23.97 |3 |
|Ukraine |23.96 |4 |
|Vietnam |23.96 |4 |
|Venezuela |22.97 |3 |
|France |20.98 |2 |
|Peru |19.98 |2 |
+——————+——+——+
only showing top 25 rows

Example 2: Kafka Batch Aggregation to CSV in S3

Although the console is useful for development and debugging, it is typically not used in Production. Instead, Spark typically sends results to S3 as CSV, JSON, Parquet, or Arvo formatted files, to Kafka, to a database, or to an API endpoint. The second PySpark application, 02_example_csv_s3.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a CSV file in Amazon S3.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and write aggregated messages to CSV file in Amazon S3
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires –bootstrap_servers and –s3_bucket arguments
import argparse
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType, TimestampType
from pyspark.sql.window import Window
def main():
args = parse_args()
spark = SparkSession \
.builder \
.appName("02-example-csv-s3") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
df_sales = read_from_kafka(spark, args)
summarize_sales(df_sales, args)
def read_from_kafka(spark, args):
options_read = {
"kafka.bootstrap.servers":
args.bootstrap_servers,
"subscribe":
args.read_topic,
"startingOffsets":
"earliest",
"endingOffsets":
"latest",
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
df_sales = spark \
.read \
.format("kafka") \
.options(**options_read) \
.load()
return df_sales
def summarize_sales(df_sales, args):
schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", TimestampType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])
window = Window.partitionBy("country").orderBy("amount")
window_agg = Window.partitionBy("country")
df_sales \
.selectExpr("CAST(value AS STRING)") \
.select(F.from_json("value", schema=schema).alias("data")) \
.select("data.*") \
.withColumn("row", F.row_number().over(window)) \
.withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
.withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
.filter(F.col("row") == 1).drop("row") \
.select("country",
F.format_number("sales", 2).alias("sales"),
F.format_number("orders", 0).alias("orders")) \
.coalesce(1) \
.orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
.write \
.csv(path=f"s3a://{args.s3_bucket}/output/", header=True, sep="|")
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description="Arguments required for script.")
parser.add_argument("–bootstrap_servers", required=True, help="Kafka bootstrap servers")
parser.add_argument("–s3_bucket", required=True, help="Amazon S3 bucket")
parser.add_argument("–read_topic", default="topicA", required=False, help="Kafka topic to read from")
args = parser.parse_args()
return args
if __name__ == "__main__":
main()

To submit your second PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first example, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 02-example-csv-s3 \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/02_example_csv_s3.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>",
"–s3_bucket=<your_s3_bucket>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'

If successful, the Spark job should create a single CSV file in the designated Amazon S3 key (directory path) and an empty _SUCCESS indicator file. The presence of an empty _SUCCESS file signifies that the save() operation completed normally.

Amazon S3 bucket showing CSV file output by Spark job

Below we see the expected pipe-delimited output from the second Spark job.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5

Example 3: Kafka Batch Aggregation to Kafka

The third PySpark application, 03_example_kafka.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a second Kafka topic, topicB. This job now has both read and write options.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and write aggregated messages to topicB
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires –bootstrap_servers argument
import argparse
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType, TimestampType
from pyspark.sql.window import Window
def main():
args = parse_args()
spark = SparkSession \
.builder \
.appName("03-example-kafka") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
df_sales = read_from_kafka(spark, args)
summarize_sales(df_sales, args)
def read_from_kafka(spark, args):
options_read = {
"kafka.bootstrap.servers":
args.bootstrap_servers,
"subscribe":
args.read_topic,
"startingOffsets":
"earliest",
"endingOffsets":
"latest",
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
df_sales = spark \
.read \
.format("kafka") \
.options(**options_read) \
.load()
return df_sales
def summarize_sales(df_sales, args):
options_write = {
"kafka.bootstrap.servers":
args.bootstrap_servers,
"topic":
args.write_topic,
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", TimestampType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])
window = Window.partitionBy("country").orderBy("amount")
window_agg = Window.partitionBy("country")
df_sales \
.selectExpr("CAST(value AS STRING)") \
.select(F.from_json("value", schema=schema).alias("data")) \
.select("data.*") \
.withColumn("row", F.row_number().over(window)) \
.withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
.withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
.filter(F.col("row") == 1).drop("row") \
.select("country",
F.format_number("sales", 2).alias("sales"),
F.format_number("orders", 0).alias("orders")) \
.coalesce(1) \
.orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
.select(F.to_json(F.struct("*"))).toDF("value") \
.write \
.format("kafka") \
.options(**options_write) \
.save()
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description="Arguments required for script.")
parser.add_argument("–bootstrap_servers", required=True, help="Kafka bootstrap servers")
parser.add_argument("–read_topic", default="topicA", required=False, help="Kafka topic to read from")
parser.add_argument("–write_topic", default="topicB", required=False, help="Kafka topic to write to")
args = parser.parse_args()
return args
if __name__ == "__main__":
main()

To submit your next PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first two examples, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 03-example-kafka \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/03_example_kafka.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'

Once the job completes, you can confirm the results by returning to your EC2-based Kafka client. Use the same kafka-console-consumer command you used previously to show messages from topicB.

bin/kafka-console-consumer.sh \
–topic topicB \
–from-beginning –max-messages 10 \
–property print.value=true \
–property print.offset=true \
–property print.partition=true \
–property print.timestamp=true \
–bootstrap-server $BOOTSTRAP_SERVERS \
–consumer.config config/client.properties

If the Spark job and the Kafka client command worked successfully, you should see aggregated messages similar to the example output below. Note we are not using keys with the Kafka messages, only values for these simple examples.

Aggregated messages from Kafka topic

Example 4: Spark Structured Streaming

For our final example, we will switch from batch to streaming — from read to readstream and from write to writestream. Before continuing, I suggest reading the Structured Streaming Programming Guide.

In this example, we will demonstrate how to continuously measure a common business metric — real-time sales volumes. Imagine you are sell products globally and want to understand the relationship between the time of day and buying patterns in different geographic regions in real-time. For any given window of time — this 15-minute period, this hour, this day, or this week— you want to know the current sales volumes by country. You are not reviewing previous sales periods or examing running sales totals, but real-time sales during a sliding time window.

We will use two PySpark jobs running concurrently to simulate this metric. The first application, 04_stream_sales_to_kafka.py, simulates streaming data by continuously writing messages to topicC — 2,000 messages with a 0.5-second delay between messages. In my tests, the job ran for ~28–29 minutes.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Write messages from a CSV file to Kafka topicC
# to simulate real-time streaming sales data
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires –bootstrap_servers and –s3_bucket arguments
import argparse
import time
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType
def main():
args = parse_args()
spark = SparkSession \
.builder \
.appName("04-stream-sales-to-kafka") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", StringType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])
df_sales = read_from_csv(spark, schema, args)
df_sales.cache()
write_to_kafka(spark, df_sales, args)
def read_from_csv(spark, schema, args):
df_sales = spark.read \
.csv(path=f"s3a://{args.s3_bucket}/sample_data/{args.sample_data_file}",
schema=schema, header=True, sep="|")
return df_sales
def write_to_kafka(spark, df_sales, args):
options_write = {
"kafka.bootstrap.servers":
args.bootstrap_servers,
"topic":
args.write_topic,
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
sales_count = df_sales.count()
for r in range(0, sales_count):
row = df_sales.collect()[r]
df_message = spark.createDataFrame([row], df_sales.schema)
df_message = df_message \
.drop("payment_date") \
.withColumn("payment_date", F.current_timestamp())
df_message \
.selectExpr("CAST(payment_id AS STRING) AS key",
"to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.options(**options_write) \
.save()
df_message.show(1)
time.sleep(args.message_delay)
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description="Arguments required for script.")
parser.add_argument("–bootstrap_servers", required=True, help="Kafka bootstrap servers")
parser.add_argument("–s3_bucket", required=True, help="Amazon S3 bucket")
parser.add_argument("–write_topic", default="topicC", required=False, help="Kafka topic to write to")
parser.add_argument("–sample_data_file", default="sales_incremental_large.csv", required=False, help="data file")
parser.add_argument("–message_delay", default=0.5, required=False, help="message publishing delay")
args = parser.parse_args()
return args
if __name__ == "__main__":
main()

Simultaneously, the PySpark application, 05_streaming_kafka.py, continuously consumes the sales transaction messages from the same topic, topicC. Then, Spark aggregates messages over a sliding event-time window and writes the results to the console.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads stream of messages from Kafka topicC and
# writes stream of aggregations over sliding event-time window to console (stdout)
# References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires –bootstrap_servers argument
import argparse
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType, TimestampType
def main():
args = parse_args()
spark = SparkSession \
.builder \
.appName("05-streaming-kafka") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
df_sales = read_from_kafka(spark, args)
summarize_sales(df_sales)
def read_from_kafka(spark, args):
options_read = {
"kafka.bootstrap.servers":
args.bootstrap_servers,
"subscribe":
args.read_topic,
"startingOffsets":
"earliest",
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
df_sales = spark \
.readStream \
.format("kafka") \
.options(**options_read) \
.load()
return df_sales
def summarize_sales(df_sales):
schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", TimestampType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])
ds_sales = df_sales \
.selectExpr("CAST(value AS STRING)", "timestamp") \
.select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
.select("data.*", "timestamp") \
.withWatermark("timestamp", "10 minutes") \
.groupBy("country",
F.window("timestamp", "10 minutes", "5 minutes")) \
.agg(F.sum("amount"), F.count("amount")) \
.orderBy(F.col("window").desc(),
F.col("sum(amount)").desc()) \
.select("country",
F.format_number("sum(amount)", 2).alias("sales"),
F.format_number("count(amount)", 0).alias("orders"),
"window.start", "window.end") \
.coalesce(1) \
.writeStream \
.queryName("streaming_to_console") \
.trigger(processingTime="1 minute") \
.outputMode("complete") \
.format("console") \
.option("numRows", 10) \
.option("truncate", False) \
.start()
ds_sales.awaitTermination()
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description="Arguments required for script.")
parser.add_argument("–bootstrap_servers", required=True, help="Kafka bootstrap servers")
parser.add_argument("–read_topic", default="topicC", required=False, help="Kafka topic to read from")
args = parser.parse_args()
return args
if __name__ == "__main__":
main()

To submit the two PySpark jobs to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Again, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

# run 04 and 05 simultaneously
aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 04-stream-sales-to-kafka \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/04_stream_sales_to_kafka.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>",
"–s3_bucket=<your_s3_bucket>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'
aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 05-streaming-kafka \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/05_streaming_kafka.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'

Switching to the EMR Serverless Application console, you should see both Spark jobs you just submitted in one of several job states.

EMR Studio Serverless Application details console

Using the Spark UI again, we can review the output from the second job, 05_streaming_kafka.py.

Spark UI’s Jobs tab

With Spark Structured Streaming jobs, we have an extra tab in the Spark UI, Structured Streaming. This tab displays all running jobs with their latest [micro]batch number, the aggregate rate of data arriving, and the aggregate rate at which Spark is processing data. Unfortunately, with MSK Serverless, AWS doesn’t appear to allow access to the detailed streaming query statistics via the Run ID, which greatly reduces its value. You receive a 502 error when clicking on the Run ID hyperlink.

Spark UI’s Structured Streaming tab

The output we are most interested in, again, is contained in the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI’s Executors tab

Below we see sample output from stderr. The output shows the results of a micro-batch. According to the Apache Spark documentation, internally, by default, Structured Streaming queries are processed using a micro-batch processing engine. The engine processes data streams as a series of small batch jobs, achieving end-to-end latencies as low as 100ms and exactly-once fault-tolerance guarantees.

22/07/25 14:29:04 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/.10.b017bcdc-f142-4b28-8891-d3f2d471b740.tmp to file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/10
22/07/25 14:29:04 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "bec9640f-ac16-4d00-bd4a-ed8d29b5f768",
"runId" : "a2e00c3a-924c-4728-b25a-56ee855da7da",
"name" : "streaming_to_console",
"timestamp" : "2022-07-25T14:29:00.000Z",
"batchId" : 10,
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"durationMs" : {
"addBatch" : 4359,
"getBatch" : 1,
"latestOffset" : 5,
"queryPlanning" : 37,
"triggerExecution" : 4458,
"walCommit" : 27
},
"eventTime" : {
"avg" : "2022-07-25T14:28:29.947Z",
"max" : "2022-07-25T14:28:59.290Z",
"min" : "2022-07-25T14:28:00.559Z",
"watermark" : "2022-07-25T14:17:59.737Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 476,
"numRowsUpdated" : 132,
"allUpdatesTimeMs" : 319,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 14173,
"memoryUsedBytes" : 262776,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 400,
"numStateStoreInstances" : 400,
"customMetrics" : {
"loadedMapCacheHitCount" : 15600,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 154208
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topicC]]",
"startOffset" : {
"topicC" : {
"2" : 101,
"5" : 96,
"4" : 88,
"1" : 84,
"3" : 112,
"0" : 100
}
},
"endOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"latestOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@52e344bf",
"numOutputRows" : 238
}
}
view raw microbatch.txt hosted with ❤ by GitHub
Example of a Spark Structured Streaming MicroBatch output

The corresponding output to the micro-batch output above is shown below. We see the initial micro-batch results, starting with the first micro-batch before any messages are streamed to topicC.

——————————————-
Batch: 0
——————————————-
+——-+—–+——+—–+—+
|country|sales|orders|start|end|
+——-+—–+——+—–+—+
+——-+—–+——+—–+—+
——————————————-
Batch: 1
——————————————-
+———-+—–+——+——————-+——————-+
|country |sales|orders|start |end |
+———-+—–+——+——————-+——————-+
|Azerbaijan|7.99 |1 |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Sri Lanka |6.99 |1 |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Iran |5.99 |1 |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Brazil |5.99 |1 |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Azerbaijan|7.99 |1 |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Sri Lanka |6.99 |1 |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Brazil |5.99 |1 |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Iran |5.99 |1 |2022-07-25 14:10:00|2022-07-25 14:20:00|
+———-+—–+——+——————-+——————-+
——————————————-
Batch: 2
——————————————-
+——————+—–+——+——————-+——————-+
|country |sales|orders|start |end |
+——————+—–+——+——————-+——————-+
|Russian Federation|43.94|6 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|China |37.94|6 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Mexico |34.96|4 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|India |33.95|5 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United States |26.96|4 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Philippines |22.97|3 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Nigeria |22.97|3 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Iran |14.98|2 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Vietnam |13.98|2 |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United Kingdom |11.99|1 |2022-07-25 14:20:00|2022-07-25 14:30:00|
+——————+—–+——+——————-+——————-+
only showing top 10 rows
Example of a Spark Structured Streaming MicroBatch results to console

If you are familiar with Spark Structured Streaming, you are likely aware that these Spark jobs run continuously. In other words, the streaming jobs will not stop; they continu