
Data Preparation on AWS: Comparing Available ELT Options to Cleanse and Normalize Data

Comparing the features and performance of different AWS analytics services for Extract, Load, Transform (ELT)

Introduction

According to Wikipedia, “Extract, load, transform (ELT) is an alternative to extract, transform, load (ETL) used with data lake implementations. In contrast to ETL, in ELT models the data is not transformed on entry to the data lake but stored in its original raw format. This enables faster loading times. However, ELT requires sufficient processing power within the data processing engine to carry out the transformation on demand, to return the results in a timely manner.”

As capital investments and customer demand continue to drive the growth of the cloud-based analytics market, the choice of tools seems endless, and that can be a problem. Customers face a constant barrage of commercial and open-source tools for their batch, streaming, and interactive exploratory data analytics needs. The major Cloud Service Providers (CSPs) have even grown to a point where they now offer multiple services to accomplish similar analytics tasks.

This post will examine the choice of analytics services available on AWS capable of performing ELT. Specifically, this post will compare the features and performance of AWS Glue Studio, AWS Glue DataBrew, Amazon Athena, and Amazon EMR using multiple ELT use cases and service configurations.

Data pipeline architecture showing a choice of AWS ELT services

Analytics Use Case

We will address a simple yet common analytics challenge for this comparison — preparing a nightly data feed for analysis the next day. Each night a batch of approximately 1.2 GB of raw CSV-format healthcare data will be exported from a Patient Administration System (PAS) and uploaded to Amazon S3. The data must be cleansed, deduplicated, refined, normalized, and made available to the Data Science team the following morning. The team of Data Scientists will perform complex data analytics on the data and build machine learning models designed for early disease detection and prevention.

Sample Dataset

The dataset used for this comparison is generated by Synthea, an open-source patient population simulation. The high-quality, synthetic, realistic patient data and associated health records cover every aspect of healthcare. The dataset contains the patient-related healthcare history for allergies, care plans, conditions, devices, encounters, imaging studies, immunizations, medications, observations, organizations, patients, payers, procedures, providers, and supplies.

The Synthea dataset was first introduced in my March 2021 post examining the handling of sensitive PII data using Amazon Macie: Data Lakes: Discovery, Security, and Privacy of Sensitive Data.

The Synthea synthetic patient data is available in different record volumes and various data formats, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Since this post seeks to measure the performance of different AWS ELT-capable services, we will use a larger version of the Synthea dataset containing hundreds of thousands to millions of records.

AWS Glue Data Catalog

The dataset comprises nine uncompressed CSV files uploaded to Amazon S3 and cataloged to an AWS Glue Data Catalog, a persistent metadata store, using an AWS Glue Crawler.

Raw Synthea CSV data, in S3, cataloged in AWS Glue Data Catalog

Test Cases

We will use three data preparation test cases based on the Synthea dataset to examine the different AWS ELT-capable services.

Specifications for three different test cases

Test Case 1: Encounters for Symptom

An encounter is a health care contact between the patient and the provider responsible for diagnosing and treating the patient. In our first test case, we will process 1.26M encounters records for an ongoing study of patient symptoms by our Data Science team.

id,date,patient,code,description,reasoncode,reasondescription
714fd61a-f9fd-43ff-87b9-3cc45a3f1e53,2014-01-09,33f33990-ae8b-4be8-938f-e47ad473abfe,185345009,Encounter for symptom,444814009,Viral sinusitis (disorder)
23e07532-8b96-4d05-b14e-d4c5a5288ed2,2014-08-18,33f33990-ae8b-4be8-938f-e47ad473abfe,185349003,Outpatient Encounter,,
45044100-aaba-4209-8ad1-15383c76842d,2015-07-12,33f33990-ae8b-4be8-938f-e47ad473abfe,185345009,Encounter for symptom,36971009,Sinusitis (disorder)
ffdddbfb-35e8-4a74-a801-89e97feed2f3,2014-08-12,36d131ee-dd5b-4acb-acbe-19961c32c099,185345009,Encounter for symptom,444814009,Viral sinusitis (disorder)
352d1693-591a-4615-9b1b-f145648f49cc,2016-05-25,36d131ee-dd5b-4acb-acbe-19961c32c099,185349003,Outpatient Encounter,,
4620bd2f-8010-46a9-82ab-8f25eb621c37,2016-10-07,36d131ee-dd5b-4acb-acbe-19961c32c099,185345009,Encounter for symptom,195662009,Acute viral pharyngitis (disorder)
815494d8-2570-4918-a8de-fd4000d8100f,2010-08-02,660bec03-9e58-47f2-98b9-2f1c564f3838,698314001,Consultation for treatment,,
67ec5c2d-f41e-4538-adbe-8c06c71ddc35,2010-11-22,660bec03-9e58-47f2-98b9-2f1c564f3838,170258001,Outpatient Encounter,,
dbe481ce-b961-4f43-ac0a-07fa8cfa8bdd,2012-11-21,660bec03-9e58-47f2-98b9-2f1c564f3838,50849002,Emergency room admission,,
b5f1ab7e-5e67-4070-bcf0-52451eb20551,2013-12-04,660bec03-9e58-47f2-98b9-2f1c564f3838,185345009,Encounter for symptom,10509002,Acute bronchitis (disorder)
Sample of raw encounters data

Data preparation includes the following steps:

  1. Load 1.26M encounter records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Select only the records where the description column contains “Encounter for symptom.”
  4. Remove any rows with an empty reasoncode column.
  5. Extract a new year, month, and day column from the date column.
  6. Remove the date column.
  7. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  8. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  9. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.
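
The filtering logic in steps 2 through 6 can be sketched in plain Python. This is only an illustration on a handful of made-up rows, not the Glue, DataBrew, Athena, or Spark implementation used in the tests. The `prepare_encounters` function and the shortened IDs are hypothetical, and steps 7 through 9 (Parquet output, bucketing, and cataloging) are left out.

```python
import csv
import io
from datetime import date
from typing import Dict, List

# Hypothetical rows in the same column layout as encounters.csv (IDs shortened).
RAW_CSV = """id,date,patient,code,description,reasoncode,reasondescription
e1,2014-01-09,p1,185345009,Encounter for symptom,444814009,Viral sinusitis (disorder)
e1,2014-01-09,p1,185345009,Encounter for symptom,444814009,Viral sinusitis (disorder)
e2,2014-08-18,p1,185349003,Outpatient Encounter,,
e3,2015-07-12,p1,185345009,Encounter for symptom,,
e4,2016-10-07,p2,185345009,Encounter for symptom,195662009,Acute viral pharyngitis (disorder)
"""


def prepare_encounters(raw_csv: str) -> List[Dict[str, object]]:
    """Apply steps 2-6 to CSV text and return the cleaned rows."""
    seen = set()
    cleaned = []
    for row in csv.DictReader(io.StringIO(raw_csv)):
        key = tuple(row.values())
        if key in seen:  # step 2: drop exact duplicates
            continue
        seen.add(key)
        if "Encounter for symptom" not in row["description"]:  # step 3
            continue
        if not row["reasoncode"]:  # step 4: drop empty reasoncode
            continue
        d = date.fromisoformat(row.pop("date")[:10])  # step 6: remove date
        row.update(year=d.year, month=d.month, day=d.day)  # step 5
        cleaned.append(row)
    return cleaned


rows = prepare_encounters(RAW_CSV)
for r in rows:
    print(r["id"], r["year"], r["month"], r["day"])
```

Of the five sample rows, only `e1` (once) and `e4` survive: the duplicate, the outpatient encounter, and the row without a reason code are dropped.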

Test Case 2: Observations

Clinical observations ensure that treatment plans are up-to-date and correctly administered and allow healthcare staff to carry out timely and regular bedside assessments. In our second test case, we will process 5.38M observations records for our Data Science team.

date,patient,encounter,code,description,value,units
2011-07-02,33f33990-ae8b-4be8-938f-e47ad473abfe,673daa98-67e9-4e80-be46-a0b547533653,8302-2,Body Height,175.76,cm
2011-07-02,33f33990-ae8b-4be8-938f-e47ad473abfe,673daa98-67e9-4e80-be46-a0b547533653,29463-7,Body Weight,56.51,kg
2011-07-02,33f33990-ae8b-4be8-938f-e47ad473abfe,673daa98-67e9-4e80-be46-a0b547533653,39156-5,Body Mass Index,18.29,kg/m2
2011-07-02,33f33990-ae8b-4be8-938f-e47ad473abfe,673daa98-67e9-4e80-be46-a0b547533653,8480-6,Systolic Blood Pressure,119.0,mmHg
2011-07-02,33f33990-ae8b-4be8-938f-e47ad473abfe,673daa98-67e9-4e80-be46-a0b547533653,8462-4,Diastolic Blood Pressure,77.0,mmHg
2012-06-17,33f33990-ae8b-4be8-938f-e47ad473abfe,be0aa510-645e-421b-ad21-8a1ab442ca48,8302-2,Body Height,177.25,cm
2012-06-17,33f33990-ae8b-4be8-938f-e47ad473abfe,be0aa510-645e-421b-ad21-8a1ab442ca48,29463-7,Body Weight,59.87,kg
2012-06-17,33f33990-ae8b-4be8-938f-e47ad473abfe,be0aa510-645e-421b-ad21-8a1ab442ca48,39156-5,Body Mass Index,19.05,kg/m2
2012-06-17,33f33990-ae8b-4be8-938f-e47ad473abfe,be0aa510-645e-421b-ad21-8a1ab442ca48,8480-6,Systolic Blood Pressure,113.0,mmHg
2012-03-26,36d131ee-dd5b-4acb-acbe-19961c32c099,296a1fd4-56de-451c-a5fe-b50f9a18472d,8302-2,Body Height,174.17,cm
Sample of raw observations data

Data preparation includes the following steps:

  1. Load 5.38M observation records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Extract a new year, month, and day column from the date column.
  4. Remove the date column.
  5. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  6. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  7. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.
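
To make the partitioning in steps 3 through 6 concrete, here is a small sketch, assuming Hive-style `key=value` directory names of the kind Spark's `partitionBy` produces for integer columns. The `partition_prefix` and `count_partitions` helpers are hypothetical names used only for illustration; the dates come from the sample rows above.

```python
from collections import Counter
from datetime import date
from typing import Iterable


def partition_prefix(iso_date: str) -> str:
    """Map a date string to a year/month/day partition prefix."""
    d = date.fromisoformat(iso_date[:10])
    return f"year={d.year}/month={d.month}/day={d.day}"


def count_partitions(iso_dates: Iterable[str]) -> Counter:
    """Count how many records land in each day partition."""
    return Counter(partition_prefix(d) for d in iso_dates)


sample_dates = ["2011-07-02", "2011-07-02", "2012-06-17", "2012-03-26"]
records_per_partition = count_partitions(sample_dates)
for prefix, n in sorted(records_per_partition.items()):
    print(prefix, n)
```

Each distinct prefix becomes one partition in the catalog table. Writing a single file per prefix (step 6) keeps the number of S3 objects equal to the number of day partitions.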

Test Case 3: Sinusitis Study

A medical condition is a broad term that includes all diseases, lesions, and disorders. In our third test case, we will join the conditions records with the patient records and filter for any condition containing the term ‘sinusitis’ in preparation for our Data Science team.

start,stop,patient,encounter,code,description
2012-09-05,2012-10-16,bc33b032-8e41-4d16-bc7e-00b674b6b9f8,05a6ef43-d690-455e-ab2f-1ea19d902274,44465007,Sprain of ankle
2014-09-08,2014-09-28,bc33b032-8e41-4d16-bc7e-00b674b6b9f8,1cdcbe46-caaf-4b3f-b58c-9ca9ccb13013,283371005,Laceration of forearm
2014-11-28,2014-12-13,bc33b032-8e41-4d16-bc7e-00b674b6b9f8,b222e257-98da-4a1b-a46c-45d5ad01bbdc,195662009,Acute viral pharyngitis (disorder)
1980-01-09,,01858c8d-f81c-4a95-ab4f-bd79fb62b284,ffbd4177-280a-4a08-a1af-9770a06b5146,40055000,Chronic sinusitis (disorder)
1989-06-25,,01858c8d-f81c-4a95-ab4f-bd79fb62b284,ffbd4177-280a-4a08-a1af-9770a06b5146,201834006,Localized primary osteoarthritis of the hand
1996-01-07,,01858c8d-f81c-4a95-ab4f-bd79fb62b284,ffbd4177-280a-4a08-a1af-9770a06b5146,196416002,Impacted molars
2016-02-07,,01858c8d-f81c-4a95-ab4f-bd79fb62b284,748cda45-c267-46b2-b00d-3b405a44094e,15777000,Prediabetes
2016-04-27,2016-05-20,01858c8d-f81c-4a95-ab4f-bd79fb62b284,a64734f1-5b21-4a59-b2e8-ebfdb9058f8b,444814009,Viral sinusitis (disorder)
2014-02-06,2014-02-19,d32e9ad2-4ea1-4bb9-925d-c00fe85851ae,c64d3637-8922-4531-bba5-f3051ece6354,43878008,Streptococcal sore throat (disorder)
1982-05-18,,08858d24-52f2-41dd-9fe9-cbf1f77b28b2,3fff3d52-a769-475f-b01b-12622f4fee17,368581000119106,Neuropathy due to type 2 diabetes mellitus (disorder)
Sample of raw conditions data

Data preparation includes the following steps:

  1. Load 483k condition records using the existing AWS Glue Data Catalog table.
  2. Inner join the condition records with the 132k patient records based on patient ID.
  3. Remove any duplicate records.
  4. Drop approximately 15 unneeded columns.
  5. Select only the records where the description column contains the term “sinusitis.”
  6. Remove any rows with empty ethnicity, race, gender, or marital columns.
  7. Create a new column, condition_age, based on a calculation of the age in days at which the patient’s condition was diagnosed.
  8. Write the resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet-format files. No partitions are necessary.
  9. Given the small resultset, bucket the data such that only one file is written to minimize the impact of too many small files on future query performance.
  10. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog.
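
The join, filter, and `condition_age` calculation above can be sketched in plain Python as follows. The records, the short IDs, and the `prepare_sinusitis` helper are all hypothetical, and the match on “sinusitis” is deliberately case-sensitive, which is an assumption about how the filter should behave. Output to Parquet and cataloging are omitted.

```python
from datetime import date
from typing import Dict, List

# Hypothetical, abbreviated records; real Synthea rows have many more columns.
PATIENTS = {
    "p1": {"birthdate": "2000-01-01", "marital": "M", "race": "white",
           "ethnicity": "nonhispanic", "gender": "F"},
    "p2": {"birthdate": "1990-06-15", "marital": "", "race": "asian",
           "ethnicity": "nonhispanic", "gender": "M"},
}
CONDITIONS = [
    {"start": "2001-01-01", "patient": "p1", "code": "444814009",
     "description": "Viral sinusitis (disorder)"},
    {"start": "2001-01-01", "patient": "p1", "code": "444814009",
     "description": "Viral sinusitis (disorder)"},    # exact duplicate
    {"start": "2005-03-02", "patient": "p1", "code": "15777000",
     "description": "Prediabetes"},                   # not sinusitis
    {"start": "2010-02-07", "patient": "p2", "code": "40055000",
     "description": "Chronic sinusitis (disorder)"},  # p2 has empty marital
    {"start": "2012-09-05", "patient": "p9", "code": "444814009",
     "description": "Viral sinusitis (disorder)"},    # no matching patient
]


def condition_age_days(birthdate: str, start: str) -> int:
    """Age in days at diagnosis: condition start date minus birthdate."""
    return (date.fromisoformat(start[:10]) - date.fromisoformat(birthdate[:10])).days


def prepare_sinusitis(conditions, patients) -> List[Dict[str, object]]:
    seen = set()
    result = []
    for c in conditions:
        p = patients.get(c["patient"])
        if p is None:  # inner join: skip conditions without a patient
            continue
        if "sinusitis" not in c["description"]:
            continue
        if not all(p[col] for col in ("ethnicity", "race", "gender", "marital")):
            continue
        row = {
            "patient": c["patient"],
            "code": c["code"],
            "description": c["description"],
            "condition_age": condition_age_days(p["birthdate"], c["start"]),
            "marital": p["marital"],
            "race": p["race"],
            "ethnicity": p["ethnicity"],
            "gender": p["gender"],
        }
        key = tuple(row.items())
        if key in seen:  # de-duplicate on the projected columns
            continue
        seen.add(key)
        result.append(row)
    return result


sinusitis_rows = prepare_sinusitis(CONDITIONS, PATIENTS)
print(sinusitis_rows)
```

Only one row remains for the sample data: patient `p1`, diagnosed 366 days after birth (the year 2000 is a leap year).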

AWS ELT Options

There are numerous options on AWS to handle the batch transformation use case described above; a non-exhaustive list includes:

  1. AWS Glue Studio (UI-driven with AWS Glue PySpark Extensions)
  2. AWS Glue DataBrew
  3. Amazon Athena
  4. Amazon EMR with Apache Spark
  5. AWS Glue Studio (Apache Spark script)
  6. AWS Glue Jobs (Legacy jobs)
  7. Amazon EMR with Presto
  8. Amazon EMR with Trino
  9. Amazon EMR with Hive
  10. AWS Step Functions and AWS Lambda
  11. Amazon Redshift Spectrum
  12. Partner solutions on AWS, such as Databricks, Snowflake, Upsolver, StreamSets, Stitch, and Fivetran
  13. Self-managed custom solutions using a combination of OSS, such as dbt, Airbyte, Dagster, Meltano, Apache NiFi, Apache Drill, Apache Beam, Pandas, Apache Airflow, and Kubernetes

For this comparison, we will choose the first five options listed above to develop our ELT data preparation pipelines: AWS Glue Studio (UI-driven job creation with AWS Glue PySpark Extensions), AWS Glue DataBrew, Amazon Athena, Amazon EMR with Apache Spark, and AWS Glue Studio (Apache Spark script).

Data pipeline architecture showing a choice of AWS ELT services

AWS Glue Studio

According to the documentation, “AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine. You can inspect the schema and data results in each step of the job.”

AWS Glue Studio’s visual job creation capability uses the AWS Glue PySpark Extensions, an extension of the PySpark Python dialect for scripting ETL jobs. The extensions provide easier integration with AWS Glue Data Catalog and other AWS-managed data services. As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Spark scripts with AWS Glue Studio. In fact, we can use the exact same scripts run on Amazon EMR.

For the tests, we are using the G.2X worker type, Glue version 3.0 (Spark 3.1.1 and Python 3.7), and Python as the language choice for this comparison. We will test three worker configurations using both UI-driven job creation with AWS Glue PySpark Extensions and Apache Spark script options:

  • 10 workers with a maximum of 20 DPUs
  • 20 workers with a maximum of 40 DPUs
  • 40 workers with a maximum of 80 DPUs
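
The worker and DPU figures above follow from each G.2X worker mapping to 2 DPUs. The sketch below works through that arithmetic and gives a rough upper bound on DPU-hours for a job of a given duration. The `max_dpus` and `dpu_hours_upper_bound` helpers and the 3-minute duration are hypothetical, and actual billing depends on pricing details not covered here.

```python
# DPUs per worker for the two standard Glue worker types.
DPUS_PER_WORKER = {"G.1X": 1, "G.2X": 2}


def max_dpus(worker_type: str, workers: int) -> int:
    """Maximum DPUs allocated for a given worker type and worker count."""
    return DPUS_PER_WORKER[worker_type] * workers


def dpu_hours_upper_bound(worker_type: str, workers: int, minutes: float) -> float:
    """Upper bound on DPU-hours if every worker runs for the whole job."""
    return max_dpus(worker_type, workers) * minutes / 60


for workers in (10, 20, 40):
    print(workers, max_dpus("G.2X", workers),
          dpu_hours_upper_bound("G.2X", workers, minutes=3))
```

If execution time does not improve with more workers, the larger configurations simply consume more DPU-hours for the same result.
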
AWS Glue Studio visual job creation UI for Test Case 3: Sinusitis Study

AWS Glue Studio Spark job details for Test Case 2: Observations

AWS Glue Studio job runs for Test Case 2: Observations

AWS Glue DataBrew

According to the documentation, “AWS Glue DataBrew is a visual data preparation tool that enables users to clean and normalize data without writing any code. Using DataBrew helps reduce the time it takes to prepare data for analytics and machine learning (ML) by up to 80 percent, compared to custom-developed data preparation. You can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values.”

DataBrew allows you to set the maximum number of DataBrew nodes that can be allocated when a job runs. For this comparison, we will test three different node configurations:

  • 3 maximum nodes
  • 10 maximum nodes
  • 20 maximum nodes
AWS Glue DataBrew Project for Test Case 3: Sinusitis Study

AWS Glue DataBrew Recipe for Test Case 1: Encounters for Symptom

AWS Glue DataBrew recipe job runs for Test Case 1: Encounters for Symptom

Amazon Athena

According to the documentation, “Athena helps you analyze unstructured, semi-structured, and structured data stored in Amazon S3. Examples include CSV, JSON, or columnar data formats such as Apache Parquet and Apache ORC. You can use Athena to run ad-hoc queries using ANSI SQL, without the need to aggregate or load the data into Athena.”

Although Athena is classified as an ad-hoc query engine, using a CREATE TABLE AS SELECT (CTAS) query, we can create a new table in the AWS Glue Data Catalog and write the results of a SELECT statement to Amazon S3. That SELECT statement performs the transformation on the data using SQL.

-- Purpose: Process data for sinusitis study using Amazon Athena
-- Author: Gary A. Stafford (January 2022)

CREATE TABLE "sinusitis_athena" WITH (
    format = 'Parquet',
    write_compression = 'SNAPPY',
    external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
    bucketed_by = ARRAY['patient'],
    bucket_count = 1
) AS
SELECT DISTINCT
    patient,
    code,
    description,
    date_diff(
        'day',
        date(substr(birthdate, 1, 10)),
        date(substr(start, 1, 10))
    ) as condition_age,
    marital,
    race,
    ethnicity,
    gender
FROM conditions AS c,
    patients AS p
WHERE c.patient = p.id
    AND gender <> ''
    AND ethnicity <> ''
    AND race <> ''
    AND marital <> ''
    AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 3: Sinusitis Study

Amazon Athena is a fully managed AWS service and has no performance settings to adjust or monitor.

Amazon Athena CTAS statement for Test Case 1: Encounters for Symptom

Parquet data partitioned by year in Amazon S3 for Test Case 1: Encounters for Symptom, using Athena

CTAS and Partitions

A notable limitation of Amazon Athena for the batch use case is the 100 partition limit with CTAS queries. Athena [only] supports writing to 100 unique partition and bucket combinations with CTAS. Partitioned by year, month, and day, the encounters test case requires 2,558 partitions, and the observations test case requires 10,433 partitions. There is a recommended workaround using an INSERT INTO statement. However, the workaround requires additional SQL logic, computation, and, most importantly, cost. In my opinion, it is not practical compared to other methods when a higher number of partitions is needed. To avoid the partition limit with CTAS, we will only partition by year and bucket by month when using Athena. Take this limitation into account when comparing the final results.
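
To illustrate why partitioning by year alone stays under the limit, the sketch below counts distinct partition keys for a made-up date range and renders the table properties of a CTAS statement that partitions by year and buckets by month. The date range, the bucket name, the `bucket_count` of 1, and both helper functions are assumptions for illustration, not the statement used in the tests.

```python
from datetime import date, timedelta
from typing import Iterable, Sequence

ATHENA_CTAS_PARTITION_LIMIT = 100  # limit described in the text above


def count_partitions(dates: Iterable[date], keys: Sequence[str]) -> int:
    """Count distinct partition-key combinations, e.g. ("year", "month", "day")."""
    return len({tuple(getattr(d, k) for k in keys) for d in dates})


def ctas_with_clause(location: str, partitioned_by: Sequence[str],
                     bucketed_by: Sequence[str], bucket_count: int) -> str:
    """Render the WITH (...) properties of an Athena CTAS statement."""
    def array(cols: Sequence[str]) -> str:
        return "ARRAY[" + ", ".join(f"'{c}'" for c in cols) + "]"

    return (
        "WITH (\n"
        "    format = 'Parquet',\n"
        "    write_compression = 'SNAPPY',\n"
        f"    external_location = '{location}',\n"
        f"    partitioned_by = {array(partitioned_by)},\n"
        f"    bucketed_by = {array(bucketed_by)},\n"
        f"    bucket_count = {bucket_count}\n"
        ")"
    )


# Hypothetical date range: one record per day for ten years (2010 through 2019).
dates = [date(2010, 1, 1) + timedelta(days=i) for i in range(3652)]
by_day = count_partitions(dates, ("year", "month", "day"))
by_year = count_partitions(dates, ("year",))
print(by_day, by_day <= ATHENA_CTAS_PARTITION_LIMIT)
print(by_year, by_year <= ATHENA_CTAS_PARTITION_LIMIT)

# Hypothetical bucket and table names.
clause = ctas_with_clause("s3://example-bucket/observations_athena/",
                          partitioned_by=["year"], bucketed_by=["month"],
                          bucket_count=1)
print(clause)
```

Even this modest ten-year range produces thousands of day-level partitions but only ten year-level ones. Note that Athena expects the partition columns to appear last in the SELECT list of the CTAS query.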

Amazon EMR with Apache Spark

According to the documentation, “Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. You can quickly and easily create managed Spark clusters from the AWS Management Console, AWS CLI, or the Amazon EMR API.”

For this comparison, we are using two different Spark 3.1.2 EMR clusters:

  • (1) r5.xlarge Master node and (2) r5.2xlarge Core nodes
  • (1) r5.2xlarge Master node and (4) r5.2xlarge Core nodes

All Spark jobs are written in both Python (PySpark) and Scala. We are using the AWS Glue Data Catalog as the metastore for Spark SQL instead of Apache Hive.

4-node Amazon EMR cluster shown in Amazon EMR Management Console

Completed EMR Steps (Spark Jobs) on 4-node Amazon EMR cluster

# Purpose: Process data for sinusitis study using either Amazon EMR or AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)

from pyspark.sql import SparkSession

table_name = "sinusitis_emr_spark"

# Use the AWS Glue Data Catalog as the Hive metastore
spark = SparkSession \
    .builder \
    .appName(table_name) \
    .config("hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("USE synthea_patient_big_data;")

sql_query_data = """
    SELECT DISTINCT
        patient,
        code,
        description,
        datediff(
            date(substr(start, 1, 10)),
            date(substr(birthdate, 1, 10))
        ) as condition_age,
        marital,
        race,
        ethnicity,
        gender
    FROM conditions as c, patients as p
    WHERE c.patient = p.id
        AND gender <> ''
        AND ethnicity <> ''
        AND race <> ''
        AND marital <> ''
        AND description LIKE '%sinusitis%';
"""

df_data = spark.sql(sql_query_data)

# Write a single bucketed Parquet file and register the table
df_data \
    .coalesce(1) \
    .write \
    .bucketBy(1, "patient") \
    .sortBy("patient", "code") \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
    .saveAsTable(name=table_name)

# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 3: Sinusitis Study

# Purpose: Process encounters dataset using either Amazon EMR or AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)

from pyspark.sql import SparkSession

table_name = "encounter_emr_spark"

# Use the AWS Glue Data Catalog as the Hive metastore and allow dynamic partitions
spark = SparkSession \
    .builder \
    .appName(table_name) \
    .config("hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .config("hive.exec.dynamic.partition",
            "true") \
    .config("hive.exec.dynamic.partition.mode",
            "nonstrict") \
    .config("hive.exec.max.dynamic.partitions",
            "10000") \
    .config("hive.exec.max.dynamic.partitions.pernode",
            "10000") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("USE synthea_patient_big_data;")

sql_query_data = """
    SELECT DISTINCT
        id,
        patient,
        code,
        description,
        reasoncode,
        reasondescription,
        year(date) as year,
        month(date) as month,
        day(date) as day
    FROM encounters
    WHERE description='Encounter for symptom';
"""

df_data = spark.sql(sql_query_data)

# Write one bucketed Parquet file per year/month/day partition and register the table
df_data \
    .coalesce(1) \
    .write \
    .partitionBy("year", "month", "day") \
    .bucketBy(1, "patient") \
    .sortBy("patient") \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
    .saveAsTable(name=table_name)

# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 1: Encounters for Symptom

package main.spark.demo

// Purpose: Process observations dataset using Spark on Amazon EMR with Scala
// Author: Gary A. Stafford
// Date: 2022-03-06

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object Observations {

  def main(args: Array[String]): Unit = {
    val (spark: SparkSession, sc: SparkContext) = createSession
    performELT(spark, sc)
  }

  private def createSession = {
    val spark: SparkSession = SparkSession.builder
      .appName("Observations ELT App")
      .config("hive.metastore.client.factory.class",
        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
      .config("hive.exec.dynamic.partition",
        "true")
      .config("hive.exec.dynamic.partition.mode",
        "nonstrict")
      .config("hive.exec.max.dynamic.partitions",
        "10000")
      .config("hive.exec.max.dynamic.partitions.pernode",
        "10000")
      .enableHiveSupport()
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("INFO")

    (spark, sc)
  }

  private def performELT(spark: SparkSession, sc: SparkContext) = {
    val tableName: String = sc.getConf.get("spark.executorEnv.TABLE_NAME")
    val dataLakeBucket: String = sc.getConf.get("spark.executorEnv.DATA_LAKE_BUCKET")

    spark.sql("USE synthea_patient_big_data;")

    val sql_query_data: String =
      """
        SELECT DISTINCT
          patient,
          encounter,
          code,
          description,
          value,
          units,
          year(date) as year,
          month(date) as month,
          day(date) as day
        FROM observations
        WHERE date <> 'date';
      """

    val observationsDF: DataFrame = spark
      .sql(sql_query_data)

    observationsDF
      .coalesce(1)
      .write
      .partitionBy("year", "month", "day")
      .bucketBy(1, "patient")
      .sortBy("patient")
      .mode("overwrite")
      .format("parquet")
      .option("path", s"s3://${dataLakeBucket}/${tableName}/")
      .saveAsTable(tableName = tableName)

    spark.sql(s"ALTER TABLE ${tableName} SET TBLPROPERTIES ('classification'='parquet');")
  }
}
Spark jobs written in Scala had nearly identical execution times, such as Test Case 2: Observations

Partitions in the AWS Glue Data Catalog table for Test Case 1: Encounters for Symptom

Results

Data pipelines were developed and tested for each of the three test cases using the five chosen AWS ELT services and configuration variations. Each pipeline was then run 3–5 times, for a total of approximately 150 runs. The resulting AWS Glue Data Catalog table and data in Amazon S3 were deleted between each pipeline run. Each new run created a new data catalog table and wrote new results to Amazon S3. The median execution times from these tests are shown below.
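
Reporting the median rather than the mean keeps a single slow run, such as one affected by a cold start, from skewing the result. A minimal sketch, using entirely hypothetical run times in seconds (not measurements from these tests):

```python
from statistics import mean, median

# Hypothetical execution times in seconds for five runs of one pipeline;
# the fourth run stands in for a cold-start outlier.
run_times = [62, 58, 60, 184, 61]

median_time = median(run_times)
mean_time = mean(run_times)
print(f"median: {median_time}s, mean: {mean_time}s")
```

For these made-up values the median is 61 seconds, while the mean is pulled up to 85 seconds by the one outlier.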

Number of raw and processed records for each test case

Overall results (see details below) — lower times are better

Although we can make some general observations about the execution times of the chosen AWS services, the results are not meant to be a definitive guide to performance. An accurate comparison would require a deeper understanding of how each of these managed services works under the hood, in order to both optimize and balance their compute profiles correctly.

Amazon Athena

The Resultset column contains the final number of records written to Amazon S3 by Athena. The results contain the data pipeline’s median execution time and any additional data points.

Results for Amazon Athena data pipelines

AWS Glue Studio (AWS Glue PySpark Extensions)

Tests were run with three different configurations for AWS Glue Studio using the graphical interface for creating jobs with AWS Glue PySpark Extensions. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue Studio with AWS Glue PySpark Extensions

AWS Glue Studio (Apache Spark script)

As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Apache Spark scripts with AWS Glue Studio. The tests were run with the same three configurations as above. The execution times compared to the Amazon EMR tests, below, are almost identical.

Results for data pipelines using PySpark scripts on AWS Glue Studio

Amazon EMR with Apache Spark

Tests were run with three different configurations for Amazon EMR with Apache Spark using PySpark. The first set of results is for the 2-node EMR cluster. The second set of results is for the 4-node cluster. The third set of results is for the same 4-node cluster in which the data was not bucketed into a single file within each partition. Compare the execution times and the number of objects against the previous set of results. Too many small files can negatively impact query performance.

Results for data pipelines using Amazon EMR with Apache Spark — times for PySpark scripts

It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had similar execution times for all three test cases.

Results for data pipelines using Amazon EMR with Apache Spark — Python vs. Scala

AWS Glue DataBrew

Tests were run with three different configurations for AWS Glue DataBrew, including 3, 10, and 20 maximum nodes. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue DataBrew

Observations

  1. All tested AWS services can read and write to an AWS Glue Data Catalog and the underlying datastore, Amazon S3. In addition, they all work with the most common analytics data file formats.
  2. All tested AWS services have rich APIs providing access through the AWS CLI and SDKs, which support multiple programming languages.
  3. Overall, AWS Glue Studio, using the AWS Glue PySpark Extensions, appears to be the most capable ELT tool of the five services tested and with the best performance.
  4. Both AWS Glue DataBrew and AWS Glue Studio are no-code or low-code services, democratizing access to data for non-programmers. Conversely, Amazon Athena requires knowledge of ANSI SQL, and Amazon EMR with Apache Spark requires knowledge of Scala or Python. Be cognizant of the potential trade-offs from using no-code or low-code services on observability, configuration control, and automation.
  5. Both AWS Glue DataBrew and AWS Glue Studio can write using GlueParquet, a custom Parquet writer type optimized for Dynamic Frames. One potential advantage: a pre-computed schema is not required before writing.
  6. There is a slight ‘cold-start’ with Glue Studio. Studio startup times ranged from 7 seconds to 2 minutes and 4 seconds in the tests. However, the lower execution time of AWS Glue Studio compared to Amazon EMR with Spark and AWS Glue DataBrew in the tests offsets any initial cold-start time, in my opinion.
  7. Changing the maximum number of nodes from 3 to 10 to 20 for AWS Glue DataBrew made negligible differences in job execution times. Given the nearly identical execution times, it is unclear exactly how many nodes are being used by the job and, more importantly, how many DataBrew node hours we are being billed for. These are some of the trade-offs with a fully managed service: less visibility and less ability to fine-tune the configuration.
  8. Similarly, with AWS Glue Studio, using either 10 workers w/ max. 20 DPUs, 20 workers w/ max. 40 DPUs, or 40 workers w/ max. 80 DPUs resulted in nearly identical execution times.
  9. Amazon Athena had the fastest execution times but is limited by the 100 partition limit for large CTAS resultsets. Athena is not practical, in my opinion, compared to other ELT methods, when a higher number of partitions are needed.
  10. It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had almost identical execution times for all three test cases.
  11. Using Amazon EMR with EC2 instances, it takes about 9 minutes to fully provision a new cluster for this comparison. Given nearly identical execution times to AWS Glue Studio with Apache Spark scripts, Glue has the clear advantage of nearly instantaneous startup times.
  12. AWS recently announced Amazon EMR Serverless. Although this service is still in Preview, this new version of EMR could potentially reduce or eliminate the lengthy startup time for ephemeral cluster requirements.
  13. Although not discussed, scheduling the data pipelines to run each night was a requirement for our use case. AWS Glue Studio jobs and AWS Glue DataBrew jobs are schedulable from those services. For Amazon EMR and Amazon Athena, we could use Amazon Managed Workflows for Apache Airflow (MWAA), AWS Data Pipeline, or AWS Step Functions combined with Amazon CloudWatch Events Rules to schedule the data pipelines.
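
For the nightly schedule mentioned in the last item, AWS Glue triggers and CloudWatch Events rules accept six-field cron expressions of the form `cron(Minutes Hours Day-of-month Month Day-of-week Year)`, evaluated in UTC. The helper below is a hypothetical convenience function that builds such an expression for a daily run; the 02:00 UTC start time is only an example.

```python
def nightly_cron(hour_utc: int, minute: int = 0) -> str:
    """Build a daily AWS-style cron expression for the given UTC time."""
    if not (0 <= hour_utc <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    # '?' in the day-of-week field means "no specific value".
    return f"cron({minute} {hour_utc} * * ? *)"


schedule = nightly_cron(2)
print(schedule)
```

The resulting string could then be supplied as the schedule of a Glue trigger or as the schedule expression of a CloudWatch Events rule.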

Conclusion

Customers have many options for ELT — the cleansing, deduplication, refinement, and normalization of raw data. We examined chosen services on AWS, each capable of handling the analytics use case presented. The best choice of tools depends on your specific ELT use case and performance requirements.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Evolving Models for ISV Software Delivery, Management, and Support

Understanding evolving models used by Independent Software Vendors for cloud-based software delivery, management, and support

Copyright: melpomen (123rf.com)

Introduction

As a Consultant, Enterprise Architect, Partner Solutions Architect, and Senior Solutions Architect, I have had the chance to work with many successful Independent Software Vendors (ISVs), from early-stage startups to large established enterprises. Based on my experience, I wrote two AWS Partner Network (APN) Blog posts: Architecting Successful SaaS: Understanding Cloud-Based Software-as-a-Service Models and Architecting Successful SaaS: Interacting with Your SaaS Customer’s Cloud Accounts. Continuing with that series, this post explores several existing and evolving models used by ISVs to deliver, manage, and support their software products for cloud-based customers.

Independent Software Vendors

An ISV, also known as a software publisher, specializes in making and selling software designed for mass or niche markets. This is in contrast to in-house software, which the organization develops for its internal use, or custom software designed for a single, specific third party. Although end-users consume ISV-provided software, it remains the property of the vendor (source: Wikipedia).

The ISV industry, especially SaaS-based products, has seen huge year-over-year (YOY) growth. VC firms continue to fuel industry growth (and valuations) with an unprecedentedly high level of capital investment throughout 2021. According to SaaS Industry, the total investment for Q1-2021 stood at $9.9B. B2B data industry resource, Datamation, examines prominent ISVs in their article, Top 75 SaaS Companies of 2022. SaaS management company, Cledara, produced a similar piece, The Top SaaS Companies in 2021.

Online Marketplaces

Cloud-based ISV software products are purchased directly from the vendor, or more recently, through marketplaces hosted by major cloud providers. In their Predicts 2022: SaaS Dominates Software Contracting by 2026 — and So Do Risks, Gartner observes, “Online marketplaces have become more prevalent (e.g., Amazon Web Services [AWS], Google, etc.). With easy access to these marketplaces, customers can and are purchasing marketplace products without the need to engage the software vendor directly or interact with sourcing or procurement within their organizations.” Examples of marketplaces include AWS Marketplace, Azure Marketplace, Google Cloud Marketplace, Salesforce AppExchange, and Oracle Cloud Marketplace.

Major Cloud Providers’ approximate market share, according to Statista and Canalys

AWS Marketplace, for example, describes itself as “a curated digital catalog that makes it easy for organizations to discover, procure, entitle, provision, and govern third-party software.” Company tackle.io, whose platform facilitates the process of listing, selling, and managing cloud marketplaces for ISVs, produced a report, State of Cloud Marketplaces 2021, detailing the leading cloud software sales and delivery platforms.

Purpose-built Products

Based on my observations, most ISV products can be classified as either purpose-built or general-purpose. Purpose-built ISV products are designed to address a specific customer need. Many are considered enterprise software, also known as Enterprise Application Software (EAS). Enterprise software includes Customer Relationship Management (CRM), Management Information Systems (MIS), Enterprise Resource Planning (ERP), Human Resource Management (HRM or HRIS), Content Management Systems (CMS), Learning Management Systems (LMS), Field Service Management (FSM), Knowledge Management Systems (KMS), Talent Management Systems (TMS), and Applicant Tracking Systems (ATS).

General-purpose Products

General-purpose ISV products often focus on a certain technology, such as security, identity management, databases, analytics, storage, AI/ML, and virtual desktops. These products are frequently used by customers as one part of a larger solution. Many of these products are hosted ‘as-a-Service,’ such as Database as a Service (DBaaS), Data Warehousing as a Service (DWaaS), Monitoring as a Service (MaaS), Analytics as a Service (AaaS), Machine Learning as a Service (MLaaS), Identity as a Service (IDaaS), Desktop as a Service (DaaS), and Storage as a Service (STaaS).

Examining the current 19,919 listings in the AWS Marketplace, by general category, we can see a mix of purpose-built (e.g., Business Applications, Industries) and general-purpose ISV products (e.g., DevOps, ML, IoT, Data, Infrastructure).

AWS Marketplace product by category (January 2022)

Below are all the categories of ISV products and services found on the AWS Marketplace.

AWS Marketplace product categories (January 2022)

Similarly, looking at the current 5,008 listings in the Google Cloud Marketplace by category, we can see both purpose-built and general-purpose ISV products.

Google Cloud Marketplace products by category (January 2022)

SaaS-as-a-Service

There is even an established market for SaaS-as-a-Service (SaaSaaS) — products and platforms designed to enable ISVs and SaaS providers. These products and platforms are designed to help overcome the inherent engineering complexities required to prepare, deliver, manage, bill, and support ISV products. Examples include services such as AWS SaaS Factory Program, AWS SaaS Boost, and Azure SaaS Development Kit (ASDK), as well as vendors, like tackle.io and AppDirect.

Current ISV Models

As organizations continue to move their IT infrastructure and workloads to cloud providers such as Amazon Web Services (AWS), Google Cloud, and Microsoft Azure, ISVs have had to evolve how they distribute, manage, and support their software products. Today, most ISVs use a variation of one of three models: Customer-deployed (aka self-hosted), Software as a Service (SaaS), and SaaS with Remote Agents.

These methods are evident from looking at the current listings in the AWS Marketplace by delivery method. Of the 14,444 products, 11.3% are categorized as SaaS. Many of the remaining delivery methods could be classified as Customer-deployed products. The most significant percentage of products are delivered as Amazon Machine Images (AMI). Custom-built VM images were traditionally the most common delivery forms. However, newer technologies, such as Container Images, Helm Charts, Data Exchange (Datasets), and SageMaker (ML) Algorithms and Models are quickly growing in popularity. Data Exchange products, for example, have doubled in 18 months.

AWS Marketplace products by delivery method (January 2022)

Customer-deployed Model

In a Customer-deployed ISV product model, the customer deploys the ISV’s software product into their own cloud environment. The ISV’s product is packaged as virtual machine images, such as Amazon Machine Images (AMIs), Docker container images, Helm Charts, licensed datasets, machine learning models, or infrastructure as code (IaC) files, such as AWS CloudFormation templates.

Customer-deployed (aka self-hosted) model

With Customer-deployed products, it is not required but also not uncommon for the ISV to have some connection to the customer’s cloud environment for break-the-glass (BTG) support, remote monitoring, or license management purposes.

Software as a Service (SaaS)

According to Wikipedia, SaaS is a software licensing and delivery model in which software is licensed on a subscription basis and is centrally hosted within the ISV’s cloud environment. SaaS is one of the three best-known cloud computing models, along with Platform as a Service (PaaS) and Infrastructure as a Service (IaaS).

Software as a Service (SaaS) model

With SaaS, the customer’s data can remain in the customer’s cloud environment. A secure connection, such as an Open Database Connectivity (ODBC) or Java Database Connectivity (JDBC) connection, can be made to the customer’s datasources. Alternatively, the customer’s data is securely copied in advance or just-in-time (JIT) to dedicated storage within the ISV’s cloud environment. Using caching technologies, such as RubiX, Databricks Delta caching, and Apache Spark caching, data can be cached as needed. Some caching technologies, such as Alluxio, even offer tiered caching based on how frequently the data is accessed: hot, warm, or cold.

SaaS with Remote Agents Model

The SaaS with Remote Agents model is a variation of the pure SaaS model. In this scenario, the customer deploys ISV-supplied software agents within their cloud, on-premises, and edge (IoT) environments. Software agents can be language-specific libraries or modules added to an application, sidecar containers, serverless functions, or stand-alone VMs. These agents collect data, pre-optimize payloads, and push data back to the ISV’s cloud environment. The prototypical example of this model is monitoring/observability and Application Performance Monitoring (APM) vendors. They often use agents to collect and aggregate a customer’s telemetry (logs, metrics, events, traces) to the ISV’s external cloud environment. The ISV’s cloud environment acts as a centralized, single pane of glass for the customer to view their aggregated telemetry.

SaaS with Remote Agents model

Some cloud providers offer products designed specifically to make a customer’s integration with SaaS products easier. With Amazon EventBridge, for example, you can “easily connect to and stream data from your SaaS applications without having to write any code.” Amazon EventBridge has established integrations with dozens of SaaS partners, including Auth0, Datadog, MongoDB, New Relic, Opsgenie, PagerDuty, Shopify, and Zendesk.

Evolving ISV Models

Remotely-managed Model

In addition to the Customer-deployed and SaaS models, some ISVs have developed new models for offering their software products. One such model is what I refer to as the Remotely-managed model. This hybrid model combines the best aspects of both the Customer-deployed and SaaS models. It is designed to address common customer concerns, such as security, speed, ease of use, and cost.

Remotely-managed model

With the Remotely-managed model, the ISV’s product is administered by the customer through a user interface (UI) hosted in the ISV’s cloud environment. The administrative actions of the customer are translated into commands, which are executed in the customer’s cloud environment. These remote commands are communicated using API calls or bi-directional message queues such as EventBridge. Often, the customer grants the ISV programmatic access to their environment. The ISV’s access is limited to a fine-grained set of permissions, based on the principle of least privilege (PoLP), to deploy and manage their product, usually isolated within a separate customer account or Virtual Private Cloud (VPC).
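As a rough illustration of how administrative actions might be translated into a restricted set of remote commands, here is a minimal Python sketch. The action names, the ALLOWED_ACTIONS allow-list, and the translate_action function are all hypothetical and not taken from any particular ISV product. The only point is that anything outside the allow-list is rejected before a command ever reaches the customer’s environment.

```python
import json

# Hypothetical allow-list: the only actions the ISV's control plane may request
# in the customer's environment (principle of least privilege).
ALLOWED_ACTIONS = {
    "deploy_product": "product:Deploy",
    "scale_cluster": "product:Scale",
    "rotate_license": "product:RotateLicense",
}


def translate_action(ui_action, parameters):
    """Translate a UI action into a JSON command for the customer environment."""
    if ui_action not in ALLOWED_ACTIONS:
        raise PermissionError(f"Action not permitted: {ui_action}")
    command = {"command": ALLOWED_ACTIONS[ui_action], "parameters": parameters}
    # sort_keys keeps the serialized message stable for logging and auditing
    return json.dumps(command, sort_keys=True)


if __name__ == "__main__":
    print(translate_action("scale_cluster", {"nodes": 3}))
```

In practice, the same restriction would also be enforced on the customer’s side through the permissions granted to the ISV, so the allow-list in the ISV’s environment is a second line of defense rather than the only one.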

Deploying the ISV’s product to the customer’s environment adjacent to the data maximizes security by eliminating data movement external to the customer’s cloud environment. Instead, computations are done adjacent to data within the customer’s environment.

SaaS Façade Model

Recently, I have been developing some architectural thinking around a newer model that I call the SaaS Façade model. A façade or facade is generally the front part or exterior of a building. In software design, a facade is an object that serves as a front-facing interface masking more complex underlying or structural code (source: Wikipedia).

SaaS Façade model

The SaaS Façade model is a variation of the Remotely-managed model. Although architecturally more complex than the Remotely-managed model, the SaaS Façade model is simpler from a customer perspective. Both the customer’s administrators and end-users access the software product through the ISV’s cloud environment, but there is little to no data movement from the customer’s environment.

Separating Front-end from Back-end

The ISV’s product architecture is the most significant difference between the SaaS Façade model and the Remotely-managed model. Most modern software products are composed of multiple, decoupled components or tiers, including front-end/UI/presentation layer, back-end/services, and data. In the SaaS Façade model, the customer’s end-users access the ISV’s product through the ISV’s cloud environment, similar to SaaS. The ISV’s front-end is deployed to the ISV’s cloud environment. The ISV’s product’s back-end is deployed to the customer’s cloud environment, adjacent to the customer’s data. The ISV product’s data tier is deployed to either the ISV’s or customer’s cloud environment, depending on the product’s exact architectural requirements. This model requires a highly decoupled architecture and tolerance for moderate latency.

Decoupled User Management

A frequent request from customers of ISV software concerns user management. Customers want to allow approved external users to access read-only data, such as a sales report, without adding them to the customer’s cloud environment’s Identity and Access Management (IAM) system. Additionally, end-users do not need to access the software by first logging in through the customer’s cloud provider’s console and having an established IAM identity. The SaaS Façade model enables this capability.

Multi-Cloud

Another potential use case for the SaaS Façade model is implementing a multi-cloud customer architecture. Imagine an ISV’s cloud environment hosted on a single public cloud provider’s platform, while the customer has workloads and data housed on multiple cloud providers’ platforms. The ISV’s product’s back-end would be deployed to multiple cloud providers’ platforms using a common compute construct such as a Linux-based VM (e.g., Amazon EC2, Azure VM, or Google Cloud Compute Engine) or on Kubernetes (e.g., AWS’s EKS, Google Cloud’s GKE, or Azure’s AKS). The ISV product’s data tier would also be built on a database engine common to most major cloud providers, such as MySQL or PostgreSQL. Similar to the SaaS with Remote Agents model, the ISV’s environment acts as a single portal to the customer’s multiple environments and decentralized data sources.

SaaS Façade model with a multi-cloud configuration

In this scenario, the ISV product’s front-end and back-end are common and independent of the cloud provider’s platform. The customer-managed administration interface is also common. Potentially, only the ISV’s deployment, configuration, and monitoring elements may need to have aspects specific to each cloud provider’s platform. For example, Kubernetes is common to AWS, Google Cloud, and Azure. However, the authentication methods, IaC, and API commands to provision a Kubernetes cluster or deploy a containerized application differ between EKS, GKE, and AKS.

Conclusion

In this post, we briefly explored several models used by ISVs to deliver, manage, and support their software products for cloud-native customers. As cloud adoption continues to grow and the complexity of cloud-based application platforms continues to evolve, ISVs will continue to develop new models for distributing their software products in the cloud.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. Introduction image – Copyright: melpomen (123rf.com).


The Art of Building Open Data Lakes with Apache Hudi, Kafka, Hive, and Debezium

Build near real-time, open-source data lakes on AWS using a combination of Apache Kafka, Hudi, Spark, Hive, and Debezium

Introduction

In the following post, we will learn how to build a data lake on AWS using a combination of open-source software (OSS), including Red Hat’s Debezium, Apache Kafka, Kafka Connect, Apache Hive, Apache Spark, Apache Hudi, and Hudi DeltaStreamer. We will use fully-managed AWS services to host the datasource, the data lake, and the open-source tools. These services include Amazon RDS, MSK, EKS, EMR, and S3.

The architecture and workflow demonstrated in this post

This post is an in-depth follow-up to the video demonstration, Building Open Data Lakes on AWS with Debezium and Apache Hudi.

Workflow

As shown in the architectural diagram above, these are the high-level steps in the demonstration’s workflow:

  1. Changes (inserts, updates, and deletes) are made to the datasource, a PostgreSQL database running on Amazon RDS;
  2. Kafka Connect Source Connector, utilizing Debezium and running on Amazon EKS (Kubernetes), continuously reads data from the PostgreSQL WAL;
  3. Source Connector creates and stores message schemas in Apicurio Registry, also running on Amazon EKS, in Avro format;
  4. Source Connector transforms and writes data in Apache Avro format to Apache Kafka, running on Amazon MSK;
  5. Kafka Connect Sink Connector, using Confluent S3 Sink Connector, reads messages from Kafka topics using schemas from Apicurio Registry;
  6. Sink Connector writes data to Amazon S3 in Apache Avro format;
  7. Apache Spark, using Hudi DeltaStreamer and running on Amazon EMR, reads message schemas from Apicurio Registry;
  8. DeltaStreamer reads raw Avro-format data from Amazon S3;
  9. DeltaStreamer writes data to Amazon S3 as both Copy on Write (CoW) and Merge on Read (MoR) table types;
  10. DeltaStreamer syncs Hudi tables and partitions to Apache Hive running on Amazon EMR;
  11. Queries are executed against Apache Hive Metastore or directly against Hudi tables using Apache Spark, with data returned from Hudi tables in Amazon S3.

The workflow described above actually contains two independent processes running simultaneously. Steps 2–6 represent the first process, the change data capture (CDC) process. Kafka Connect is used to continuously move changes from the database to Amazon S3. Steps 7–10 represent the second process, the data lake ingestion process. Hudi’s DeltaStreamer reads raw CDC data from Amazon S3 and writes the data back to another location in S3 (the data lake) in Apache Hudi table format. When combined, these processes can give us near real-time, incremental data ingestion of changes from the datasource to the Hudi-managed data lake.

Alternatives

This demonstration’s workflow is only one of many possible workflows to achieve similar outcomes. Alternatives include:

Source Code

All source code for this post and the previous posts in this series is open-sourced and located on GitHub. The specific resources used in this post are found in the debezium_hudi_demo directory of the GitHub repository. There are also two copies of the Museum of Modern Art (MoMA) Collection dataset from Kaggle, specifically prepared for this post, located in the moma_data directory. One copy is a nearly full dataset, and the other is a smaller, cost-effective dev/test version.

Kafka Connect

In this demonstration, Kafka Connect runs on Kubernetes, hosted on the fully-managed Amazon Elastic Kubernetes Service (Amazon EKS). Kafka Connect runs the Source and Sink Connectors.

Source Connector

The Kafka Connect Source Connector, source_connector_moma_postgres_kafka.json, used in steps 2–4 of the workflow, utilizes Debezium to continuously read changes to an Amazon RDS for PostgreSQL database. The PostgreSQL database hosts the MoMA Collection in two tables: artists and artworks.

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "<your_database_hostname>",
  "database.port": "5432",
  "database.user": "<your_username>",
  "database.password": "<your_password>",
  "database.dbname": "moma",
  "database.server.name": "moma",
  "table.include.list": "public.artists,public.artworks",
  "plugin.name": "pgoutput",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "key.converter.apicurio.registry.find-latest": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true",
  "value.converter.apicurio.registry.find-latest": "true",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
}

The Debezium Connector for PostgreSQL reads record-level insert, update, and delete entries from PostgreSQL’s write-ahead log (WAL). According to the PostgreSQL documentation, changes to data files must be written only after log records describing the changes have been flushed to permanent storage, thus the name, write-ahead log. The Source Connector then creates and stores Apache Avro message schemas in Apicurio Registry also running on Amazon EKS.

Apicurio Registry UI showing Avro-format Kafka message schemas
Apicurio Registry UI showing part of Avro-format Kafka message value schema for artists

Finally, the Source Connector transforms and writes Avro-format messages to Apache Kafka running on the fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK). Assuming Kafka Connect’s topic.creation.enable property is set to true, Kafka Connect will create any necessary Kafka topics, one per database table.

Below, we see an example of a Kafka message representing an insert of a record with the artist_id 1 in the MoMA Collection database’s artists table. The record was read from the PostgreSQL WAL, transformed, and written to a corresponding Kafka topic, using the Debezium Connector for PostgreSQL. The first version represents the raw data before being transformed by Debezium. Note that the type of operation (op) indicates a read (r), since this record was captured during the initial snapshot. Possible values include c for create (or insert), u for update, d for delete, and r for read (applies to snapshots).

{
  "payload": {
    "before": null,
    "after": {
      "artist_id": 1,
      "name": "Robert Arneson",
      "nationality": "American",
      "gender": "Male",
      "birth_year": 1930,
      "death_year": 1992
    },
    "source": {
      "version": "1.7.0.Final",
      "connector": "postgresql",
      "name": "moma",
      "ts_ms": 1640703877051,
      "snapshot": "true",
      "db": "moma",
      "sequence": "[null,\"3668170506336\"]",
      "schema": "public",
      "table": "artists",
      "txId": 217094,
      "lsn": 3668170506336,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1640703877051,
    "transaction": null
  }
}

The next version represents the same record after being transformed by Debezium using the event flattening single message transformation (unwrap SMT). The final message structure represents the schema stored in Apicurio Registry. The message structure is identical to the structure of the data written to Amazon S3 by the Sink Connector.

{
  "payload": {
    "artist_id": 1,
    "name": "Robert Arneson",
    "nationality": "American",
    "gender": "Male",
    "birth_year": 1930,
    "death_year": 1992,
    "__op": "r",
    "__db": "moma",
    "__table": "artists",
    "__schema": "public",
    "__lsn": 3668438941792,
    "__source_ts_ms": 1640705109121,
    "__deleted": "false"
  }
}
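To make the flattening step more concrete, here is a minimal Python sketch that imitates what the unwrap SMT does with the options set in the Source Connector configuration (add.fields and delete.handling.mode set to rewrite). It is an illustration only, not Debezium’s implementation. The flatten_change_event function is hypothetical and covers just the fields used in this post. Note that the two sample messages above appear to come from different captures of the same record, which is why their lsn and timestamp values differ.

```python
def flatten_change_event(envelope):
    """Imitate Debezium's event flattening for a single change event."""
    payload = envelope["payload"]
    source = payload["source"]
    is_delete = payload["op"] == "d"

    # With delete.handling.mode=rewrite, a delete keeps the last known row
    # state (the "before" image) and is flagged through the __deleted field.
    row = payload["before"] if is_delete else payload["after"]

    flat = dict(row)
    # Metadata requested through add.fields: op,db,table,schema,lsn,source.ts_ms
    flat["__op"] = payload["op"]
    flat["__db"] = source["db"]
    flat["__table"] = source["table"]
    flat["__schema"] = source["schema"]
    flat["__lsn"] = source["lsn"]
    flat["__source_ts_ms"] = source["ts_ms"]
    flat["__deleted"] = "true" if is_delete else "false"
    return {"payload": flat}


if __name__ == "__main__":
    raw_event = {
        "payload": {
            "before": None,
            "after": {"artist_id": 1, "name": "Robert Arneson"},
            "source": {
                "db": "moma",
                "schema": "public",
                "table": "artists",
                "lsn": 3668170506336,
                "ts_ms": 1640703877051,
            },
            "op": "r",
        }
    }
    print(flatten_change_event(raw_event))
```

The __deleted flag is what allows a downstream consumer to recognize deletes even though the flattened message no longer carries the before and after images.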

Sink Connector

The Kafka Connect Sink Connector, sink_connector_moma_kafka_s3.json, used in steps 5–6 of the workflow, implements the Confluent S3 Sink Connector. The Sink Connector reads the Avro-format messages from Kafka using the schemas stored in Apicurio Registry. It then writes the data to Amazon S3, also in Apache Avro format, based on the same schemas.

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": 1,
  "topics.regex": "moma.public.(.*)",
  "table.name.format": "${topic}",
  "s3.region": "us-east-1",
  "s3.bucket.name": "<your_data_lake_bucket>",
  "s3.part.size": 5242880,
  "flush.size": 10000,
  "rotate.schedule.interval.ms": 60000,
  "timezone": "UTC",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "schema.compatibility": "NONE",
  "behavior.on.null.values": "ignore",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "key.converter.apicurio.registry.find-latest": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true",
  "value.converter.apicurio.registry.find-latest": "true"
}
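The topics.regex property in the configuration above determines which Kafka topics the Sink Connector subscribes to. The short Python sketch below approximates that selection, assuming the pattern is applied to the whole topic name and that Python’s re module behaves like Java’s regular expressions for a pattern this simple. The matching_topics helper and the sample topic names are hypothetical.

```python
import re

TOPICS_REGEX = "moma.public.(.*)"  # value from the Sink Connector configuration


def matching_topics(topics, pattern=TOPICS_REGEX):
    """Return the topics whose full name matches the pattern."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


if __name__ == "__main__":
    candidates = [
        "moma.public.artists",
        "moma.public.artworks",
        "other.public.artists",
        "momaXpublicYartists",  # also matches: an unescaped dot matches any character
    ]
    print(matching_topics(candidates))
```

Because the dots in the pattern are not escaped, they match any character. That is harmless for this demonstration, but a stricter pattern such as moma\.public\.(.*) would be a safer choice on a shared Kafka cluster.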

Running Kafka Connect

We first start Kafka Connect in the background to begin the CDC process.

bin/connect-distributed.sh \
config/connect-distributed.properties \
> /dev/null 2>&1 &
tail -f logs/connect.log

Then, deploy the Kafka Connect Source and Sink Connectors using Kafka Connect’s RESTful API. Using the API, we can also confirm the status of the Connectors.

# deploy (create or update) the Source Connector
curl -s -d @"config/source_connector_moma_postgres_kafka.json" \
    -H "Content-Type: application/json" \
    -X PUT http://localhost:8083/connectors/source_connector_moma_postgres_kafka/config | jq

# deploy (create or update) the Sink Connector
curl -s -d @"config/sink_connector_moma_kafka_s3.json" \
    -H "Content-Type: application/json" \
    -X PUT http://localhost:8083/connectors/sink_connector_moma_kafka_s3/config | jq

# list the deployed connectors
curl -s -X GET http://localhost:8083/connectors | jq

# confirm the status of each connector
curl -s -H "Content-Type: application/json" \
    -X GET http://localhost:8083/connectors/source_connector_moma_postgres_kafka/status | jq
curl -s -H "Content-Type: application/json" \
    -X GET http://localhost:8083/connectors/sink_connector_moma_kafka_s3/status | jq
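For those who would rather script these calls than use curl, the same Kafka Connect REST endpoints can be reached from Python’s standard library. The sketch below is only one possible approach. The helper names (build_put_config_request, connector_state, and send) are hypothetical, and the status summary assumes the usual shape of the status response, with a connector state and a list of task states.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # same endpoint as the curl commands


def build_put_config_request(name, config, base_url=CONNECT_URL):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def connector_state(status):
    """Summarize the body of a GET /connectors/{name}/status response."""
    connector = status["connector"]["state"]
    tasks = [task["state"] for task in status.get("tasks", [])]
    healthy = (
        connector == "RUNNING"
        and bool(tasks)
        and all(state == "RUNNING" for state in tasks)
    )
    return {"connector": connector, "tasks": tasks, "healthy": healthy}


def send(request):
    """Send a request and decode the JSON response (needs a running worker)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    with open("config/source_connector_moma_postgres_kafka.json") as file:
        source_config = json.load(file)
    request = build_put_config_request(
        "source_connector_moma_postgres_kafka", source_config
    )
    print(send(request))
```

Keeping the request construction separate from the network call makes it possible to check the URL and payload without a running Kafka Connect worker.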

To confirm the two Kafka topics, moma.public.artists and moma.public.artworks, were created and contain Avro messages, we can use Kafka’s command-line tools.

# list kafka topics
bin/kafka-topics.sh --list \
    --bootstrap-server $BBROKERS \
    --command-config config/client-iam.properties

# read first 5 avro-format (binary) messages from topic
bin/kafka-console-consumer.sh \
    --topic moma.public.artists \
    --from-beginning \
    --max-messages 5 \
    --property print.value=true \
    --property print.offset=true \
    --bootstrap-server $BBROKERS \
    --consumer.config config/client-iam.properties

In the short video-only clip below, we see the process of deploying the Kafka Connect Source and Sink Connectors and confirming they are working as expected.

Deploying and starting the Kafka Connect Source and Sink Connectors

The Sink Connector writes data to Amazon S3 in batches of 10k messages or every 60 seconds (one-minute intervals). These settings are configurable and highly dependent on your requirements, including message volume, message velocity, real-time analytics requirements, and available compute resources.
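The interplay between the two settings can be summarized as “whichever comes first.” The following Python sketch expresses that rule using the values from the Sink Connector configuration. It is a simplification for illustration, not the connector’s actual logic, and the should_commit_file function is hypothetical.

```python
FLUSH_SIZE = 10000                   # flush.size from the Sink Connector config
ROTATE_SCHEDULE_INTERVAL_MS = 60000  # rotate.schedule.interval.ms


def should_commit_file(buffered_records, ms_since_last_rotation):
    """Return True when the buffered records should be written to Amazon S3."""
    if buffered_records == 0:
        return False  # nothing to write yet
    reached_flush_size = buffered_records >= FLUSH_SIZE
    reached_interval = ms_since_last_rotation >= ROTATE_SCHEDULE_INTERVAL_MS
    return reached_flush_size or reached_interval


if __name__ == "__main__":
    print(should_commit_file(10000, 5000))  # batch is full
    print(should_commit_file(250, 60000))   # interval has elapsed
    print(should_commit_file(250, 5000))    # keep buffering
```

A smaller flush size or a shorter interval lowers end-to-end latency at the cost of more, smaller objects in Amazon S3.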

Amazon S3 objects containing MoMA Collection artwork records from PostgreSQL

Since we will not be querying this raw Avro-format CDC data in Amazon S3 directly, there is no need to catalog this data in Apache Hive or AWS Glue Data Catalog, a fully-managed Hive-compatible metastore.

Apache Hudi

According to the overview, Apache Hudi (pronounced “hoodie”) is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality to data lakes. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering, compaction optimizations, and concurrency, all while keeping data in open source file formats.

Without Hudi or an equivalent open-source data lake table format such as Apache Iceberg or Databricks’ Delta Lake, most data lakes are just a bunch of unmanaged flat files. Amazon S3 cannot natively maintain the latest view of the data, to the surprise of many who are more familiar with OLTP-style databases or OLAP-style data warehouses.

DeltaStreamer

DeltaStreamer, aka the HoodieDeltaStreamer utility (part of the hudi-utilities-bundle), used in steps 7–10 of the workflow, provides a way to perform streaming ingestion of data from different sources, such as a Distributed File System (DFS) and Apache Kafka.

Optionally, HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, ingests multiple tables in a single Spark job, into Hudi datasets. Currently, it only supports sequential processing of tables to be ingested and Copy on Write table type.

We are using HoodieDeltaStreamer to write to both Merge on Read (MoR) and Copy on Write (CoW) table types for demonstration purposes only. The MoR table type is a superset of the CoW table type, which stores data using a combination of columnar-based (e.g., Apache Parquet) plus row-based (e.g., Apache Avro) file formats. Updates are logged to delta files and later compacted to produce new versions of columnar files synchronously or asynchronously. Again, the choice of table types depends on your requirements.

Trade-offs between Hudi table types (table courtesy Apache Hudi documentation)
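The difference between the two table types can be easier to see with a toy model. The Python sketch below is a deliberately simplified, in-memory imitation and does not use Hudi. The copy_on_write function and the MergeOnReadTable class are hypothetical, with plain dictionaries keyed by record key standing in for columnar base files and row-based delta logs.

```python
def copy_on_write(base_file, updates):
    """CoW: every write rewrites the base file with the updates merged in."""
    new_base = dict(base_file)
    new_base.update(updates)
    return new_base


class MergeOnReadTable:
    """MoR: writes append to a delta log; reads merge the log on the fly."""

    def __init__(self, base_file):
        self.base_file = dict(base_file)
        self.delta_log = []

    def write(self, updates):
        # Cheap append, no rewrite of the base file
        self.delta_log.append(dict(updates))

    def read_snapshot(self):
        # Latest view: base file merged with all logged updates
        merged = dict(self.base_file)
        for entry in self.delta_log:
            merged.update(entry)
        return merged

    def read_optimized(self):
        # Compacted data only: faster to read, but possibly stale
        return dict(self.base_file)

    def compact(self):
        # Fold the delta log into a new version of the base file
        self.base_file = self.read_snapshot()
        self.delta_log = []


if __name__ == "__main__":
    table = MergeOnReadTable({1: "Robert Arneson"})
    table.write({2: "Doroteo Arnaiz"})
    print(table.read_optimized())
    print(table.read_snapshot())
```

In this model, MoR trades cheaper writes for extra work at read time until compaction runs, which mirrors the trade-off summarized in the table above.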

Amazon EMR

For this demonstration, I’ve used the recently released Amazon EMR version 6.5.0 configured with Apache Spark 3.1.2 and Apache Hive 3.1.2. EMR 6.5.0 runs Scala version 2.12.10, Python 3.7.10, and OpenJDK Corretto-8.312. I have included the AWS CloudFormation template and parameters file used to create the EMR cluster, on GitHub.

When choosing Apache Spark, Apache Hive, or Presto on EMR 6.5.0, Apache Hudi release 0.9.0 is automatically installed.

Amazon EMR Master Node showing Apache Hudi related resources

DeltaStreamer Configuration

Below, we see the DeltaStreamer properties file, deltastreamer_artists_apicurio_mor.properties. This properties file is referenced by the Spark job that runs DeltaStreamer, shown next. The file contains properties related to the datasource, the data sink, and Apache Hive. The source of the data for DeltaStreamer is the CDC data written to Amazon S3. In this case, the datasource is the objects located in the /topics/moma.public.artists/partition=0/ S3 object prefix. The data sink is a Hudi MoR table in Amazon S3. DeltaStreamer will write Parquet data, partitioned by the artist’s nationality, to the /moma_mor/artists/ S3 object prefix. Lastly, DeltaStreamer will sync all tables and table partitions to Apache Hive, including creating the Hive databases and tables if they do not already exist.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=nationality
hoodie.datasource.hive_sync.table=artists
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=nationality
hoodie.datasource.write.recordkey.field=artist_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:port>/apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artists/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artists data using MoR table type
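Two of the settings above lend themselves to a quick worked example: the small file limit expressed in bytes and the Hive-style partition layout. The Python sketch below is illustrative only. The partition_path helper is hypothetical, and it assumes a partition value is used as-is, without any encoding of special characters.

```python
# hoodie.parquet.small.file.limit: 128 MB expressed in bytes
SMALL_FILE_LIMIT_BYTES = 128 * 1024 * 1024


def partition_path(base_prefix, field, value, hive_style=True):
    """Build the S3 object prefix for a record's partition."""
    # hive_style_partitioning=true yields "field=value" folder names
    segment = f"{field}={value}" if hive_style else str(value)
    return f"{base_prefix.rstrip('/')}/{segment}/"


if __name__ == "__main__":
    print(SMALL_FILE_LIMIT_BYTES)
    print(partition_path("/moma_mor/artists/", "nationality", "American"))
```

With Hive-style partitioning enabled, the partition field name becomes part of the object prefix, which is the layout Hive-compatible engines typically expect when discovering partitions.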

Below, we see the equivalent DeltaStreamer properties file for the MoMA artworks, deltastreamer_artworks_apicurio_mor.properties. There are also comparable DeltaStreamer property files for the Hudi CoW tables on GitHub.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=classification
hoodie.datasource.hive_sync.table=artworks
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=classification
hoodie.datasource.write.recordkey.field=artwork_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:port>/apis/ccompat/v6/subjects/moma.public.artworks-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artworks/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artworks data using MoR table type

All DeltaStreamer property files reference Apicurio Registry for the location of the Avro schemas. The schemas are used by both the Kafka Avro-format messages and the CDC-created Avro-format files in Amazon S3. Because DeltaStreamer is coupled to Confluent Schema Registry, as opposed to other registries, we must use Apicurio Registry’s Confluent Schema Registry (Version 6) compatibility API endpoints (e.g., /apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest) when using the org.apache.hudi.utilities.schema.SchemaRegistryProvider datasource option with DeltaStreamer. According to Apicurio, to provide compatibility with Confluent SerDes (Serializer/Deserializer) and other clients, Apicurio Registry implements the API defined by the Confluent Schema Registry.
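To make the endpoint shape concrete, here is a minimal Python sketch of how a client might build the compatibility URL and decode the response. It assumes the registry answers in the Confluent Schema Registry style, where the Avro schema arrives as a JSON-encoded string in a "schema" field. The helper names, the localhost address, and the trimmed sample response are hypothetical and used only for illustration.

```python
import json
from urllib.parse import quote


def ccompat_latest_schema_url(registry_base_url: str, subject: str) -> str:
    """Build the Confluent-compatible 'latest version' URL for a subject."""
    base = registry_base_url.rstrip("/")
    return f"{base}/apis/ccompat/v6/subjects/{quote(subject, safe='')}/versions/latest"


def avro_schema_from_response(body: str) -> dict:
    """Extract the Avro schema from a Confluent-style response body.

    The schema is a JSON document stored as a string inside the JSON
    response, so it has to be decoded twice.
    """
    payload = json.loads(body)
    return json.loads(payload["schema"])


# Hypothetical response body, trimmed to two fields for illustration.
sample_body = json.dumps({
    "subject": "moma.public.artists-value",
    "version": 1,
    "id": 1,
    "schema": json.dumps({
        "type": "record",
        "name": "Value",
        "fields": [
            {"name": "artist_id", "type": "int"},
            {"name": "name", "type": ["null", "string"], "default": None},
        ],
    }),
})

url = ccompat_latest_schema_url("http://localhost:8080", "moma.public.artists-value")
schema = avro_schema_from_response(sample_body)
print(url)
print([field["name"] for field in schema["fields"]])
```

In a real environment, the response body would come from an HTTP GET against the URL rather than from the sample string.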

Apicurio Registry exposes multiple APIs

Running DeltaStreamer

The properties files are loaded by Spark jobs that call the DeltaStreamer library, using spark-submit. Below, we see an example Spark job that calls the DeltaStreamer class. DeltaStreamer reads the raw Avro-format CDC data from S3 and writes the data using the Hudi MoR table type into the /moma/artists_mor/ S3 object prefix. In this particular Spark job, we are using the continuous option. With this option, DeltaStreamer runs in continuous mode, running source-fetch, transform, and write in a loop. We are also using the UPSERT write operation (op). Operation options include UPSERT, INSERT, and BULK_INSERT. This set of options is ideal for inserting ongoing changes to CDC data into Hudi tables. You can run jobs in the foreground or background on EMR’s Master Node or as EMR Steps from the Amazon EMR console.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artists data, MoR table type, continuous upserts
spark-submit \
    --jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
    --conf spark.sql.catalogImplementation=hive \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type MERGE_ON_READ \
    --source-ordering-field __source_ts_ms \
    --props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artists_apicurio_mor.properties" \
    --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
    --target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artists_mor/" \
    --target-table moma_mor.artists \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --enable-sync \
    --continuous \
    --op UPSERT

Below, we see another example DeltaStreamer Spark job that reads the raw Avro-format CDC data from S3 and writes the data using the MoR table type into the /moma/artworks_mor/ S3 object prefix. This example uses the BULK_INSERT write operation (op) and the filter-dupes option. The filter-dupes option ensures that duplicate records from the source are dropped/filtered out before an INSERT or BULK_INSERT. This set of options is ideal for the initial bulk inserting of existing data into Hudi tables. The job runs one time and completes, unlike the previous example that ran continuously.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artworks data, MoR table type, 1x bulk insert
spark-submit \
    --jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
    --conf spark.sql.catalogImplementation=hive \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type MERGE_ON_READ \
    --source-ordering-field __source_ts_ms \
    --props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artworks_apicurio_mor.properties" \
    --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
    --target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artworks_mor/" \
    --target-table moma_mor.artworks \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --enable-sync \
    --op BULK_INSERT \
    --filter-dupes
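The two jobs above differ only in the table name, the write operation, and the continuous and filter-dupes flags. As a rough sketch, assuming we wanted to generate these commands from Python rather than maintain several near-identical shell scripts, a small helper could assemble the argument list. The function name and the example bucket name are hypothetical; the flags and paths are copied from the commands above.

```python
import shlex

HUDI_BUNDLE = "/usr/lib/hudi/hudi-utilities-bundle.jar"
SPARK_AVRO = "/usr/lib/spark/jars/spark-avro.jar"


def deltastreamer_command(bucket, table, operation, continuous=False, filter_dupes=False):
    """Return the spark-submit argument list for one MoR DeltaStreamer job."""
    args = [
        "spark-submit",
        "--jars", f"{SPARK_AVRO},{HUDI_BUNDLE}",
        "--conf", "spark.sql.catalogImplementation=hive",
        "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
        HUDI_BUNDLE,
        "--table-type", "MERGE_ON_READ",
        "--source-ordering-field", "__source_ts_ms",
        "--props", f"s3://{bucket}/hudi/deltastreamer_{table}_apicurio_mor.properties",
        "--source-class", "org.apache.hudi.utilities.sources.AvroDFSSource",
        "--target-base-path", f"s3://{bucket}/moma/{table}_mor/",
        "--target-table", f"moma_mor.{table}",
        "--schemaprovider-class", "org.apache.hudi.utilities.schema.SchemaRegistryProvider",
        "--enable-sync",
        "--op", operation,
    ]
    # Optional switches that distinguish the continuous job from the one-time job.
    if continuous:
        args.append("--continuous")
    if filter_dupes:
        args.append("--filter-dupes")
    return args


# Hypothetical bucket name, for illustration only.
upsert = deltastreamer_command("my-bucket", "artists", "UPSERT", continuous=True)
bulk = deltastreamer_command("my-bucket", "artworks", "BULK_INSERT", filter_dupes=True)
print(shlex.join(upsert))
print(shlex.join(bulk))
```

The returned list could be passed to subprocess.run on the EMR Master Node; printing it with shlex.join simply shows the equivalent shell command.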

Syncing with Hive

The following abridged, video-only clip demonstrates the differences between the Hudi CoW and MoR table types with respect to Apache Hive. In the video, we run the deltastreamer_jobs_bulk_bkgd.sh script, included on GitHub. This script runs four different Apache Spark jobs, using Hudi DeltaStreamer to bulk-ingest all the artists and artworks CDC data from Amazon S3 into both Hudi CoW and MoR table types. Once the four Spark jobs are complete, the script queries Apache Hive and displays the new Hive databases and database tables created by DeltaStreamer.

Hudi DeltaStreamer Spark jobs running on the Amazon EMR

In both the video above and terminal screengrab below, note the difference in the tables created within the two Hive databases, the Hudi CoW table type (moma_cow) and the MoR table type (moma_mor). The MoR table type creates both a read-optimized table (_ro) as well as a real-time table (_rt) for each datasource (e.g., artists_ro and artists_rt).

View of the Apache Hive CoW and MoR database tables

According to documentation, Hudi creates two tables in the Hive metastore for the MoR table type. The first is a read-optimized view, with the table name appended with _ro; the second is a real-time view, with the same table name appended with _rt. According to Hudi, the read-optimized view exposes columnar Parquet while the real-time view exposes columnar Parquet and/or row-based logs; you can query both tables. The CoW table type creates a single table without a suffix for each datasource (e.g., artists). Below, we see the Hive table structure for the artists_rt table, created by DeltaStreamer, using SHOW CREATE TABLE moma_mor.artists_rt;.

CREATE EXTERNAL TABLE `moma_mor.artists_rt`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`artist_id` int,
`name` string,
`gender` string,
`birth_year` int,
`death_year` int,
`__op` string,
`__db` string,
`__table` string,
`__schema` string,
`__lsn` bigint,
`__source_ts_ms` bigint,
`__deleted` string)
PARTITIONED BY (
`nationality` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='s3://<your_data_lake_bucket>/moma/artists_mor')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<your_data_lake_bucket>/moma/artists_mor'
TBLPROPERTIES (
'bucketing_version'='2',
'last_commit_time_sync'='20211230180429',
'spark.sql.partitionProvider'='catalog',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='1',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"artist_id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"gender","type":"string","nullable":true,"metadata":{}},{"name":"birth_year","type":"integer","nullable":true,"metadata":{}},{"name":"death_year","type":"integer","nullable":true,"metadata":{}},{"name":"__op","type":"string","nullable":true,"metadata":{}},{"name":"__db","type":"string","nullable":true,"metadata":{}},{"name":"__table","type":"string","nullable":true,"metadata":{}},{"name":"__schema","type":"string","nullable":true,"metadata":{}},{"name":"__lsn","type":"long","nullable":true,"metadata":{}},{"name":"__source_ts_ms","type":"long","nullable":true,"metadata":{}},{"name":"__deleted","type":"string","nullable":true,"metadata":{}},{"name":"nationality","type":"string","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='nationality',
'transient_lastDdlTime'='1640919578')

Having run the demonstration’s deltastreamer_jobs_bulk_bkgd.sh script, the resulting object structure in the Hudi-managed section of the Amazon S3 bucket looks as follows.

S3 object structure in Hudi-managed Amazon S3 bucket

Below is an example of Hudi files created in the /moma/artists_cow/ S3 object prefix. When using data lake table formats like Hudi, given its specialized directory structure and the high number of objects, interactions with the data should be abstracted through Hudi’s programming interfaces. Generally speaking, you do not interact directly with the objects in a data lake.

"moma/artists_cow/.hoodie/.aux/.bootstrap/.fileids_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap/.partitions_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap_$folder$",
"moma/artists_cow/.hoodie/.aux_$folder$",
"moma/artists_cow/.hoodie/.temp_$folder$",
"moma/artists_cow/.hoodie/20211231203737.commit",
"moma/artists_cow/.hoodie/20211231203737.commit.requested",
"moma/artists_cow/.hoodie/20211231203737.inflight",
"moma/artists_cow/.hoodie/20211231203738.rollback",
"moma/artists_cow/.hoodie/20211231203738.rollback.inflight",
"moma/artists_cow/.hoodie/archived_$folder$",
"moma/artists_cow/.hoodie/hoodie.properties",
"moma/artists_cow/.hoodie_$folder$",
"moma/artists_cow/nationality=Afghan/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Afghan/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-0_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Afghan_$folder$",
"moma/artists_cow/nationality=Albanian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Albanian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-1_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Albanian_$folder$",
"moma/artists_cow/nationality=Algerian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Algerian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-2_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Algerian_$folder$",
"moma/artists_cow/nationality=American/.hoodie_partition_metadata",
"moma/artists_cow/nationality=American/0065ed77-4a6c-4755-b133-45126310936d-0_502-28-3854_20211231203737.parquet",
"moma/artists_cow/nationality=American/011d5c57-c918-40d8-8518-c3cb56747133-0_15-28-3367_20211231203737.parquet"
Hudi CLI commands used in the next video
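The Parquet object names in the listing above appear to follow the pattern fileId_writeToken_instantTime.parquet. As a small sketch, assuming that pattern holds for every base file, the pieces can be pulled apart in Python. The class and function names are hypothetical, and this is not part of Hudi’s API; in practice, Hudi’s own interfaces should be used to interpret these files.

```python
from typing import NamedTuple


class HudiBaseFile(NamedTuple):
    prefix: str        # S3 prefix, ending with the partition directory
    file_id: str       # file group identifier
    write_token: str   # token identifying the write attempt
    instant_time: str  # commit instant, yyyyMMddHHmmss


def parse_base_file_key(key: str) -> HudiBaseFile:
    """Split an S3 key of a Hudi Parquet base file into its name components."""
    prefix, _, file_name = key.rpartition("/")
    stem, _, extension = file_name.rpartition(".")
    if extension != "parquet":
        raise ValueError(f"not a Parquet base file: {key}")
    file_id, write_token, instant_time = stem.split("_")
    return HudiBaseFile(prefix, file_id, write_token, instant_time)


key = (
    "moma/artists_cow/nationality=American/"
    "0065ed77-4a6c-4755-b133-45126310936d-0_502-28-3854_20211231203737.parquet"
)
base_file = parse_base_file_key(key)
print(base_file)
```

Note that the instant time in the file name matches the 20211231203737 commit file in the .hoodie directory.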

Hudi CLI

Optionally, we can inspect the Hudi tables using the Hudi CLI (hudi-cli). The CLI offers an extensive list of available commands. Using the CLI, we can inspect the Hudi tables and their schemas, and review operational statistics like write amplification (the number of bytes written for 1 byte of incoming data), commits, and compactions.

> hudi-cli
help
connect --path s3://<your_data_lake_bucket>/moma/artworks_mor/
connect --path s3://<your_data_lake_bucket>/moma/artworks_cow/
desc
fetch table schema
commits show
stats wa
compactions show all
Using the Hudi CLI from the Amazon EMR Master Node

The following short video-only clip shows the use of the Hudi CLI, running on the Amazon EMR Master Node, to inspect the Hudi tables in S3.

Using the Hudi CLI from the Amazon EMR Master Node

Hudi Data Structure

Recall the sample Kafka message we saw earlier in the post representing an insert of an artist record with the artist_id 1. Below, we see what the same record looks like after being ingested by Hudi DeltaStreamer. Note the five additional fields added by Hudi with the _hoodie_ prefix.

{
"_hoodie_commit_time": "20211227215352",
"_hoodie_commit_seqno": "20211227215352_63_7301",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "nationality=American",
"_hoodie_file_name": "0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet",
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992,
"__op": "r",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3637434647944,
"__source_ts_ms": 1640566580452,
"__deleted": "false"
}
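To see which fields come from where, the record above can be split by field-name prefix. The following is a minimal sketch, assuming Hudi metadata fields always start with _hoodie_ and the Debezium-added fields always start with a double underscore, as they do in this record. The function names are hypothetical.

```python
from datetime import datetime

record = {
    "_hoodie_commit_time": "20211227215352",
    "_hoodie_commit_seqno": "20211227215352_63_7301",
    "_hoodie_record_key": "1",
    "_hoodie_partition_path": "nationality=American",
    "_hoodie_file_name": "0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet",
    "artist_id": 1,
    "name": "Robert Arneson",
    "nationality": "American",
    "gender": "Male",
    "birth_year": 1930,
    "death_year": 1992,
    "__op": "r",
    "__db": "moma",
    "__table": "artists",
    "__schema": "public",
    "__lsn": 3637434647944,
    "__source_ts_ms": 1640566580452,
    "__deleted": "false",
}


def split_record(rec):
    """Group fields into Hudi metadata, CDC metadata, and source columns."""
    hudi = {k: v for k, v in rec.items() if k.startswith("_hoodie_")}
    cdc = {k: v for k, v in rec.items() if k.startswith("__")}
    source = {k: v for k, v in rec.items() if k not in hudi and k not in cdc}
    return hudi, cdc, source


def parse_commit_time(commit_time):
    """Convert a yyyyMMddHHmmss commit instant into a datetime."""
    return datetime.strptime(commit_time, "%Y%m%d%H%M%S")


hudi_fields, cdc_fields, source_fields = split_record(record)
print(sorted(hudi_fields))
print(parse_commit_time(hudi_fields["_hoodie_commit_time"]))
```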

Querying Hudi-managed Data

With the initial data ingestion complete and the CDC and DeltaStreamer processes monitoring for future changes, we can query the resulting data stored in Hudi tables. First, we will make some changes to the PostgreSQL MoMA Collection database to see how Hudi manages the data mutations. We could also make changes directly to the Hudi tables using Hive, Spark, or Presto. However, that would cause our datasource to be out of sync with the Hudi tables, potentially negating the entire CDC process. When developing a data lake, this is a critically important consideration — how changes are introduced to Hudi tables, especially when CDC is involved, and whether data continuity between datasources and the data lake is essential.

For the demonstration, I have made a series of arbitrary updates to a piece of artwork in the MoMA Collection database, ‘Picador (La Pique)’ by Pablo Picasso.

-- 'Picador (La Pique)', by Pablo Picasso
SELECT *
FROM artworks
WHERE artwork_id = 128447 AND classification = 'Print';
-- first update (creation date)
UPDATE artworks
SET date = 1959
WHERE artwork_id = 128447;
-- second update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-15'
WHERE artwork_id = 128447;
-- third update (in vs. '')
UPDATE artworks
SET dimensions = 'composition: 20 13/16 x 25 3/16 in (52.9 x 64 cm); sheet: 24 7/16 x 29 1/2 in (62.1 x 75 cm)'
WHERE artwork_id = 128447;
-- fourth update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-19'
WHERE artwork_id = 128447;

Below, note the last four objects shown in S3. Judging by the file names and dates, we can see that the CDC process, using Kafka Connect, has picked up the four updates I made to the record in the database. The Source Connector first wrote the changes to Kafka. The Sink Connector then read those Kafka messages and wrote the data to Amazon S3 in Avro format, as shown below.

Looking again at S3, we can also observe that DeltaStreamer picked up the new CDC objects in Amazon S3 and wrote them to both the Hudi CoW and MoR tables. Note the file types shown below. Given Hudi’s MoR table type structure, Hudi first logged the changes to row-based delta files and later compacted them to produce a new version of the columnar-format Parquet file.

Hudi MoR row-based delta log files and compacted Parquet files

Querying Results from Apache Hive

There are several ways to query Hudi-managed data in S3. This demonstration uses three: querying Apache Hive with the hive client from the command line, querying Hive through Spark, and querying the Hudi tables directly with Spark. We could also install Presto on EMR to query the Hudi data directly or via Hive.

Querying the real-time artworks_rt table in Hive after each database change, we can observe that the data in Hudi reflects the updates. Note that the value of the _hoodie_file_name field for the first three updates is a Hudi delta log file, while the value for the last update is a Parquet file. The Parquet file signifies that compaction occurred between the time the fourth update was made and the time the Hive query was executed. Lastly, note that the type of operation (__op) indicates an update change (u) for all records.

Querying the data in the Hudi MoR real-time table as we make changes to the database

Once all four database updates are complete and compaction has occurred, we should observe identical results from all Hive tables. Below, note the _hoodie_file_name field for all three tables is a Parquet file. Logically, the Parquet file for the MoR read-optimized and real-time Hive tables is the same.

Querying the same record in all three Hive tables: Hudi MoR _ro and _rt tables and CoW table

Had we queried the data prior to compaction, the results would have differed. Below we have three queries. I further updated the artwork record, changing the date field from 1959 to 1960. The read-optimized MoR table, artworks_ro, still reflects the previous date value, 1959, because the update has not yet been compacted. The real-time table, artworks_rt, reflects the latest update to the date field, 1960. Note that the value of the _hoodie_file_name field for the read-optimized table is a Parquet file, while the value for the real-time table (artworks_rt), the third and final query, is a delta log file. The delta log allows the real-time table to display the most current state of the data in Hudi.

Querying the same record in all three Hive tables
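The difference between the read-optimized and real-time views before and after compaction can be illustrated with a toy model. The sketch below is a deliberate simplification, not Hudi’s implementation: a dictionary stands in for the columnar Parquet base file and a list stands in for the row-based delta log. The class and method names are hypothetical.

```python
class ToyMorFileGroup:
    """Toy model of one MoR file group: a base file plus a delta log."""

    def __init__(self, base_records):
        self.base = dict(base_records)  # stands in for the Parquet base file
        self.log = []                   # stands in for the row-based delta log

    def upsert(self, key, record):
        # New changes are appended to the log; the base file is untouched.
        self.log.append((key, record))

    def read_optimized(self, key):
        # The _ro view reads only the base file.
        return self.base.get(key)

    def real_time(self, key):
        # The _rt view merges the base file with the delta log on read.
        merged = self.base.get(key)
        for log_key, record in self.log:
            if log_key == key:
                merged = record
        return merged

    def compact(self):
        # Compaction folds the logged changes into a new base file version.
        for key, record in self.log:
            self.base[key] = record
        self.log.clear()


group = ToyMorFileGroup({128447: {"date": 1959}})
group.upsert(128447, {"date": 1960})
before = (group.read_optimized(128447), group.real_time(128447))
group.compact()
after = (group.read_optimized(128447), group.real_time(128447))
print(before)
print(after)
```

Before compaction, the two views disagree; after compaction, both return the 1960 value, which mirrors the query results described above.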

Below are a few useful Hive commands to query the changes in Hudi.

-- beeline or hive
-- beeline connect
!connect jdbc:hive2://localhost:10000/default
SHOW DATABASES;
DESCRIBE DATABASE moma_mor;
USE moma_cow;SHOW TABLES;
USE moma_mor;SHOW TABLES;
USE moma_mor;DESCRIBE artworks_ro;
MSCK REPAIR TABLE moma_mor.artworks_ro;
SHOW PARTITIONS moma_mor.artworks_ro;
ANALYZE TABLE moma_mor.artists_rt COMPUTE STATISTICS;
DESCRIBE EXTENDED moma_mor.artists_rt;
-- test query performance without caching
set hive.query.results.cache.enabled=false;
-- 100 rows selected (1.394 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE department='Prints & Illustrated Books' LIMIT 100;
-- 100 rows selected (2.371 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE department='Prints & Illustrated Books' LIMIT 100;
-- 10 rows selected (0.719 seconds) <- read-optimized vs. real-time table, classification is partitioned
SELECT * FROM moma_mor.artworks_ro WHERE classification='Print' LIMIT 10;
-- 10 rows selected (1.482 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE classification='Print' LIMIT 10;
EXPLAIN EXTENDED SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
-- 1 row selected (14.126 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE artwork_id=128447;
-- 1 row selected (32.877 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447;
-- 1 row selected (1.491 seconds) <- classification is partitioned
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
-- 84 rows selected (8.618 seconds)
SELECT artworks.title AS title,
artworks.`date` AS created,
artworks.name AS artist,
artists.nationality AS nationality,
artworks.classification AS classification
FROM moma_cow.artworks artworks
JOIN moma_cow.artists artists ON (artworks.artist_id = artists.artist_id)
WHERE artworks.artist_id = 4609
AND nationality = 'Spanish'
AND classification = 'Print'
AND artworks.`date` IS NOT NULL
ORDER BY created, title;

Deletes with Hudi

In addition to inserts and updates (upserts), Apache Hudi can manage deletes. Hudi supports two types of deletes on data stored in Hudi tables: soft deletes and hard deletes. Given this demonstration’s specific configuration for CDC and DeltaStreamer, we will use soft deletes. Soft deletes retain the record key and nullify the values of the other fields. Hard deletes, a stronger form of deletion, physically remove any trace of the record from the Hudi table.

Below, we see the CDC record for the artist with artist_id 441. The event flattening single message transformation (SMT), used by the Debezium-based Kafka Connect Source Connector, adds the __deleted field with a value of true and nullifies all fields except the record’s key, artist_id, which is required.

{
"artist_id" : 441,
"name" : null,
"nationality" : null,
"gender" : null,
"birth_year" : null,
"death_year" : null,
"__op" : {
"string" : "d"
},
"__db" : {
"string" : "moma"
},
"__table" : {
"string" : "artists"
},
"__schema" : {
"string" : "public"
},
"__lsn" : {
"long" : 3692866569488
},
"__source_ts_ms" : {
"long" : 1640814436010
},
"__deleted" : {
"string" : "true"
}
}

Below, we see the same delete record for the artist with artist_id 441 in the Hudi MoR table. All the null fields have been removed.

{
"_hoodie_commit_time": "20211229225047",
"_hoodie_commit_seqno": "20211229225047_1_1",
"_hoodie_record_key": "441",
"_hoodie_partition_path": "nationality=default",
"_hoodie_file_name": "2a98931a-6015-438e-be78-1eff80a75f83-2_1-24-15431_20211229225047.parquet",
"artist_id": 441,
"__op": "d",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3692866569488,
"__source_ts_ms": 1640814436010,
"__deleted": "true"
}
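The relationship between the two records above can be sketched in Python. The CDC record uses Avro’s JSON encoding, in which non-null union values are wrapped in a single-key object naming the type (e.g., {"string": "d"}). The sketch below unwraps those values and drops the null fields, which yields the source and CDC fields seen in the Hudi record. It is an illustration of the data shapes only, not how DeltaStreamer processes records, and the function names are hypothetical.

```python
AVRO_UNION_TYPES = {"string", "long", "int", "boolean", "double", "float", "bytes"}


def unwrap_union(value):
    """Unwrap an Avro JSON-encoded union value such as {"string": "d"}."""
    if isinstance(value, dict) and len(value) == 1:
        type_name, inner = next(iter(value.items()))
        if type_name in AVRO_UNION_TYPES:
            return inner
    return value


def flatten_cdc_record(record):
    """Unwrap union values and drop fields whose value is null."""
    flat = {name: unwrap_union(value) for name, value in record.items()}
    return {name: value for name, value in flat.items() if value is not None}


def is_soft_delete(record):
    """True when the flattened record carries the delete markers."""
    return record.get("__op") == "d" and record.get("__deleted") == "true"


cdc_record = {
    "artist_id": 441,
    "name": None,
    "nationality": None,
    "gender": None,
    "birth_year": None,
    "death_year": None,
    "__op": {"string": "d"},
    "__db": {"string": "moma"},
    "__table": {"string": "artists"},
    "__schema": {"string": "public"},
    "__lsn": {"long": 3692866569488},
    "__source_ts_ms": {"long": 1640814436010},
    "__deleted": {"string": "true"},
}

flattened = flatten_cdc_record(cdc_record)
print(flattened)
print(is_soft_delete(flattened))
```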

Below, we see how the deleted record appears in the three Hive CoW and MoR artists tables. Note that the query results from the read-optimized MoR table, artists_ro, contain two records: the original record (r) and the deleted record (d). The data is partitioned by nationality, and since the record was deleted, the nationality field is changed to null. In S3, Hudi represents this partition as nationality=default. The record now exists in two different Parquet files, within two separate partitions, something to be aware of when querying the read-optimized MoR table.

Results of a database delete as shown in Hive CoW and MoR tables

Time Travel

According to the documentation, Hudi has supported time travel queries since version 0.9.0. With time travel, you can query the previous state of your data. Time travel is particularly useful for use cases such as rollbacks, debugging, and audit history.

To demonstrate time travel queries in Hudi, we start by making some additional changes to the source database. For this demonstration, I made a series of five updates and finally a delete to the artist record with artist_id 299 in the PostgreSQL database over a few-hour period.

-- first update (birth)
UPDATE public.artists
SET birth_year = 1907
WHERE artist_id = 299;
-- second update (death)
UPDATE public.artists
SET death_year = 1989
WHERE artist_id = 299;
-- third update (middle initial)
UPDATE public.artists
SET name = 'Gerhard M. Bakker'
WHERE artist_id = 299;
-- fourth update (nationality - impacts partitions)
UPDATE public.artists
SET nationality = 'German'
WHERE artist_id = 299;
-- fifth update (birth)
UPDATE public.artists
SET birth_year = 1905
WHERE artist_id = 299;
-- delete
DELETE
FROM public.artists
WHERE artist_id = 299;

Once the CDC and DeltaStreamer ingestion processes are complete, we can use Hudi’s time travel query capability to view the state of data in Hudi at different points in time (instants). To do so, we need to provide an as.of.instant date/time value to Spark (see the as.of.instant option passed to the Spark reader in the code below).

Based on the time period in which I made the five updates and the delete, I have chosen six instants during that period where I want to examine the state of the record. Below is an example of the PySpark code from a Jupyter Notebook used to perform the six time travel queries against the Hudi MoR artists table.

from datetime import timedelta

from dateutil import parser

# assumes an active SparkSession named `spark`, as provided by the Jupyter Notebook
base_path = "s3://open-data-lake-demo-us-east-1/moma/artists_mor"

instants = [  # times in EST
    "2021-12-30 08:00:00",  # reflects original record (r)
    "2021-12-30 09:00:00",  # reflects updates 1 and 2 (u)
    "2021-12-30 09:30:00",  # reflects update 3 (u)
    "2021-12-30 11:00:00",  # reflects update 4 (u)
    "2021-12-30 12:30:00",  # reflects update 5 (u)
    "2021-12-30 14:00:00",  # reflects delete (d)
]

for instant in instants:
    as_of_instant = parser.parse(instant) + timedelta(hours=5)  # adjust EST to UTC
    print(f"Record state as of: {as_of_instant}")

    artistsSnapshotDF = (
        spark.read.format("hudi")
        .option("as.of.instant", str(as_of_instant))
        .load(base_path)
    )
    artistsSnapshotDF.createOrReplaceTempView("hudi_artists_snapshot")

    spark.sql(
        """
        SELECT _hoodie_commit_time, __op, _hoodie_partition_path, name, nationality, gender, birth_year, death_year
        FROM hudi_artists_snapshot
        WHERE artist_id=299;
        """
    ).show()
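The code above shifts each EST time to UTC by adding five hours. As an alternative sketch, assuming we prefer to make the time zone explicit rather than rely on a hard-coded offset in the arithmetic, the standard library’s timezone support can perform the same conversion. The function name is hypothetical, and the fixed -5 hour offset is only valid while daylight saving time is not in effect.

```python
from datetime import datetime, timedelta, timezone

# Fixed-offset Eastern Standard Time; does not account for daylight saving.
EST = timezone(timedelta(hours=-5), "EST")


def to_utc_instant(local_time: str, tz: timezone = EST) -> str:
    """Convert a local 'YYYY-MM-DD HH:MM:SS' string to the same format in UTC."""
    local = datetime.strptime(local_time, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
    return local.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


for instant in ["2021-12-30 08:00:00", "2021-12-30 14:00:00"]:
    print(instant, "EST ->", to_utc_instant(instant), "UTC")
```

For dates that may fall within daylight saving time, a region-aware time zone such as zoneinfo.ZoneInfo("America/New_York") (Python 3.9 or later) would be a safer choice than a fixed offset.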

Below, we see the results of the time travel queries. At each instant, we can observe the mutating state of the data in the Hudi MoR artists table, including the initial bulk insert of the existing snapshot of data (r) and the delete record (d). Since the delete made in the PostgreSQL database was recorded as a soft delete in Hudi, as opposed to a hard delete, we are still able to retrieve the record at any instant.

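Record state as of: 2021-12-30 13:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230034812|   r|  nationality=American|Gerhard H. Bakker|   American|  Male|      1906|      1988|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 14:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230132628|   u|  nationality=American|Gerhard H. Bakker|   American|  Male|      1907|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 14:30:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230142035|   u|  nationality=American|Gerhard M. Bakker|   American|  Male|      1907|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 16:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230144237|   u|    nationality=German|Gerhard M. Bakker|     German|  Male|      1907|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 17:30:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230171925|   u|    nationality=German|Gerhard M. Bakker|     German|  Male|      1905|      1989|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
Record state as of: 2021-12-30 19:00:00
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|_hoodie_commit_time|__op|_hoodie_partition_path|             name|nationality|gender|birth_year|death_year|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+
|     20211230180429|   d|   nationality=default|             null|       null|  null|      null|      null|
+-------------------+----+----------------------+-----------------+-----------+------+----------+----------+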
Results of the time travel queries, ordered by commit time
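One way to reason about these results is that each time travel query returns the record version written by the latest commit at or before the requested instant. The sketch below models only that selection step, using the commit times from the results above. It is a toy illustration under that assumption, not Hudi’s implementation, and the function name is hypothetical.

```python
from bisect import bisect_right

# Commit instants (yyyyMMddHHmmss, UTC) taken from the query results above.
COMMITS = sorted([
    "20211230034812",  # initial bulk insert (r)
    "20211230132628",  # updates 1 and 2 (u)
    "20211230142035",  # update 3 (u)
    "20211230144237",  # update 4 (u)
    "20211230171925",  # update 5 (u)
    "20211230180429",  # delete (d)
])


def latest_commit_as_of(commits, as_of_instant):
    """Return the latest commit at or before 'YYYY-MM-DD HH:MM:SS', or None."""
    compact = as_of_instant.replace("-", "").replace(" ", "").replace(":", "")
    # Fixed-width digit strings sort chronologically, so bisect applies directly.
    index = bisect_right(commits, compact)
    return commits[index - 1] if index else None


for instant in ["2021-12-30 13:00:00", "2021-12-30 16:00:00", "2021-12-30 19:00:00"]:
    print(instant, "->", latest_commit_as_of(COMMITS, instant))
```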

In addition to time travel queries, Hudi also offers incremental queries and point in time queries.

Conclusion

Although this post only scratches the surface of the capabilities of Debezium and Hudi, you can see the power of CDC using Kafka Connect and Debezium, combined with Hudi, to build and manage open data lakes on AWS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


DevOps for DataOps: Building a CI/CD Pipeline for Apache Airflow DAGs

Build an effective CI/CD pipeline to test and deploy your Apache Airflow DAGs to Amazon MWAA using GitHub Actions

Introduction

In this post, we will learn how to use GitHub Actions to build an effective CI/CD workflow for our Apache Airflow DAGs. We will use the DevOps concepts of Continuous Integration and Continuous Delivery to automate the testing and deployment of Airflow DAGs to Amazon Managed Workflows for Apache Airflow (Amazon MWAA) on AWS.

Fork and pull model of collaborative Airflow development used in this post

Technologies

Apache Airflow

According to the documentation, Apache Airflow is an open-source platform to author, schedule, and monitor workflows programmatically. With Airflow, you author workflows as Directed Acyclic Graphs (DAGs) of tasks written in Python.

Amazon Managed Workflows for Apache Airflow

According to AWS, Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a highly available, secure, and fully-managed workflow orchestration for Apache Airflow. MWAA automatically scales its workflow execution capacity to meet your needs and is integrated with AWS security services to help provide fast and secure access to data.

Example of Apache Airflow UI within Amazon MWAA Environment

GitHub Actions

According to GitHub, GitHub Actions makes it easy to automate software workflows with CI/CD. GitHub Actions allow you to build, test, and deploy code right from GitHub. GitHub Actions are workflows triggered by GitHub events like push, issue creation, or a new release. You can leverage GitHub Actions prebuilt and maintained by the community.

Example of GitHub Action workflow running in the GitHub repository used in this post

If you are new to GitHub Actions, I recommend my previous post, Continuous Integration and Deployment of Docker Images using GitHub Actions.

Terminology

DataOps

According to Wikipedia, DataOps is an automated, process-oriented methodology used by analytic and data teams to improve the quality and reduce the cycle time of data analytics. While DataOps began as a set of best practices, it has now matured to become a new approach to data analytics.

DataOps applies to the entire data lifecycle from data preparation to reporting and recognizes the interconnected nature of the data analytics team and IT operations. DataOps incorporates the Agile methodology to shorten the software development life cycle (SDLC) of analytics development.

DevOps

According to Wikipedia, DevOps is a set of practices that combines software development (Dev) and IT operations (Ops). It aims to shorten the systems development life cycle and provide continuous delivery with high software quality.

DevOps is a set of practices intended to reduce the time between committing a change to a system and the change being placed into normal production, while ensuring high quality. -Wikipedia

Fail Fast

According to Wikipedia, a fail-fast system is one that immediately reports any condition that is likely to indicate a failure. Using the DevOps concept of fail fast, we build steps into our workflows to uncover errors sooner in the SDLC. We shift testing as far to the left as possible (referring to the pipeline of steps moving from left to right) and test at multiple points along the way.

Source Code

All source code for this demonstration, including the GitHub Actions, Pytest unit tests, and Git Hooks, is open-sourced and located on GitHub.

Architecture

The diagram below represents the architecture for a recent blog post and video demonstration, Lakehouse Automation on AWS with Apache Airflow. The post and video show how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based data lake using Apache Airflow.

Architecture for the post and video, Lakehouse Automation on AWS with Apache Airflow

In this post, we will review how the DAGs from the previous post were developed, tested, and deployed to MWAA using a variety of progressively more effective CI/CD workflows. The workflows demonstrated could also be easily applied to other Airflow resources in addition to DAGs, such as SQL scripts, configuration and data files, Python requirement files, and plugins.

Workflows

No DevOps

Below we see a minimally viable workflow for loading DAGs into Amazon MWAA, which does not use the principles of CI/CD. Changes are made in the local Airflow developer’s environment. The modified DAGs are copied directly to the Amazon S3 bucket and are then automatically synced with Amazon MWAA, barring any errors. Those changes are also (hopefully) pushed back to the centralized version control or source code management (SCM) system, which is GitHub in this post.

Error-prone, non-DevOps workflow for modifying and syncing DAGs to MWAA

There are at least two significant issues with this error-prone workflow. First, the DAGs can easily fall out of sync between the Amazon S3 bucket and GitHub, since copying or syncing the DAGs to S3 and pushing the DAGs to GitHub are two independent steps. A developer might continue making changes and pushing DAGs to S3 without pushing to GitHub or vice versa.

Secondly, the DevOps concept of fail-fast is missing. The first time you know your DAG contains errors is likely when it is synced to MWAA and throws an Import Error. By then, the DAG has already been copied to S3, synced to MWAA, and possibly pushed to GitHub, which other developers could then pull.

Example of a typical DAG Import Error, easily caught with a simple test
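As a rough illustration of how little it takes to fail fast, the sketch below parses every Python file in a DAG folder and reports syntax errors before anything is copied to S3. It uses only the standard library and is deliberately weaker than the DagBag-based test shown later in this post: it will not catch missing modules or Airflow-specific import problems. The function name find_syntax_errors and the dags folder name are assumptions made for illustration.

```python
import ast
from pathlib import Path


def find_syntax_errors(dag_dir: str) -> dict:
    """Return {file path: error message} for every .py file that fails to parse."""
    errors = {}
    for path in sorted(Path(dag_dir).rglob("*.py")):
        try:
            # ast.parse only checks syntax; it does not execute or import the DAG
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Hypothetical usage before copying DAGs to S3:
# problems = find_syntax_errors("dags")
# if problems:
#     raise SystemExit(f"Refusing to sync, syntax errors found: {problems}")
```

A check like this could run in seconds on a developer's machine, which is the point of shifting tests left.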

GitHub Actions

A significant step up from the previous workflow is using GitHub Actions to test and deploy your code after pushing it to GitHub. Although in this workflow, code is still ‘pushed straight to Trunk’ (the main branch in GitHub) and risks other developers in a collaborative environment pulling potentially erroneous code, you have far less chance of DAG errors making it to MWAA.

GitHub Actions allow you to fail faster and catch errors sooner

Using GitHub Actions, you also eliminate human error that could result in the changes to DAGs not being synced to Amazon S3. Lastly, using this workflow improves security by eliminating the need to provide direct access to the Airflow Amazon S3 bucket to Airflow Developers.

Fork and pull model of collaborative Airflow development used in this post (video only)

Types of Tests

The first GitHub Action, test_dags.yml, is triggered on a push to the dags directory in the main branch of the repository. It is also triggered whenever a pull request is made for the main branch. The first GitHub Action runs a battery of tests, including checking Python dependencies, code style, code quality, DAG import errors, and unit tests. The tests catch issues with DAGs before being synced to S3 by a second GitHub Action.

name: Test DAGs

on:
  push:
    paths:
      - 'dags/**'
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.7'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements/requirements.txt
        pip check
    - name: Lint with Flake8
      run: |
        pip install flake8
        flake8 --ignore E501 dags --benchmark -v
    - name: Confirm Black code compliance (psf/black)
      run: |
        pip install pytest-black
        pytest dags --black -v
    - name: Test with Pytest
      run: |
        pip install pytest
        cd tests || exit
        pytest tests.py -v

Successful runs of the ‘Test DAGs’ GitHub Action, shown in the Actions Console

Python Dependencies

The first test installs the modules listed in the requirements.txt file used locally to develop the application. This test is designed to uncover any missing or conflicting modules.

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements/requirements.txt
        pip check

It is essential to develop your DAGs against the same version of Python and with the same versions of the Python modules used in your Airflow environment. You can use the BashOperator to run shell commands to obtain the versions of Python and the modules installed in your Airflow environment:

python3 --version; python3 -m pip list

A snippet of log output from DAG showing Python version and Python modules available in MWAA 2.0.2:

Python version and Python modules available in MWAA 2.0.2

The latest stable release of Airflow is currently version 2.2.2, released 2021-11-15. However, as of December 2021, Amazon’s latest version of MWAA 2.x is version 2.0.2, released 2021-04-19. MWAA 2.0.2 currently runs Python3 version 3.7.10.

Available versions of Amazon MWAA as of December 2021
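To make the version comparison concrete, here is a minimal sketch that compares the pins in a local requirements file with the modules reported by the Airflow environment. It assumes the environment's modules were captured in name==version form (for example with python3 -m pip freeze, rather than the tabular pip list output shown above). The function names and the version numbers in the usage comment are hypothetical.

```python
def parse_pins(lines):
    """Parse 'name==version' lines into {name: version}; skip comments and unpinned entries."""
    pins = {}
    for raw in lines:
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if "==" not in line:
            continue
        name, version = line.split("==", 1)
        pins[name.strip().lower()] = version.strip()
    return pins


def find_mismatches(local_pins, remote_pins):
    """Return {name: (local, remote)} for modules whose versions differ or are missing remotely."""
    return {
        name: (version, remote_pins.get(name))
        for name, version in local_pins.items()
        if remote_pins.get(name) != version
    }


# Hypothetical usage (version numbers are made up for illustration):
# local = parse_pins(open("requirements/requirements.txt"))
# remote = parse_pins(open("mwaa_pip_freeze.txt"))
# print(find_mismatches(local, remote))
```

An empty result suggests the local development environment matches the Airflow environment for every pinned module.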

Flake8

Known as ‘your tool for style guide enforcement,’ Flake8 is described as the modular source code checker. It is a command-line utility for enforcing style consistency across Python projects. Flake8 is a wrapper around PyFlakes, pycodestyle, and Ned Batchelder’s McCabe script. The module, pycodestyle, is a tool to check your Python code against some of the style conventions in PEP 8.

Flake8 is highly configurable, with options to ignore specific rules if not required by your development team. For example, in this demonstration, I intentionally ignored rule E501, ‘line too long,’ which flags lines longer than the configured maximum (79 characters by default).

    - name: Lint with Flake8
      run: |
        pip install flake8
        flake8 --ignore E501 dags --benchmark -v

Black

Known as ‘the uncompromising code formatter,’ Python code formatted using Black (referred to as Blackened code) looks the same regardless of the project you’re reading. Formatting becomes transparent, allowing teams to focus on the content instead. Black makes code review faster by producing the smallest diffs possible, assuming all developers are using black to format their code.

The Airflow DAGs in this GitHub repository are automatically formatted with black using a pre-commit Git Hook before being committed and pushed to GitHub. The test confirms black code compliance.

    - name: Confirm Black code compliance (psf/black)
      run: |
        pip install pytest-black
        pytest dags --black -v

Pytest

The pytest framework describes itself as a mature, fully-featured Python testing tool that helps you write better programs. The Pytest framework makes it easy to write small tests yet scales to support complex functional testing for applications and libraries.

The GitHub Action in the GitHub project, test_dags.yml, calls the tests.py file, also contained in the project.

    - name: Test with Pytest
      run: |
        pip install pytest
        cd tests || exit
        pytest tests.py -v

The tests.py file contains several pytest unit tests. The tests are based on my project requirements; your tests will vary. These tests confirm that all DAGs:

  1. Do not contain DAG Import Errors (test catches 75% of my errors);
  2. Follow specific file naming conventions;
  3. Include a description and an owner other than ‘airflow’;
  4. Contain required project tags;
  5. Do not send emails (my projects use SNS or Slack for notifications);
  6. Do not retry more than three times.

import os
import sys

import pytest
from airflow.models import DagBag

sys.path.append(os.path.join(os.path.dirname(__file__), "../dags"))
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags/utilities"))

# Airflow variables called from DAGs under test are stubbed out
os.environ["AIRFLOW_VAR_DATA_LAKE_BUCKET"] = "test_bucket"
os.environ["AIRFLOW_VAR_ATHENA_QUERY_RESULTS"] = "SELECT 1;"
os.environ["AIRFLOW_VAR_SNS_TOPIC"] = "test_topic"
os.environ["AIRFLOW_VAR_REDSHIFT_UNLOAD_IAM_ROLE"] = "test_role_1"
os.environ["AIRFLOW_VAR_GLUE_CRAWLER_IAM_ROLE"] = "test_role_2"

@pytest.fixture(params=["../dags/"])
def dag_bag(request):
    return DagBag(dag_folder=request.param, include_examples=False)

def test_no_import_errors(dag_bag):
    assert not dag_bag.import_errors

def test_requires_tags(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags

def test_requires_specific_tag(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        try:
            assert dag.tags.index("data lake demo") >= 0
        except ValueError:
            assert dag.tags.index("redshift demo") >= 0

def test_desc_len_greater_than_fifteen(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.description) > 15

def test_owner_len_greater_than_five(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.owner) > 5

def test_owner_not_airflow(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag.owner) != "airflow"

def test_no_emails_on_retry(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert not dag.default_args["email_on_retry"]

def test_no_emails_on_failure(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert not dag.default_args["email_on_failure"]

def test_three_or_less_retries(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.default_args["retries"] <= 3

def test_dag_id_contains_prefix(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag_id).find("__") != -1

def test_dag_id_requires_specific_prefix(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert str.lower(dag_id).startswith("data_lake__") \
            or str.lower(dag_id).startswith("redshift_demo__")

If you are building custom Airflow Operators, additional unit, functional, and integration tests are recommended.
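One possible refinement, sketched below under assumptions, is to pull a rule such as the DAG ID prefix convention into a small pure function. That makes the rule testable without loading Airflow, and a test can report every offending DAG ID at once instead of stopping at the first failed assertion. The helper names and the example DAG IDs in the usage comment are hypothetical; only the two prefixes come from the tests above.

```python
ALLOWED_PREFIXES = ("data_lake__", "redshift_demo__")


def has_valid_prefix(dag_id: str, prefixes=ALLOWED_PREFIXES) -> bool:
    """True when the lower-cased DAG ID starts with one of the allowed prefixes."""
    return dag_id.lower().startswith(tuple(prefixes))


def invalid_dag_ids(dag_ids, prefixes=ALLOWED_PREFIXES):
    """Return the DAG IDs that break the naming convention, preserving input order."""
    return [dag_id for dag_id in dag_ids if not has_valid_prefix(dag_id, prefixes)]


# Hypothetical usage inside a pytest test that receives the dag_bag fixture:
# offenders = invalid_dag_ids(dag_bag.dags.keys())
# assert not offenders, f"DAG IDs missing a required prefix: {offenders}"
```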

Fork and Pull

We can improve on the practice of pushing directly to Trunk by implementing one of two collaborative development models, recommended by GitHub:

  1. The Shared repository model: uses ‘topic’ branches, which are reviewed, approved, and merged into the main branch.
  2. Fork and pull model: a repo is forked, changes are made, a pull request is created, the request is reviewed, and if approved, merged into the main branch.

In the fork and pull model, we create a fork of the DAG repository where we make our changes. We then commit and push those changes back to the forked repository. When ready, we create a pull request. If the pull request is approved and passes all the tests, it is manually or automatically merged into the main branch. DAGs are then synced to S3 and, eventually, to MWAA. I usually prefer to trigger merges manually once all tests have passed.

The fork and pull model greatly reduces the chance that bad code is merged to the main branch before passing all tests.

Errors are caught early in the fork and pull model prior to merging code changes

Syncing DAGs to S3

The second GitHub Action in the GitHub project, sync_dags.yml, is triggered when the previous Action, test_dags.yml, completes successfully, or in the case of the fork and pull model, the merge to the main branch is successful.

name: Sync DAGs

on:
  workflow_run:
    workflows:
      - 'Test DAGs'
    types:
      - completed
  pull_request:
    types:
      - closed

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
    - uses: actions/checkout@master
    - uses: jakejarvis/s3-sync-action@master
      env:
        AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        AWS_REGION: 'us-east-1'
        SOURCE_DIR: 'dags'
        DEST_DIR: 'dags'

The GitHub Action, sync_dags.yml, requires three GitHub encrypted secrets, created in advance and associated with the GitHub repository. According to GitHub, secrets are encrypted environment variables you create in an organization, repository, or repository environment. Encrypted secrets allow you to store sensitive information, such as access tokens, in your repository. The secrets that you create are available to use in GitHub Actions workflows.

Encrypted repository secrets used by GitHub Action to sync with Amazon S3
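Because the secrets reach the workflow as environment variables, a script that performs the sync could verify they are present before doing any work. The following is a minimal sketch of such a pre-flight check, not part of the GitHub Action used in this post; the function name missing_settings is hypothetical, while the three variable names are the secrets referenced in sync_dags.yml.

```python
import os

REQUIRED_SETTINGS = ("AWS_S3_BUCKET", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_settings(environ=None, required=REQUIRED_SETTINGS):
    """Return the names of required settings that are absent or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


# Hypothetical usage at the top of a deployment script:
# missing = missing_settings()
# if missing:
#     raise SystemExit(f"Missing required settings: {', '.join(missing)}")
```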

The DAGs are synced to Amazon S3 and, eventually, automatically synced to MWAA.

GitHub Action syncs DAGs to Amazon S3 if tests are successful

Local Testing and Git Hooks

To further improve your CI/CD workflows, you should consider using Git Hooks. Using Git Hooks, we can ensure code is tested locally before committing and pushing changes to GitHub. Testing locally allows us to fail faster, catching errors during development instead of once code is pushed to GitHub.

Errors are caught even earlier using Git Hooks

According to the documentation, Git has a way to fire off custom scripts when certain important actions occur. There are two types of hooks: client-side and server-side. Client-side hooks are triggered by operations such as committing and merging, while server-side hooks run on network operations such as receiving pushed commits.

You can use these hooks for all sorts of reasons. I often use a client-side pre-commit hook to format DAGs using black. Using a client-side pre-push Git Hook, we will ensure that tests are run before pushing the DAGs to GitHub. According to Git, the pre-push hook runs during git push, after the remote refs have been updated but before any objects have been transferred. You can use it to validate a set of ref updates before a push occurs. A non-zero exit code will abort the push. The tests could instead be run as part of the pre-commit hook if they are not too time-consuming.

To use the pre-push hook, create the following file within the local repository, .git/hooks/pre-push:

#!/bin/sh
# do nothing if there are no commits to push
if [ -z "$(git log @{u}..)" ]; then
  exit 0
fi
sh ./run_tests_locally.sh

Then, run the following chmod command to make the hook executable:

chmod 755 .git/hooks/pre-push

The pre-push hook runs the shell script, run_tests_locally.sh. The script executes nearly identical tests, locally, as the GitHub Action, test_dags.yml, does remotely on GitHub:

#!/bin/sh
echo "Starting Flake8 test..."
flake8 --ignore E501 dags --benchmark || exit 1
echo "Starting Black test..."
python3 -m pytest --cache-clear
python3 -m pytest dags/ --black -v || exit 1
echo "Starting Pytest tests..."
cd tests || exit
python3 -m pytest tests.py -v || exit 1
echo "All tests completed successfully! 🥳"

Complete CI/CD workflow including running tests locally using a Git Hook (video only)
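Git hooks can be written in any language, so the same fail-fast sequence could also be expressed in Python. The sketch below is an assumption-laden alternative to run_tests_locally.sh, not the script used in this post: it runs a list of commands in order and stops at the first non-zero exit code, which is the behavior a pre-push hook needs in order to abort the push. The function name run_checks is hypothetical.

```python
import subprocess
import sys


def run_checks(commands):
    """Run each (name, command) pair in order; return the first non-zero exit code, else 0."""
    for name, command in commands:
        print(f"Starting {name}...")
        result = subprocess.run(command)
        if result.returncode != 0:
            print(f"{name} failed with exit code {result.returncode}")
            return result.returncode
    print("All checks completed successfully!")
    return 0


# Hypothetical usage mirroring two of the checks in run_tests_locally.sh:
# CHECKS = [
#     ("Flake8 test", ["flake8", "--ignore", "E501", "dags", "--benchmark"]),
#     ("Black test", [sys.executable, "-m", "pytest", "dags/", "--black", "-v"]),
# ]
# sys.exit(run_checks(CHECKS))
```

Returning the failing exit code, rather than a generic 1, keeps the original tool's signal visible to whoever reads the hook output.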


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Video Demonstration: Lakehouse Automation on AWS with Apache Airflow

Programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow

Introduction

In the following video demonstration, we will learn how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow. Since we are on AWS, we will be using the fully-managed Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Using Airflow, we will COPY raw data into staging tables, then merge that staging data into a series of tables. We will then load incremental data into Redshift on a regular schedule. Next, we will join and aggregate data from several tables and UNLOAD the resulting dataset to an Amazon S3-based data lake. Lastly, we will catalog the data in S3 using AWS Glue and query with Amazon Athena.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGs, SQL statements, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs included in the GitHub project are:

Demonstration DAGs as seen in MWAA Airflow UI



Video Demonstration: Building a Data Lake with Apache Airflow

Build a simple Data Lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

Using a series of Airflow DAGs (Directed Acyclic Graphs), we will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGs, SQL files, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs shown in the video demonstration have been renamed for easier project management within the Airflow UI. The DAGs included in the GitHub project are as follows:




Video Demonstration: Building a Data Lake on AWS

Build a simple Data Lake on AWS using a combination of services, including AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

We will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the SQL statements, is open-sourced and located on GitHub.




Video Demonstration: Ahana Cloud for Presto on AWS using Apache Hive and AWS Glue

Using Ahana Cloud for Presto to perform analytics on AWS using both Apache Hive and AWS Glue as metastores

Introduction

The following series of five videos is an extended version of the demonstration featured in the October 2021 webinar, Build an Open Data Lake on AWS with Presto. An on-demand copy of the live webinar is available on Ahana.io, featuring Dipti Borkar (Ahana Co-Founder and CPO) and me.

In the demonstration, we will build a data lake on AWS using a combination of Ahana Cloud for Presto, Apache Hive, Apache Superset, Amazon S3, AWS Glue, and Amazon Athena. We then analyze the data in Apache Superset using Ahana Cloud for Presto.

Build an Open Data Lake on AWS with Presto

Demonstration

The demonstration is divided into five YouTube videos (playlist):

Ahana Cloud for Presto Demo — Part 1/5: Public GitHub Resources

Ahana Cloud for Presto Demo — Part 2/5: MoMa Datasource

Ahana Cloud for Presto Demo — Part 3/5: Ahana SaaS

Ahana Cloud for Presto Demo — Part 4/5: AWS Glue & Amazon

Ahana Cloud for Presto Demo — Part 5/5: PrestoDB & Apache Hive

Source Code

All source code for this post and the previous posts in this series is open-sourced and located on GitHub. In the webinar and the videos, the Apache Hive and AWS Glue data catalog tables contain an _athena or _presto suffix. For clarity, in the source code, I have changed those to indicate the metastore they are associated with, _hive or _glue, since either set of tables can be queried by Presto. Additionally, in the webinar and the videos, the raw data files were uploaded to Amazon S3 in uncompressed CSV format; this is unnecessary. The CTAS SQL statements both expect GZIP-compressed CSV files. To save time and cost, upload the compressed files, as they are, to Amazon S3.

The following files are used in the demonstration:




Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR

Exploring Apache Spark with Apache Kafka using both batch queries and Spark Structured Streaming

Introduction

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. Using Structured Streaming, you can express your streaming computation the same way you would express a batch computation on static data. In this post, we will learn how to use Apache Spark and Spark Structured Streaming with Apache Kafka. Specifically, we will utilize Structured Streaming on Amazon EMR (fka Amazon Elastic MapReduce) with Amazon Managed Streaming for Apache Kafka (Amazon MSK). We will consume from and publish to Kafka using both batch and streaming queries. Spark jobs will be written in Python with PySpark for this post.

High-level AWS architecture for this post’s demonstration

Apache Spark

According to the documentation, Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python (PySpark), and R, and an optimized engine that supports general execution graphs. In addition, Spark supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Apache Spark and PySpark versus Apache Hive and Presto interest over time, according to Google Trends

Spark Structured Streaming

According to the documentation, Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end, exactly-once stream processing without the user having to reason about streaming.
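The idea that a streaming query is the batch query applied incrementally can be illustrated without Spark. The following is a conceptual sketch in plain Python, not Spark code: the same grouped aggregation is computed once over a complete dataset and then folded in micro-batch by micro-batch, and the final running result matches the batch result. The record shape (country, amount) and the function names are assumptions made for illustration.

```python
from collections import Counter


def batch_totals(records):
    """Batch view: aggregate amounts per country over the complete dataset at once."""
    totals = Counter()
    for country, amount in records:
        totals[country] += amount
    return totals


def streaming_totals(micro_batches):
    """Streaming view: fold each arriving micro-batch into a running result."""
    running = Counter()
    for batch in micro_batches:
        running.update(batch_totals(batch))  # Counter.update adds to existing totals
        yield dict(running)  # the updated result after each micro-batch


# Hypothetical usage:
# records = [("US", 10), ("DE", 5), ("US", 7), ("FR", 2)]
# for snapshot in streaming_totals([records[:2], records[2:]]):
#     print(snapshot)
```

Spark handles the hard parts this sketch ignores, such as fault tolerance, late data, and distributing state across executors.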

Amazon EMR

According to the documentation, Amazon EMR (fka Amazon Elastic MapReduce) is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hadoop, Hive, HBase, Flink, Hudi, and Presto. Amazon EMR is a fully managed AWS service that makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.

A deployment option for Amazon EMR since December 2020, Amazon EMR on EKS, allows you to run Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). With the EKS deployment option, you can focus on running analytics workloads while Amazon EMR on EKS builds, configures, and manages containers for open-source applications.

If you are new to Amazon EMR for Spark, specifically PySpark, I recommend an earlier two-part series of posts, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

Apache Kafka

According to the documentation, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Amazon MSK

Apache Kafka clusters are challenging to set up, scale, and manage in production. According to the documentation, Amazon MSK is a fully managed AWS service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.

Prerequisites

This post will focus primarily on configuring and running Apache Spark jobs on Amazon EMR. To follow along, you will need the following resources deployed and configured on AWS:

  1. Amazon S3 bucket (holds Spark resources and output);
  2. Amazon MSK cluster (using IAM Access Control);
  3. A container running on Amazon EKS, or an EC2 instance, with the Kafka APIs installed and capable of connecting to Amazon MSK;
  4. Connectivity between the Amazon EKS cluster or EC2 instance and the Amazon MSK cluster;
  5. An Amazon MSK Configuration with auto.create.topics.enable=true; this setting is false by default.

As shown in the architectural diagram above, the demonstration uses three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon EMR, Amazon MSK, and Amazon EKS. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges within your Amazon EMR, Amazon MSK, and Amazon EKS Security Groups. For additional security and cost savings, use a VPC endpoint for private communications between Amazon EMR and Amazon S3.

Source Code

All source code for this post and the two previous posts in the Amazon MSK series, including the Python/PySpark scripts demonstrated here, is open-sourced and located on GitHub.

PySpark Scripts

According to the Apache Spark documentation, PySpark is an interface for Apache Spark in Python. It allows you to write Spark applications using Python APIs. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.

There are nine Python/PySpark scripts covered in this post:

  1. Initial sales data published to Kafka
    01_seed_sales_kafka.py
  2. Batch query of Kafka
    02_batch_read_kafka.py
  3. Streaming query of Kafka using grouped aggregation
    03_streaming_read_kafka_console.py
  4. Streaming query using sliding event-time window
    04_streaming_read_kafka_console_window.py
  5. Incremental sales data published to Kafka
    05_incremental_sales_kafka.py
  6. Streaming query from/to Kafka using grouped aggregation
    06_streaming_read_kafka_kafka.py
  7. Batch query of streaming query results in Kafka
    07_batch_read_kafka.py
  8. Streaming query using static join and sliding window
    08_streaming_read_kafka_join_window.py
  9. Streaming query using static join and grouped aggregation 
    09_streaming_read_kafka_join.py

Amazon MSK Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For this post, the PySpark scripts use Kafka connection properties specific to IAM Access Control. You can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In a recent post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK: Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.
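As a rough sketch of what IAM-specific connection properties look like, the helper below builds the options dictionary a Spark Kafka reader or writer would need. The four security values are the client properties documented for the aws-msk-iam-auth library, prefixed with kafka. as Spark's Kafka integration expects. The helper name msk_iam_kafka_options and the broker address in the usage comment are hypothetical.

```python
def msk_iam_kafka_options(bootstrap_servers: str) -> dict:
    """Build Spark Kafka options for an Amazon MSK cluster using IAM Access Control."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        # IAM Access Control runs over TLS with a SASL handshake
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


# Hypothetical usage in a PySpark script (requires the JARs listed later in this post):
# options = msk_iam_kafka_options("b-1.your-brokers.kafka.us-east-1.amazonaws.com:9098")
# df = spark.read.format("kafka").options(**options).option("subscribe", "my.topic").load()
```

Keeping the options in one place means batch and streaming queries can share the same security configuration.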

Language Choice

According to the latest Spark 3.1.2 documentation, Spark runs on Java 8/11, Scala 2.12, Python 3.6+, and R 3.5+. The Spark documentation contains code examples written in all four languages and provides sample code on GitHub for Scala, Java, Python, and R. Spark is written in Scala.

Spark language interest over time, according to Google Trends

There are countless posts and industry opinions on choosing the best language for Spark. Taking no sides, I have selected the language I use most frequently for data analytics, Python using PySpark. Scala and Python differ in several significant ways: compiled versus interpreted, statically-typed versus dynamically-typed, JVM- versus non-JVM-based, Scala’s support for concurrency and true multi-threading, and Scala’s often-cited raw performance advantage (up to 10x) versus the perceived ease-of-use, larger community, and relative maturity of Python.

Preparation

Amazon S3

We will start by gathering and copying the necessary files to your Amazon S3 bucket. The bucket will serve as the location for the Amazon EMR bootstrap script, additional JAR files required by Spark, PySpark scripts, CSV-format data files, and eventual output from the Spark jobs.

There is a small set of additional JAR files required by the Spark jobs we will be running. Download the JARs from Maven Central and GitHub, and place them in the emr_jars project directory. The JARs will include AWS MSK IAM Auth, AWS SDK, Kafka Client, Spark SQL for Kafka, Spark Streaming, and other dependencies.

cd ./pyspark/emr_jars/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar
wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.28/bundle-2.17.28.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.12/3.1.2/spark-streaming_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.1.2/spark-tags_2.12-3.1.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.2/spark-token-provider-kafka-0-10_2.12-3.1.2.jar

Next, update the SPARK_BUCKET environment variable, then upload the JARs and all necessary project files from your copy of the GitHub project repository to your Amazon S3 bucket using the AWS CLI s3 commands.

cd ./pyspark/
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"

aws s3 cp emr_jars/ \
"s3://${SPARK_BUCKET}/jars/" --recursive
aws s3 cp pyspark_scripts/ \
"s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp emr_bootstrap/ \
"s3://${SPARK_BUCKET}/spark/" --recursive
aws s3 cp data/ \
"s3://${SPARK_BUCKET}/spark/" --recursive

Amazon EMR

The GitHub project repository includes a sample AWS CloudFormation template and an associated JSON-format CloudFormation parameters file. The template, stack.yml, accepts several parameters. To match your environment, you will need to update the parameter values such as SSH key, Subnet, and S3 bucket. The template will build a minimally-sized Amazon EMR cluster with one master and two core nodes in an existing VPC. The template can be easily modified to meet your requirements and budget.

aws cloudformation deploy \
--stack-name spark-kafka-demo-dev \
--template-file ./cloudformation/stack.yml \
--parameter-overrides file://cloudformation/dev.json \
--capabilities CAPABILITY_NAMED_IAM

Whether or not you decide to use the CloudFormation template, two essential Spark configuration items in the EMR template are the list of applications to install and the bootstrap script deployment.

Applications:
  - Name: 'Hadoop'
  - Name: 'Spark'
  - Name: 'JupyterEnterpriseGateway'
  - Name: 'Livy'
BootstrapActions:
  - Name: bootstrap-script
    ScriptBootstrapAction:
      Path: !Join [ '', [ 's3://', !Ref ProjectBucket, '/spark/bootstrap_actions.sh' ] ]

Below, we see the EMR bootstrap shell script, bootstrap_actions.sh, deployed and executed on the cluster’s nodes.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford
# Date: 2021-09-10
# arg passed in by CloudFormation
if [ $# -eq 0 ]
then
echo "No arguments supplied"
fi
SPARK_BUCKET=$1
# update yum packages, install jq
sudo yum update -y
sudo yum install -y jq
# jsk truststore for connecting to msk
sudo cp /usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/security/cacerts \
/tmp/kafka.client.truststore.jks
# set region for boto3
aws configure set region \
"$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"
# install python packages for pyspark scripts
sudo python3 -m pip install boto3 botocore ec2-metadata
# install required jars for spark
sudo aws s3 cp \
"s3://${SPARK_BUCKET}/jars/" /usr/lib/spark/jars/ \
--recursive --exclude "*" --include "*.jar"

The script performed several tasks, including deploying the additional JAR files we copied to Amazon S3 earlier.

Amazon EMR cluster bootstrap actions tab

AWS Systems Manager Parameter Store

The PySpark scripts in this demonstration will obtain two parameters from the AWS Systems Manager (AWS SSM) Parameter Store. They include the Amazon MSK bootstrap brokers and the Amazon S3 bucket that contains the Spark assets. Using the Parameter Store ensures that no sensitive or environment-specific configuration is hard-coded into the PySpark scripts. Modify and execute the ssm_params.sh script to create two AWS SSM Parameter Store parameters.

aws ssm put-parameter \
--name /kafka_spark_demo/kafka_servers \
--type String \
--value "<b-1.your-brokers.kafka.us-east-1.amazonaws.com:9098,b-2.your-brokers.kafka.us-east-1.amazonaws.com:9098>" \
--description "Amazon MSK Kafka broker list" \
--overwrite

aws ssm put-parameter \
--name /kafka_spark_demo/kafka_demo_bucket \
--type String \
--value "<your-bucket-111222333444-us-east-1>" \
--description "Amazon S3 bucket" \
--overwrite
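
The same two parameters could also be created from Python. The following is a minimal sketch, not the project's ssm_params.sh script: the helper name put_demo_parameters is hypothetical, and it accepts any object that exposes the SSM put_parameter call, so it can be exercised without AWS credentials.

```python
def put_demo_parameters(ssm_client, kafka_servers, bucket_name):
    """Create or overwrite the two demo parameters (hypothetical helper)."""
    parameters = [
        ("/kafka_spark_demo/kafka_servers", kafka_servers,
         "Amazon MSK Kafka broker list"),
        ("/kafka_spark_demo/kafka_demo_bucket", bucket_name,
         "Amazon S3 bucket"),
    ]
    for name, value, description in parameters:
        # keyword arguments mirror the CLI flags used above
        ssm_client.put_parameter(
            Name=name,
            Type="String",
            Value=value,
            Description=description,
            Overwrite=True,
        )
    return [name for name, _, _ in parameters]
```

With boto3 installed, you would pass boto3.client("ssm") as the first argument, along with your own broker list and bucket name.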

Spark Submit Options with Amazon EMR

Amazon EMR provides multiple options to run Spark jobs. The recommended method for PySpark scripts is to use Amazon EMR Steps from the EMR console or AWS CLI to submit work to Spark installed on an EMR cluster. In the console and CLI, you do this using a Spark application step, which runs the spark-submit script as a step on your behalf. With the API, you use a Step to invoke spark-submit using command-runner.jar. Alternatively, you can SSH into the EMR cluster’s master node and run spark-submit. We will employ both techniques to run the PySpark jobs.
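
To illustrate the API route, here is a rough sketch of a step definition that invokes spark-submit through command-runner.jar. The helper name build_spark_step and the bucket placeholder are assumptions for illustration only; the dictionary follows the step structure accepted by the EMR AddJobFlowSteps API.

```python
def build_spark_step(name, script_uri):
    """Return an EMR step definition that runs spark-submit via command-runner.jar."""
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", script_uri],
        },
    }


# "your-bucket" is a placeholder, not a real bucket name
step = build_spark_step(
    "kafka-seed-sales",
    "s3a://your-bucket/spark/01_seed_sales_kafka.py",
)
print(step["HadoopJarStep"]["Args"])
```

With boto3, a step like this would be passed as boto3.client("emr").add_job_flow_steps(JobFlowId=cluster_id, Steps=[step]), where cluster_id is your own cluster's ID.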

Securely Accessing Amazon MSK from Spark

Each of the PySpark scripts demonstrated in this post uses a common pattern for accessing Amazon MSK from Amazon EMR using IAM Authentication. Whether producing or consuming messages from Kafka, the same security-related options are used to configure Spark. The details behind each option are outlined in the Security section of the Spark Structured Streaming + Kafka Integration Guide and the Configure clients for IAM access control section of the Amazon MSK IAM access control documentation.
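
As a sketch of that common pattern, the shared security options can be gathered in one place and merged with the producer- or consumer-specific options. The function name msk_iam_options and the example broker address are hypothetical; the option keys and values are the ones used by the scripts in this post.

```python
def msk_iam_options(bootstrap_servers):
    """Return the Kafka options shared by every script that connects to MSK with IAM."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.ssl.truststore.location": "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


brokers = "b-1.example:9098"  # placeholder broker list

# producer: add the target topic
options_write = {**msk_iam_options(brokers),
                 "topic": "pagila.sales.spark.streaming"}

# consumer: add the subscription and starting offsets
options_read = {**msk_iam_options(brokers),
                "subscribe": "pagila.sales.spark.streaming",
                "startingOffsets": "earliest"}
```

Either dictionary can then be unpacked into Spark with .options(**options_write) or .options(**options_read), as the scripts do.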

Data Source and Analysis Objective

For this post, we will continue to use data from PostgreSQL’s sample Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less than ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytical query costs.

payment_id|customer_id|amount|payment_date|city|district|country
16940|130|5.99|2021-05-08 21:21:56.996577 +00:00|guas Lindas de Gois|Gois|Brazil
16406|459|5.99|2021-05-08 21:22:59.996577 +00:00|Qomsheh|Esfahan|Iran
16315|408|6.99|2021-05-08 21:32:05.996577 +00:00|Jaffna|Northern|Sri Lanka
16185|333|7.99|2021-05-08 21:33:07.996577 +00:00|Baku|Baki|Azerbaijan
17097|222|9.99|2021-05-08 21:33:47.996577 +00:00|Jaroslavl|Jaroslavl|Russian Federation
16579|549|3.99|2021-05-08 21:36:33.996577 +00:00|Santiago de Compostela|Galicia|Spain
16050|269|4.99|2021-05-08 21:40:19.996577 +00:00|Salinas|California|United States
17126|239|7.99|2021-05-08 22:00:12.996577 +00:00|Ciomas|West Java|Indonesia
16933|126|7.99|2021-05-08 22:29:06.996577 +00:00|Po|So Paulo|Brazil
16297|399|8.99|2021-05-08 22:30:47.996577 +00:00|Okara|Punjab|Pakistan

According to mastersindatascience.org, data analytics is “…the process of analyzing raw data to find trends and answer questions…” Using Spark, we can analyze the movie rental sales data as a batch or in near-real-time using Structured Streaming to answer different questions. For example, using batch computations on static data, we could answer the question, how do the current total all-time sales for France compare to the rest of Europe? Or, what were the total sales for India during August? Using streaming computations, we can answer questions like, what are the sales volumes for the United States during this current four-hour marketing promotional period? Or, are sales to North America beginning to slow as the Olympics are aired during prime time?


Batch Queries

Before exploring the more advanced topic of streaming computations with Spark Structured Streaming, let’s first use a simple batch query and a batch computation to consume messages from the Kafka topic, perform a basic aggregation, and write the output to both the console and Amazon S3.

PySpark Job 1: Initial Sales Data

Kafka supports Protocol Buffers, JSON Schema, and Avro. However, to keep things simple in this first post, we will use JSON. We will seed a new Kafka topic with an initial batch of 250 JSON-format messages. This first batch of messages represents previous online movie rental sale transaction records. We will use these sales transactions for both batch and streaming queries.

The PySpark script, 01_seed_sales_kafka.py, and the seed data file, sales_seed.csv, are both read from Amazon S3 by Spark, running on Amazon EMR. The location of the Amazon S3 bucket name and the Amazon MSK’s broker list values are pulled from AWS SSM Parameter Store using the parameters created earlier. The Kafka topic that stores the sales data, pagila.sales.spark.streaming, is created automatically by the script the first time it runs.

# Purpose: Batch write initial sales data from S3 to a new Kafka topic
# Author: Gary A. Stafford
# Date: 2021-09-22

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType
from pyspark.sql.window import Window

sales_data = "sales_seed.csv"
topic_output = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-seed-sales") \
        .getOrCreate()

    df_sales = read_from_csv(spark, params)

    write_to_kafka(params, df_sales)


def read_from_csv(spark, params):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{sales_data}",
             schema=schema, header=True, sep="|")

    df_sales = update_payment_date(df_sales)

    return df_sales


def write_to_kafka(params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # the payment_id becomes the message key; the full row becomes a JSON value
    df_sales \
        .selectExpr("CAST(payment_id AS STRING) AS key",
                    "to_json(struct(*)) AS value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()


def update_payment_date(df):
    """Update existing payment date to a current timestamp for streaming simulation"""

    record_count = 250

    window = Window.orderBy("payment_id")

    # epoch seconds, staggered one second apart and ending at the current time
    df = df \
        .drop("payment_date") \
        .withColumn("index", F.row_number().over(window)) \
        .withColumn("payment_date",
                    (F.unix_timestamp(F.current_timestamp()) -
                     (record_count - F.col("index"))).cast(IntegerType())) \
        .drop("index")

    return df


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()
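
The timestamp arithmetic in update_payment_date can be checked in plain Python. This is only a sketch, assuming the expression subtracts (record_count - index) seconds from the current time so that the 250 seed records end up one second apart, with the last record stamped "now". The function name staggered_timestamps and the fixed value passed as now are illustrative.

```python
import time


def staggered_timestamps(record_count, now=None):
    """Epoch seconds for records 1..record_count, one second apart, ending at now."""
    now = int(time.time()) if now is None else now
    return [now - (record_count - index)
            for index in range(1, record_count + 1)]


# a fixed "now" keeps the example reproducible
stamps = staggered_timestamps(250, now=1_632_300_000)
print(stamps[-1] - stamps[0])  # seconds between the first and last record
```

The spread between the first and last record is record_count - 1 seconds, so the seed data appears as a short burst of recent sales rather than records dated months earlier.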

Update the two environment variables, then submit your first Spark job as an Amazon EMR Step using the AWS CLI and the emr API:

export CLUSTER_ID="<your-cluster-id>"
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
aws emr add-steps \
--cluster-id ${CLUSTER_ID} \
--steps """Type=Spark,Name='kafka-seed-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/01_seed_sales_kafka.py]"""
Successfully adding a Step (Spark job) to the Amazon EMR cluster

From the Amazon EMR console, we should observe the Spark job has been completed successfully in about 30–90 seconds.

Amazon EMR Step (Spark job) completed successfully

The Kafka Consumer API allows applications to read streams of data from topics in the Kafka cluster. Using the Kafka Consumer API, from within a Kubernetes container running on Amazon EKS or an EC2 instance, we can observe that the new Kafka topic has been successfully created and that messages (initial sales data) have been published to the new Kafka topic.

export BBROKERS="b-1.your-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.kafka.us-east-1.amazonaws.com:9098, ..."
bin/kafka-console-consumer.sh \
--topic pagila.sales.spark.streaming \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property print.offset=true \
--property print.partition=true \
--property print.headers=true \
--property print.timestamp=true \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties
Initial sales data as messages in Kafka topic

PySpark Job 2: Batch Query of Amazon MSK Topic

The PySpark script, 02_batch_read_kafka.py, performs a batch query of the initial 250 messages in the Kafka topic. When run, the PySpark script parses the JSON-format messages, then aggregates the data by both total sales and order count, by country, and finally, sorts by total sales.

window = Window.partitionBy("country").orderBy("amount")
window_agg = Window.partitionBy("country")
.withColumn("row", F.row_number().over(window)) \
.withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
.withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
.where(F.col("row") == 1).drop("row") \

The results are written to both the console as stdout and to Amazon S3 in CSV format.

# Purpose: Batch read Kafka topic, aggregate sales and orders by country,
# and output to console and Amazon S3 as CSV
# Author: Gary A. Stafford
# Date: 2021-09-22

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window

topic_input = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-batch-sales") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(params, df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(params, df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # parse the JSON value, aggregate per country, and keep one row per country
    df_output = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .where(F.col("row") == 1).drop("row") \
        .select("country", (F.format_number(F.col("sales"), 2)).alias("sales"), "orders") \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)

    df_output \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()

    # mode() must be set before csv(), which performs the write and returns None
    df_output \
        .write \
        .mode("overwrite") \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark_output/sales_by_country",
             header=True, sep="|")


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()
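
To make the aggregation in summarize_sales concrete, the same result (total sales and order count per country, sorted by total sales) can be sketched in plain Python. This is an illustration only, not part of the project; the function name summarize is hypothetical, and the sample rows are four of the seed records shown earlier.

```python
from collections import defaultdict


def summarize(records):
    """Return (country, total_sales, orders) tuples, highest total sales first."""
    totals = defaultdict(lambda: [0.0, 0])
    for country, amount in records:
        totals[country][0] += amount  # running sales total
        totals[country][1] += 1       # running order count
    rows = [(country, round(sales, 2), orders)
            for country, (sales, orders) in totals.items()]
    return sorted(rows, key=lambda row: row[1], reverse=True)


sample = [("Brazil", 5.99), ("Iran", 5.99), ("Sri Lanka", 6.99), ("Brazil", 7.99)]
for row in summarize(sample):
    print(row)
```

The Spark version reaches the same per-country totals with window functions, then keeps a single row per country by filtering on row_number.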

Again, submit this job as an Amazon EMR Step using the AWS CLI and the emr API:

aws emr add-steps \
--cluster-id ${CLUSTER_ID} \
--steps """Type=Spark,Name='kafka-batch-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/02_batch_read_kafka.py]"""

To view the console output, click on ‘View logs’ in the Amazon EMR console, then click on the stdout logfile, as shown below.

Logs from successful Amazon EMR Step (Spark job)

The stdout logfile should contain the top 25 total sales and order counts, by country, based on the initial 250 sales records.

+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |138.80|20 |
|China |133.80|20 |
|Mexico |106.86|14 |
|Japan |100.86|14 |
|Brazil |96.87 |13 |
|Russian Federation|94.87 |13 |
|United States |92.86 |14 |
|Nigeria |58.93 |7 |
|Philippines |58.92 |8 |
|South Africa |46.94 |6 |
|Argentina |42.93 |7 |
|Germany |39.96 |4 |
|Indonesia |38.95 |5 |
|Italy |35.95 |5 |
|Iran |33.95 |5 |
|South Korea |33.94 |6 |
|Poland |30.97 |3 |
|Pakistan |25.97 |3 |
|Taiwan |25.96 |4 |
|Mozambique |23.97 |3 |
|Ukraine |23.96 |4 |
|Vietnam |23.96 |4 |
|Venezuela |22.97 |3 |
|France |20.98 |2 |
|Peru |19.98 |2 |
+------------------+------+------+
only showing top 25 rows

The PySpark script also wrote the same results to Amazon S3 in CSV format.

CSV file written to Amazon S3 as a result of the Spark job

The total sales and order count for 69 countries were computed, sorted, and coalesced into a single CSV file.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5
South Korea|33.94|6
Poland|30.97|3
Pakistan|25.97|3
Taiwan|25.96|4
Mozambique|23.97|3
Vietnam|23.96|4
Ukraine|23.96|4
Venezuela|22.97|3
France|20.98|2
Peru|19.98|2

Streaming Queries

To demonstrate streaming queries with Spark Structured Streaming, we will use a combination of two PySpark scripts. The first script, 03_streaming_read_kafka_console.py, will perform a streaming query and computation of messages in the Kafka topic, aggregating the total sales and number of orders. Concurrently, the second PySpark script, 04_incremental_sales_kafka.py, will read additional Pagila sales data from a CSV file located on Amazon S3 and write messages to the Kafka topic at a rate of two messages per second. The first script, 03_streaming_read_kafka_console.py, will stream aggregations in micro-batches of one-minute increments to the console. Spark Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small, batch jobs.

Note that this first script performs grouped aggregations as opposed to aggregations over a sliding event-time window. The aggregated results represent the total, all-time sales at a point in time, based on all the messages currently in the topic when the micro-batch was computed.
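
The behavior of a grouped aggregation across micro-batches can be sketched without Spark. In this simplified model, which is an assumption-laden illustration rather than how Spark is implemented, each micro-batch adds its messages to a running state, and the full set of totals is emitted after every batch. The function name cumulative_totals and the sample amounts are hypothetical.

```python
def cumulative_totals(micro_batches):
    """Return a snapshot of all-time (sales, orders) per country after each batch."""
    totals = {}
    snapshots = []
    for batch in micro_batches:
        for country, amount in batch:
            sales, orders = totals.get(country, (0.0, 0))
            totals[country] = (sales + amount, orders + 1)
        # every snapshot contains all countries seen so far, not just this batch
        snapshots.append({country: (round(sales, 2), orders)
                          for country, (sales, orders) in totals.items()})
    return snapshots


batches = [
    [("India", 4.99), ("China", 2.99)],  # first micro-batch
    [("India", 0.99)],                   # second micro-batch
]
for snapshot in cumulative_totals(batches):
    print(snapshot)
```

Note that China still appears in the second snapshot even though the second batch contained no new Chinese sales, which is what distinguishes all-time totals from totals over a time window.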

To follow along with this part of the demonstration, you can run the two Spark jobs as concurrent steps on the existing Amazon EMR cluster, or create a second EMR cluster, identically configured to the existing cluster, to run the second PySpark script, 04_incremental_sales_kafka.py. Using a second cluster, you can use a minimally-sized single master node cluster with no core nodes to save cost.

PySpark Job 3: Streaming Query to Console

The first PySpark script, 03_streaming_read_kafka_console.py, performs a streaming query of messages in the Kafka topic. The script then aggregates the data by both total sales and order count, by country, and finally, sorts by total sales.

.groupBy("country") \
.agg(F.count("amount"), F.sum("amount")) \
.orderBy(F.col("sum(amount)").desc()) \
.select("country",
(F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
(F.col("count(amount)")).alias("orders")) \

The results are streamed to the console using the processingTime trigger parameter. A trigger defines how often a streaming query should be executed and emit new data. The processingTime parameter sets a trigger that runs a micro-batch query periodically based on the processing time (e.g. ‘5 minutes’ or ‘1 hour’). The trigger is currently set to a minimal processing time of one minute for ease of demonstration.

.trigger(processingTime="1 minute") \
.outputMode("complete") \
.format("console") \
.option("numRows", 25) \

# Purpose: Streaming read from Kafka topic and summarize top 25
# all-time total sales by country to the console every minute
# Author: Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-console") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(df_sales)


def read_from_kafka(spark, params):
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .groupBy("country") \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("sum(amount)").desc()) \
        .select("country",
                (F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
                (F.col("count(amount)")).alias("orders")) \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_to_console") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

For demonstration purposes, we will run the Spark job directly from the master node of the EMR Cluster. This method will allow us to easily view the micro-batches and associated log events as they are output to the console. The console is normally used for testing purposes. Submitting the PySpark script from the cluster’s master node is an alternative to submitting an Amazon EMR Step. Connect to the master node of the Amazon EMR cluster using SSH, as the hadoop user:

export EMR_MASTER=<your-emr-master-dns.compute-1.amazonaws.com>
export EMR_KEY_PATH=path/to/key/<your-ssh-key.pem>
ssh -i ${EMR_KEY_PATH} hadoop@${EMR_MASTER}

Submit the PySpark script, 03_streaming_read_kafka_console.py, to Spark:

export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
spark-submit s3a://${SPARK_BUCKET}/spark/03_streaming_read_kafka_console.py

Before running the second PySpark script, 04_incremental_sales_kafka.py, let the first script run long enough to pick up the existing sales data in the Kafka topic. Within about two minutes, you should see the first micro-batch of aggregated sales results, labeled ‘Batch: 0’ output to the console. This initial micro-batch should contain the aggregated results of the existing 250 messages from Kafka. The streaming query’s first micro-batch results should be identical to the previous batch query results.

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|India |138.80|20 |
|China |133.80|20 |
|Mexico |106.86|14 |
|Japan |100.86|14 |
|Brazil |96.87 |13 |
|Russian Federation|94.87 |13 |
|United States |92.86 |14 |
|Nigeria |58.93 |7 |
|Philippines |58.92 |8 |
|South Africa |46.94 |6 |
|Argentina |42.93 |7 |
|Germany |39.96 |4 |
|Indonesia |38.95 |5 |
|Italy |35.95 |5 |
|Iran |33.95 |5 |
|South Korea |33.94 |6 |
|Poland |30.97 |3 |
|Pakistan |25.97 |3 |
|Taiwan |25.96 |4 |
|Mozambique |23.97 |3 |
|Ukraine |23.96 |4 |
|Vietnam |23.96 |4 |
|Venezuela |22.97 |3 |
|France |20.98 |2 |
|Peru |19.98 |2 |
+------------------+------+------+
only showing top 25 rows

Immediately below the batch output, there will be a log entry containing information about the batch. In the log entry snippet below, note the starting and ending offsets of the topic for the Spark job’s Kafka consumer group, 0 (null) to 250, representing the initial sales data.

{
"id" : "e0168615-dd39-4025-9811-c001a324ed58",
"runId" : "ed76fe07-032c-42ab-881c-57b44f561a29",
"name" : "streaming_to_console",
"timestamp" : "2021-09-08T17:37:58.116Z",
"batchId" : 0,
"numInputRows" : 250,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 14.104372355430183,
"durationMs" : {
"addBatch" : 12298,
"getBatch" : 39,
"latestOffset" : 4710,
"queryPlanning" : 542,
"triggerExecution" : 17724,
"walCommit" : 33
},
"stateOperators" : [ {
"numRowsTotal" : 136,
"numRowsUpdated" : 136,
"memoryUsedBytes" : 119008,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 61408
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[pagila.sales.spark.streaming]]",
"startOffset" : null,
"endOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 250
}
},
"numInputRows" : 250,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 14.104372355430183
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@13b8bba3",
"numOutputRows" : 68
}
}
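
The number of messages consumed by a micro-batch can be derived from the offsets in such a progress entry. The following is a small sketch with a hypothetical helper name, messages_consumed; it assumes a null startOffset means the batch started from offset 0, which holds here because the query reads a new topic from the earliest offset.

```python
import json


def messages_consumed(progress_json):
    """Sum end-minus-start offsets across all sources, topics, and partitions."""
    progress = json.loads(progress_json)
    consumed = 0
    for source in progress["sources"]:
        # assumption: a null startOffset is treated as offset 0
        start = source.get("startOffset") or {}
        for topic, partitions in source["endOffset"].items():
            for partition, end in partitions.items():
                consumed += end - start.get(topic, {}).get(partition, 0)
    return consumed


# trimmed-down version of the Batch 0 entry shown above
batch_0 = """
{
  "sources": [ {
    "startOffset": null,
    "endOffset": { "pagila.sales.spark.streaming": { "0": 250 } }
  } ]
}
"""
print(messages_consumed(batch_0))
```

For Batch 0, the result matches the numInputRows value of 250 reported in the log entry.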

PySpark Job 4: Incremental Sales Data

As described earlier, the second PySpark script, 04_incremental_sales_kafka.py, reads 1,800 additional sales records from a second CSV file located on Amazon S3, sales_incremental_large.csv. The script then publishes messages to the Kafka topic at a deliberately throttled rate of two messages per second. Concurrently, the first PySpark job, still running and performing a streaming query, will consume the new Kafka messages and stream aggregated total sales and orders in micro-batches of one-minute increments to the console over a period of about 15 minutes.

# Purpose: Batch write incremental sales data from S3 to a new Kafka topic
# Use a delay between each message to simulate real-time streaming data
# Author: Gary A. Stafford
# Date: 2021-09-26

import os
import time

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType

sales_data = "sales_incremental_large.csv"
topic_output = "pagila.sales.spark.streaming"
time_between_messages = 0.5  # 1800 messages * .5 seconds = ~15 minutes

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-incremental-sales") \
        .getOrCreate()

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, params, schema)
    df_sales.cache()

    write_to_kafka(spark, params, df_sales)


def read_from_csv(spark, params, schema):
    df_sales = spark.read \
        .csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{sales_data}",
             schema=schema, header=True, sep="|")

    return df_sales


def write_to_kafka(spark, params, df_sales):
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    # collect once on the driver rather than on every loop iteration
    rows = df_sales.collect()

    for row in rows:
        # build a one-row DataFrame stamped with the current time
        df_message = spark.createDataFrame([row], df_sales.schema) \
            .drop("payment_date") \
            .withColumn("payment_date", F.current_timestamp()) \
            .selectExpr("CAST(payment_id AS STRING) AS key",
                        "to_json(struct(*)) AS value")

        # save() returns None, so the write is kept separate from the DataFrame
        df_message \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        df_message.show(1)

        time.sleep(time_between_messages)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Submit the second PySpark script as a concurrent Amazon EMR Step to the first EMR cluster, or submit as a step to the second Amazon EMR cluster.

aws emr add-steps \
--cluster-id ${CLUSTER_ID} \
--steps """Type=Spark,Name='kafka-incremental-sales',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/04_incremental_sales_kafka.py]"""

The job sends a total of 1,800 messages to Kafka at a rate of two messages per second for 15 minutes. The total runtime of the job should be approximately 19 minutes, given a few minutes for startup and shutdown. Why run for so long? We want to make sure the job’s runtime will span multiple, overlapping, sliding event-time windows.
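
The 15-minute figure follows directly from the message count and the delay between messages. As a quick check (the function name is illustrative, and the estimate ignores the time spent writing each message as well as job startup and shutdown):

```python
def publish_duration_minutes(message_count, seconds_between_messages):
    """Estimate how long a throttled publisher runs, counting only the delays."""
    return message_count * seconds_between_messages / 60


print(publish_duration_minutes(1800, 0.5))  # 15.0
```

Changing time_between_messages in the script scales the duration proportionally, which is useful if you want the job to span more or fewer event-time windows.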

After about two minutes, return to the terminal output of the first Spark job, 03_streaming_read_kafka_console.py, running on the master node of the first cluster. As long as new messages are consumed every minute, you should see a new micro-batch of aggregated sales results stream to the console. Below we see an example of Batch 5, which reflects additional sales compared to Batch 0, shown previously. The results reflect the current all-time sales by country in real-time as the sales are published to Kafka.

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------+------+------+
|country |sales |orders|
+------------------+------+------+
|China |473.35|65 |
|India |393.44|56 |
|Japan |292.60|40 |
|Mexico |262.64|36 |
|United States |252.65|35 |
|Russian Federation|243.65|35 |
|Brazil |220.69|31 |
|Philippines |191.75|25 |
|Indonesia |142.81|19 |
|South Africa |110.85|15 |
|Nigeria |108.86|14 |
|Argentina |89.86 |14 |
|Germany |85.89 |11 |
|Israel |68.90 |10 |
|Ukraine |65.92 |8 |
|Turkey |58.91 |9 |
|Iran |58.91 |9 |
|Saudi Arabia |56.93 |7 |
|Poland |50.94 |6 |
|Pakistan |50.93 |7 |
|Italy |48.93 |7 |
|French Polynesia |47.94 |6 |
|Peru |45.95 |5 |
|United Kingdom |45.94 |6 |
|Colombia |44.94 |6 |
+------------------+------+------+
only showing top 25 rows

If we fast forward to a later micro-batch, sometime after the second incremental sales job is completed, we should see the top 25 aggregated sales by country of 2,050 messages — 250 seed plus 1,800 incremental messages.

-------------------------------------------
Batch: 20
-------------------------------------------
+------------------+--------+------+
|country |sales |orders|
+------------------+--------+------+
|China |1,379.05|195 |
|India |1,338.10|190 |
|United States |915.69 |131 |
|Mexico |855.80 |120 |
|Japan |831.88 |112 |
|Russian Federation|723.95 |105 |
|Brazil |613.12 |88 |
|Philippines |528.27 |73 |
|Indonesia |381.46 |54 |
|Turkey |350.52 |48 |
|Argentina |298.57 |43 |
|Nigeria |294.61 |39 |
|South Africa |279.61 |39 |
|Taiwan |221.67 |33 |
|Germany |199.73 |27 |
|United Kingdom |196.75 |25 |
|Poland |182.77 |23 |
|Spain |170.77 |23 |
|Ukraine |160.79 |21 |
|Iran |160.76 |24 |
|Italy |156.79 |21 |
|Pakistan |152.78 |22 |
|Saudi Arabia |146.81 |19 |
|Venezuela |145.79 |21 |
|Colombia |144.78 |22 |
+------------------+--------+------+
only showing top 25 rows

Compare the informational output below for Batch 20 to that of Batch 0, shown previously. Note the starting offset of the Kafka consumer group on the topic is 1986, and the ending offset is 2050. The ending offset of 2050 shows that all messages have been consumed from the topic and aggregated. If additional messages were streamed to Kafka while the streaming job was still running, additional micro-batches would continue to be streamed to the console every minute.

"sources" : [ {
"description" : "KafkaV2[Subscribe[pagila.sales.spark.streaming]]",
"startOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 1986
}
},
"endOffset" : {
"pagila.sales.spark.streaming" : {
"0" : 2050
}
},
"numInputRows" : 64,
"inputRowsPerSecond" : 1.0666666666666667,
"processedRowsPerSecond" : 13.772326231977619
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@13b8bba3",
"numOutputRows" : 105
}

PySpark Job 5: Aggregations over Sliding Event-time Window

In the previous example, we analyzed total all-time sales in real-time (e.g., show me the current, total, all-time sales for France compared to the rest of Europe, at regular intervals). This approach is opposed to sales made during a sliding event-time window (e.g., are the total sales for the United States trending better during this current four-hour marketing promotional period than the previous promotional period). In many cases, real-time sales during a distinct period or event window is probably a more commonly tracked KPI than total all-time sales.

If we add a sliding event-time window to the PySpark script, we can easily observe the total sales and order counts made during the sliding event-time window in real-time.

.withWatermark("timestamp", "10 minutes") \
.groupBy("country",
F.window("timestamp", "10 minutes", "5 minutes")) \
.agg(F.count("amount"), F.sum("amount")) \
.orderBy(F.col("window").desc(),
F.col("sum(amount)").desc()) \

Windowed totals would not include sales (messages) present in the Kafka topic before the streaming query began, nor sales from previous sliding windows. Constructing the correct query always starts with a clear understanding of the question you are trying to answer.
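
To make the window assignment concrete, here is a minimal sketch in plain Python, independent of Spark, of how a single event timestamp maps onto overlapping windows. The function name `sliding_windows` is hypothetical. The sketch assumes windows are aligned to the Unix epoch, which is Spark's documented default when no start offset is supplied, and uses the 10-minute window and 5-minute slide from the excerpt above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time,
                    window=timedelta(minutes=10),
                    slide=timedelta(minutes=5)):
    """Return every [start, end) window that contains event_time."""
    # Start of the most recent window that begins at or before the event
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers the event
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return windows


for start, end in sliding_windows(datetime(2021, 9, 8, 19, 32)):
    print(start, end)
```

Under these assumptions, an order placed at 19:32 counts toward both the 19:25 to 19:35 window and the 19:30 to 19:40 window, which is why the same country can appear once per window in the console output.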

Below, in the abridged console output of the micro-batch from the script, 05_streaming_read_kafka_console_window.py, we see the results of three ten-minute sliding event-time windows with a five-minute overlap. The sales and order totals represent the volume sold during that window, with this micro-batch falling within the active current window, 19:30 to 19:40 UTC.

-------------------------------------------
Batch: 14
-------------------------------------------
+------------------+------+------+-------------------+-------------------+
|country           |sales |orders|start              |end                |
+------------------+------+------+-------------------+-------------------+
|India             |286.60|40    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|China             |285.61|39    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|United States     |205.69|31    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Japan             |189.74|26    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Russian Federation|182.74|26    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Philippines       |163.77|23    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Mexico            |159.76|24    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Brazil            |155.77|23    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Argentina         |118.84|16    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|Indonesia         |82.88 |12    |2021-09-08 19:30:00|2021-09-08 19:40:00|
|India             |600.13|87    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|China             |509.27|73    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|United States     |416.42|58    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Japan             |329.56|44    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Mexico            |311.54|46    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Russian Federation|301.55|45    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Brazil            |256.64|36    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Philippines       |219.67|33    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Turkey            |171.76|24    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|Argentina         |159.78|22    |2021-09-08 19:25:00|2021-09-08 19:35:00|
|China             |353.53|47    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|India             |264.62|38    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Japan             |191.74|26    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|United States     |173.77|23    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Mexico            |159.77|23    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Russian Federation|148.78|22    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Philippines       |132.83|17    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Brazil            |123.82|18    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|Indonesia         |103.86|14    |2021-09-08 19:20:00|2021-09-08 19:30:00|
|South Africa      |63.91 |9     |2021-09-08 19:20:00|2021-09-08 19:30:00|
+------------------+------+------+-------------------+-------------------+

Plotting the total sales over time using sliding event-time windows, we will observe the results do not reflect a running total. Total sales only accumulate within a sliding window.

Cumulative sales within 5-minute sliding event-time windows

Compare these results to the results of the previous script, whose total sales reflect a running total.

Running total of sales (no sliding windows)
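
The difference between the two plots can be reproduced with a few lines of plain Python. This is only an illustrative sketch: the amounts (in cents, per 5-minute slice) are hypothetical and not taken from the demo data. It relies on the observation that a 10-minute window sliding every 5 minutes always covers two adjacent 5-minute slices.

```python
from datetime import datetime
from itertools import accumulate

# Hypothetical sales in cents per 5-minute slice, invented for illustration
slices = {
    datetime(2021, 9, 8, 19, 20): 1500,
    datetime(2021, 9, 8, 19, 25): 2500,
    datetime(2021, 9, 8, 19, 30): 1000,
    datetime(2021, 9, 8, 19, 35): 500,
}
starts = sorted(slices)
amounts = [slices[s] for s in starts]

# Running total: every slice adds to everything that came before it
running = list(accumulate(amounts))

# 10-minute window, 5-minute slide: each window sums two adjacent slices
windowed = [a + b for a, b in zip(amounts, amounts[1:])]

print("running :", running)
print("windowed:", windowed)
```

The running total can only grow, while the windowed totals rise and fall with the activity inside each window.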

PySpark Job 6: Streaming Query from/to Amazon MSK

The PySpark script, 06_streaming_read_kafka_kafka.py, performs the same streaming query and grouped aggregation as the earlier script, 03_streaming_read_kafka_console.py. However, instead of writing results to the console, this job writes its results to a new Kafka topic on Amazon MSK.

.format("kafka") \
.options(**options_write) \
.option("checkpointLocation", "/checkpoint/kafka/") \

# Purpose: Streaming read from Kafka topic and aggregate
#          sales and orders by country to Kafka every minute
# Author: Gary A. Stafford
# Date: 2021-09-08

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType

topic_input = "pagila.sales.spark.streaming.in"
topic_output = "pagila.sales.spark.streaming.out"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-streaming-sales-kafka") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(params, df_sales)


def read_from_kafka(spark, params):
    # Kafka source options, using IAM authentication with Amazon MSK
    options_read = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "subscribe":
            topic_input,
        "startingOffsets":
            "earliest",
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark.readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(params, df_sales):
    # Kafka sink options, using IAM authentication with Amazon MSK
    options_write = {
        "kafka.bootstrap.servers":
            params["kafka_servers"],
        "topic":
            topic_output,
        "kafka.ssl.truststore.location":
            "/tmp/kafka.client.truststore.jks",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # Parse the JSON payload, aggregate by country, and publish
    # each aggregated row to Kafka as a key/value message
    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .groupBy("country") \
        .agg(F.count("amount"), F.sum("amount")) \
        .orderBy(F.col("sum(amount)").desc()) \
        .select(F.sha1("country").alias("id"),
                "country",
                (F.format_number(F.col("sum(amount)"), 2)).alias("sales"),
                (F.col("count(amount)")).alias("orders")) \
        .coalesce(1) \
        .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
        .writeStream \
        .trigger(processingTime="1 minute") \
        .queryName("streaming_to_kafka") \
        .outputMode("complete") \
        .format("kafka") \
        .options(**options_write) \
        .option("checkpointLocation", "/checkpoint/kafka/") \
        .start()

    ds_sales.awaitTermination()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        "kafka_servers": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
        "kafka_demo_bucket": ssm_client.get_parameter(
            Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
    }

    return params


if __name__ == "__main__":
    main()

Repeat the same process used with the previous script. Re-run the seed data script, 01_seed_sales_kafka.py, after updating the input topic to a new name, such as pagila.sales.spark.streaming.in. Next, run the new script, 06_streaming_read_kafka_kafka.py. Give the script time to start and consume the 250 seed messages from Kafka. Then, update the input topic name in the incremental data PySpark script, 04_incremental_sales_kafka.py, and re-run it concurrently with the new script, either on the same cluster or on the second cluster.

When run, the script, 06_streaming_read_kafka_kafka.py, will continuously consume messages from the new pagila.sales.spark.streaming.in topic and publish grouped aggregation results to a new topic, pagila.sales.spark.streaming.out.
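
To picture what each published message looks like, the following plain-Python sketch approximates the key and value the job builds for one aggregated row: a SHA-1 hash of the country as the key and a JSON document as the value. It is an illustration only. The function name `to_kafka_message` and the sample figures are hypothetical, and Spark's `format_number` and `to_json` may differ from the Python standard library in rounding edge cases or exact formatting.

```python
import hashlib
import json


def to_kafka_message(country, total_sales, order_count):
    """Approximate the key/value pair built for one aggregated row."""
    # Mirrors F.sha1("country"): hex-encoded SHA-1 digest of the country name
    key = hashlib.sha1(country.encode("utf-8")).hexdigest()
    value = json.dumps(
        {
            "id": key,
            "country": country,
            # Mirrors F.format_number(..., 2): two decimals, thousands separators
            "sales": f"{total_sales:,.2f}",
            "orders": order_count,
        },
        separators=(",", ":"),
    )
    return key, value


# Hypothetical aggregate for a single country
key, value = to_kafka_message("India", 1286.6, 40)
print(key)
print(value)
```

Because the key is derived only from the country name, every update for a given country carries the same key, while the sales figure travels as a formatted string rather than a number.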

Use the Kafka Consumer API to view new messages as the Spark job publishes them in near real-time to Kafka.

export BBROKERS="b-1.your-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.kafka.us-east-1.amazonaws.com:9098, ..."
bin/kafka-console-consumer.sh \
  --topic pagila.sales.spark.streaming.out \
  --from-beginning \
  --property print.key=true \
  --property print.value=true \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.timestamp=true \
  --bootstrap-server $BBROKERS \
  --consumer.config config/client-iam.properties

Aggregated sales results (messages) being published to Kafka by streaming Spark job

PySpark Job 7: Batch Query of Streaming Results from MSK

When run, the previous script publishes Kafka messages containing non-windowed sales aggregations to the output topic every minute. Using the next PySpark script, 07_batch_read_kafka.py, we can consume those aggregated messages with a batch query and display the most recent sales totals in the console. Each country’s most recent all-time sales totals and order counts should be identical to the previous script’s results, representing the aggregation of all 2,050 Kafka messages: 250 seed messages plus 1,800 incremental messages.

To get the latest total sales by country, we will consume all the messages from the output topic, partition the results by country, keep only the most recent message for each country (ranked by timestamp in descending order), and finally, display the results sorted by sales in descending order.
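
As a Spark-free illustration of keeping only the most recent aggregate per country, which is what a row_number() window ordered by timestamp in descending order achieves, consider the sketch below. The messages and the function name `latest_by_country` are hypothetical. Parsing the formatted sales string into a number before sorting is a choice made for this sketch, not necessarily what the script does.

```python
from datetime import datetime

# Hypothetical messages read from the output topic, invented for illustration
messages = [
    {"country": "India", "sales": "264.62", "orders": 38,
     "timestamp": datetime(2021, 9, 8, 19, 30)},
    {"country": "China", "sales": "353.53", "orders": 47,
     "timestamp": datetime(2021, 9, 8, 19, 30)},
    {"country": "India", "sales": "1,600.13", "orders": 87,
     "timestamp": datetime(2021, 9, 8, 19, 31)},
    {"country": "China", "sales": "509.27", "orders": 73,
     "timestamp": datetime(2021, 9, 8, 19, 31)},
]


def latest_by_country(messages):
    """Keep the newest message per country, sorted by sales descending."""
    latest = {}
    for msg in messages:
        current = latest.get(msg["country"])
        if current is None or msg["timestamp"] > current["timestamp"]:
            latest[msg["country"]] = msg
    # Sales arrive as formatted strings, so strip separators before comparing
    return sorted(latest.values(),
                  key=lambda m: float(m["sales"].replace(",", "")),
                  reverse=True)


for row in latest_by_country(messages):
    print(row["country"], row["sales"], row["orders"])
```

Note that sorting the raw strings would place "509.27" ahead of "1,600.13", which is why the sketch converts them to numbers first.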

window = Window.partitionBy("country") \
    .orderBy(F.col("timestamp").desc())

.withColumn("row", F.row_number().over(window)) \
.where(F.col("row") == 1).drop("row") \
.select("country", "sales", "orders") \

# Purpose: Batch read Kafka output topic and display
#          top 25 total sales by country to console
# Author: Gary A. Stafford
# Date: 2021-09-09

import os

import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, TimestampType
from pyspark.sql.window import Window

topic_input = "pagila.sales.spark.streaming.out"

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("kafka-batch-sales") \
        .getOrCreate()

    df_sales = read_from_kafka(spark, params)

    summarize_sales(df_sales)


def read_from_kafka(spark, params):
    schema = StructType([
        StructField("country", StringType(), False),
        StructField("sales", StringType(), False),