Serverless Analytics on AWS: Getting Started with Amazon EMR Serverless and Amazon MSK Serverless

Utilizing the recently released Amazon EMR Serverless and Amazon MSK Serverless for batch and streaming analytics with Apache Spark and Apache Kafka

Introduction

Amazon EMR Serverless

AWS recently announced the general availability (GA) of Amazon EMR Serverless on June 1, 2022. EMR Serverless is a new serverless deployment option in Amazon EMR, in addition to EMR on EC2, EMR on EKS, and EMR on AWS Outposts. EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. According to AWS, with EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.

Amazon MSK Serverless

Similarly, on April 28, 2022, AWS announced the general availability of Amazon MSK Serverless. According to AWS, Amazon MSK Serverless is a cluster type for Amazon MSK that makes it easy to run Apache Kafka without managing and scaling cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources, so you can use Apache Kafka on demand and only pay for the data you stream and retain.

Serverless Analytics

In the following post, we will learn how to use these two new, powerful, cost-effective, and easy-to-operate serverless technologies to perform batch and streaming analytics. The PySpark examples used in this post are similar to those featured in two earlier posts, which featured non-serverless alternatives Amazon EMR on EC2 and Amazon MSK: Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR and Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on AWS using Amazon MSK and EMR.

Source Code

All the source code demonstrated in this post is open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/emr-msk-serverless-demo.git

Architecture

The post’s high-level architecture consists of an Amazon EMR Serverless Application, Amazon MSK Serverless Cluster, and Amazon EC2 Kafka client instance. To support these three resources, we will need two Amazon Virtual Private Clouds (VPCs), a minimum of three subnets, an AWS Internet Gateway (IGW) or equivalent, an Amazon S3 Bucket, multiple AWS Identity and Access Management (IAM) Roles and Policies, Security Groups, and Route Tables, and a VPC Gateway Endpoint for S3. All resources are constrained to a single AWS account and a single AWS Region, us-east-1.

High-level AWS serverless analytics architecture used in this post

Prerequisites

As a prerequisite for this post, you will need to create the following resources:

  1. (1) Amazon EMR Serverless Application;
  2. (1) Amazon MSK Serverless Cluster;
  3. (1) Amazon S3 Bucket;
  4. (1) VPC Endpoint for S3;
  5. (3) Apache Kafka topics;
  6. PySpark applications, related JAR dependencies, and sample data files uploaded to Amazon S3 Bucket;

Let’s walk through each of these prerequisites.

Amazon EMR Serverless Application

Before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon EMR Serverless, especially, What is Amazon EMR Serverless? Create a new EMR Serverless Application by following the AWS documentation, Getting started with Amazon EMR Serverless. The creation of the EMR Serverless Application includes the following resources:

  1. Amazon S3 bucket for storage of Spark resources;
  2. Amazon VPC with at least two private subnets and associated Security Group(s);
  3. EMR Serverless runtime AWS IAM Role and associated IAM Policy;
  4. Amazon EMR Serverless Application;

For this post, use the latest version of EMR available in the EMR Studio Serverless Application console, the newly released version 6.7.0, to create a Spark application.

EMR Studio Serverless Application creation console

Keep the default pre-initialized capacity, application limits, and application behavior settings.

EMR Studio Serverless Application creation console

Since we are connecting to MSK Serverless from EMR Serverless, we need to configure VPC access. Select the new VPC and at least two private subnets in different Availability Zones (AZs).

EMR Studio Serverless Application creation console

According to the documentation, the subnets selected for EMR Serverless must be private subnets. The associated route tables for the subnets should not contain direct routes to the Internet.
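One way to check this requirement before creating the application is to scan a subnet's route table for routes that target an internet gateway. The following is a minimal sketch, assuming route entries shaped like the `Routes` list returned by the EC2 `describe_route_tables` call; the helper name `direct_internet_routes` and all IDs are hypothetical.

```python
def direct_internet_routes(routes):
    """Return the routes that send traffic straight to an internet gateway.

    `routes` is assumed to be a list of dicts shaped like the `Routes`
    entries returned by EC2's describe_route_tables.
    """
    return [
        route for route in routes
        if (route.get("GatewayId") or "").startswith("igw-")
    ]


# Example route tables (all IDs are made up for illustration).
private_routes = [
    {"DestinationCidrBlock": "10.0.0.0/16", "GatewayId": "local"},
    {"DestinationPrefixListId": "pl-0example", "GatewayId": "vpce-0example"},
]
public_routes = private_routes + [
    {"DestinationCidrBlock": "0.0.0.0/0", "GatewayId": "igw-0example"},
]

print(direct_internet_routes(private_routes))  # nothing flagged
print(direct_internet_routes(public_routes))   # the igw route is flagged
```

A subnet whose route table returns an empty list here has no direct route to an internet gateway, which is the property EMR Serverless checks for.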

Error resulting from trying to associate a public subnet with EMR Serverless
EMR Studio Serverless Application details console showing new Application

Amazon MSK Serverless Cluster

Similarly, before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon MSK Serverless, especially MSK Serverless. Create a new MSK Serverless Cluster by following the AWS documentation, Getting started using MSK Serverless clusters. The creation of the MSK Serverless Cluster includes the following resources:

  1. AWS IAM Role and associated IAM Policy for the Amazon EC2 Kafka client instance;
  2. VPC with at least one public subnet and associated Security Group(s);
  3. Amazon EC2 instance used as Apache Kafka client, provisioned in the public subnet of the above VPC;
  4. Amazon MSK Serverless Cluster;
Amazon MSK Serverless Create cluster console

Associate the new MSK Serverless Cluster with the EMR Serverless Application’s VPC and two private subnets. Also, associate the cluster with the EC2-based Kafka client instance’s VPC and its public subnet.

Amazon MSK Serverless Create cluster console — VPC 1
Amazon MSK Serverless Create cluster console — VPC 2

According to the AWS documentation, Amazon MSK does not support all AZs. For example, trying to use a subnet in us-east-1e threw an error. If this happens, choose an alternative AZ.

Error resulting from using an unsupported AZ
Successfully created Amazon MSK Serverless Cluster

VPC Endpoint for S3

To access the Spark resource in Amazon S3 from EMR Serverless running in the two private subnets, we need a VPC Endpoint for S3. Specifically, a Gateway Endpoint, which sends traffic to Amazon S3 or DynamoDB using private IP addresses. A gateway endpoint for Amazon S3 enables you to use private IP addresses to access Amazon S3 without exposure to the public Internet. EMR Serverless does not require public IP addresses, and you don’t need an internet gateway (IGW), a NAT device, or a virtual private gateway in your VPC to connect to S3.

VPC Endpoint for S3 associated with route table for private subnets

Create the VPC Endpoint for S3 (Gateway Endpoint) and add the route table for the two EMR Serverless private subnets. You can add additional routes to that route table, such as VPC peering connections to data sources such as Amazon Redshift or Amazon RDS. However, do not add routes that provide direct Internet access.
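If you prefer to script this step, the endpoint can also be created with boto3. This is a sketch rather than the method used in this post: the helper names are hypothetical, the VPC and route table IDs are placeholders, and the region defaults to us-east-1 to match the architecture above.

```python
def s3_gateway_endpoint_params(vpc_id, route_table_ids, region="us-east-1"):
    """Build keyword arguments for EC2's create_vpc_endpoint call."""
    return {
        "VpcEndpointType": "Gateway",
        "VpcId": vpc_id,
        "ServiceName": f"com.amazonaws.{region}.s3",
        "RouteTableIds": list(route_table_ids),
    }


def create_s3_gateway_endpoint(vpc_id, route_table_ids, region="us-east-1"):
    """Create the endpoint; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the helper above works without it

    ec2 = boto3.client("ec2", region_name=region)
    return ec2.create_vpc_endpoint(
        **s3_gateway_endpoint_params(vpc_id, route_table_ids, region)
    )


# Placeholder IDs; substitute your own VPC and private route table.
params = s3_gateway_endpoint_params("vpc-0example", ["rtb-0example"])
print(params["ServiceName"])
```

Passing the private subnets' route table in `RouteTableIds` is what adds the S3 route to that table.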

Route table for private subnets showing VPC Endpoint to S3 route (first route shown)

Kafka Topics and Sample Messages

Once the MSK Serverless Cluster and EC2-based Kafka client instance are provisioned and running, create the three required Kafka topics using the EC2-based Kafka client instance. I recommend using AWS Systems Manager Session Manager to connect to the client instance as the ec2-user user. Session Manager provides secure and auditable node management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys. Alternatively, you can SSH into the client instance.

Before creating the topics, use a utility like telnet to confirm connectivity between the Kafka client and the MSK Serverless Cluster. Verifying connectivity will save you a lot of frustration with potential security and networking issues.

sudo yum install telnet -y
telnet <your_bootstrap_server_host> 9098
# > Trying 192.168.XX.XX…
# > Connected to boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com.
# > Escape character is '^]'.
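If telnet is not available, the same TCP reachability check can be sketched in a few lines of Python using only the standard library. The function name `can_connect` is hypothetical. Like telnet, this only confirms that the port is reachable; it does not exercise IAM authentication.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False


# Usage on the Kafka client instance (replace the placeholder host):
# can_connect("<your_bootstrap_server_host>", 9098)
```

A result of False usually points to a Security Group, subnet, or routing issue between the client instance and the cluster.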

With MSK Serverless Cluster connectivity confirmed, create the three Kafka topics: topicA, topicB, and topicC. I am using the default partitioning and replication settings from the AWS Getting Started Tutorial.

cd kafka_2.12-2.8.1

# *** CHANGE ME ***
export BOOTSTRAP_SERVER=<your_bootstrap_server> # e.g., boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com:9098

bin/kafka-topics.sh --create --topic topicA \
    --partitions 6 \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

bin/kafka-topics.sh --create --topic topicB \
    --partitions 6 \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

bin/kafka-topics.sh --create --topic topicC \
    --partitions 6 \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

# list topics to confirm creation
bin/kafka-topics.sh --list \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config config/client.properties

To create some quick sample data, we will copy and paste 250 messages from a file included in the GitHub project, sample_data/sales_messages.txt, into topicA. The messages are simple mock sales transactions.

{"payment_id":16940,"customer_id":130,"amount":5.99,"payment_date":"2021-05-08 21:21:56.996577 +00:00","city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}
{"payment_id":16406,"customer_id":459,"amount":5.99,"payment_date":"2021-05-08 21:22:59.996577 +00:00","city":"Qomsheh","district":"Esfahan","country":"Iran"}
{"payment_id":16315,"customer_id":408,"amount":6.99,"payment_date":"2021-05-08 21:32:05.996577 +00:00","city":"Jaffna","district":"Northern","country":"Sri Lanka"}
{"payment_id":16185,"customer_id":333,"amount":7.99,"payment_date":"2021-05-08 21:33:07.996577 +00:00","city":"Baku","district":"Baki","country":"Azerbaijan"}
{"payment_id":17097,"customer_id":222,"amount":9.99,"payment_date":"2021-05-08 21:33:47.996577 +00:00","city":"Jaroslavl","district":"Jaroslavl","country":"Russian Federation"}
{"payment_id":16579,"customer_id":549,"amount":3.99,"payment_date":"2021-05-08 21:36:33.996577 +00:00","city":"Santiago de Compostela","district":"Galicia","country":"Spain"}
{"payment_id":16050,"customer_id":269,"amount":4.99,"payment_date":"2021-05-08 21:40:19.996577 +00:00","city":"Salinas","district":"California","country":"United States"}
{"payment_id":17126,"customer_id":239,"amount":7.99,"payment_date":"2021-05-08 22:00:12.996577 +00:00","city":"Ciomas","district":"West Java","country":"Indonesia"}
{"payment_id":16933,"customer_id":126,"amount":7.99,"payment_date":"2021-05-08 22:29:06.996577 +00:00","city":"Po","district":"So Paulo","country":"Brazil"}
{"payment_id":16297,"customer_id":399,"amount":8.99,"payment_date":"2021-05-08 22:30:47.996577 +00:00","city":"Okara","district":"Punjab","country":"Pakistan"}

Use the kafka-console-producer Shell script to publish the messages to the Kafka topic. Use the kafka-console-consumer Shell script to validate the messages made it to the topic by consuming a few messages.

bin/kafka-console-producer.sh \
    --topic topicA \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --producer.config config/client.properties

# copy and paste contents of 'sales_messages.txt' and then Ctrl+C to exit

# check for messages in topic
bin/kafka-console-consumer.sh \
    --topic topicA \
    --from-beginning --max-messages 5 \
    --property print.value=true \
    --property print.offset=true \
    --property print.partition=true \
    --property print.timestamp=true \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --consumer.config config/client.properties

The output should look similar to the following example.

Sample message output from Kafka topic

Spark Resources in Amazon S3

To submit and run the five Spark Jobs included in the project, you will need to copy the following resources to your Amazon S3 bucket: (5) Apache Spark jobs, (5) related JAR dependencies, and (2) sample data files.

PySpark Applications
To start, copy the five PySpark applications to a scripts/ subdirectory within your Amazon S3 bucket.

PySpark applications uploaded to the Amazon S3 bucket

Sample Data
Next, copy the two sample data files to a sample_data/ subdirectory within your Amazon S3 bucket. The large file contains 2,000 messages, while the small file contains 600 messages. These two files can be used interchangeably with the post’s final streaming example.

Sample sales data uploaded to the Amazon S3 bucket

PySpark Dependencies
Lastly, the PySpark applications have a handful of JAR dependencies that must be available when the job runs, which are not on the EMR Serverless classpath by default. If you are unsure which JARs are already on the EMR Serverless classpath, you can check the Spark UI’s Environment tab’s Classpath Entries section. Accessing the Spark UI is demonstrated in the first PySpark application example, below.

Spark UI’s Environment tab showing Classpath Entries

It is critical to choose the correct version of each JAR dependency based on the version of libraries used with the EMR and MSK. Using the wrong version or inconsistent versions, especially Scala, can result in job failures. Specifically, we are targeting Spark 3.2.1 and Scala 2.12 (EMR v6.7.0: Amazon’s Spark 3.2.1, Scala 2.12.15, Amazon Corretto 8 version of OpenJDK), and Apache Kafka 2.8.1 (MSK Serverless: Kafka 2.8.1).
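A quick way to catch a Scala mismatch before uploading is to inspect the Scala suffix that Scala-built artifacts carry in their file names (for example, `_2.12`). The sketch below is an illustration only: the helper name `scala_mismatches` is hypothetical, and the file names are examples of the naming convention, not the post's actual JAR list.

```python
import re

# Scala-built artifacts embed the Scala binary version as "_<major.minor>-"
SCALA_SUFFIX = re.compile(r"_(\d+\.\d+)-")


def scala_mismatches(jar_names, expected="2.12"):
    """Return JAR names whose Scala suffix differs from `expected`.

    JARs without a Scala suffix (plain Java libraries) are ignored.
    """
    mismatched = []
    for name in jar_names:
        match = SCALA_SUFFIX.search(name)
        if match and match.group(1) != expected:
            mismatched.append(name)
    return mismatched


# Illustrative file names only; they are not the post's actual JAR list.
jars = [
    "spark-sql-kafka-0-10_2.12-3.2.1.jar",
    "kafka-clients-2.8.1.jar",
    "example-lib_2.13-1.0.0.jar",
]
print(scala_mismatches(jars))  # ['example-lib_2.13-1.0.0.jar']
```

This only checks the Scala suffix. Matching the Spark and Kafka versions still requires reading the version at the end of each file name.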

Download the seven JAR files locally, then copy them to a jars/ subdirectory within your Amazon S3 bucket.

Dependency JARs uploaded to the Amazon S3 bucket

PySpark Applications Examples

With the EMR Serverless Application, MSK Serverless Cluster, Kafka topics, and sample data created, and the Spark resources uploaded to Amazon S3, we are ready to explore four different Spark examples.

Example 1: Kafka Batch Aggregation to the Console

The first PySpark application, 01_example_console.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to the console (stdout).

There are no hard-coded values in any of the PySpark application examples. All required environment-specific variables, such as your MSK Serverless bootstrap server (host and port) and Amazon S3 bucket name, will be passed to the running Spark jobs as arguments from the spark-submit command.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads messages from Kafka topicA and writes aggregated messages to the console (stdout)
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("01-example-console") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    # batch read of everything currently in the topic, using MSK IAM authentication
    options_read = {
        "kafka.bootstrap.servers": args.bootstrap_servers,
        "subscribe": args.read_topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # keep one row per country, carrying that country's order count and sales total
    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your first PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. You will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 01-example-console \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/01_example_console.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'
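The same submission can be made from Python instead of the AWS CLI. The following is a rough sketch using boto3's `emr-serverless` client and its `start_job_run` call; the helper names `build_job_driver` and `submit_job` are hypothetical, and the bucket and bootstrap server values are placeholders.

```python
def build_job_driver(s3_bucket, script_name, bootstrap_server):
    """Build the jobDriver structure, mirroring the CLI's --job-driver JSON."""
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{s3_bucket}/scripts/{script_name}",
            "entryPointArguments": [
                f"--bootstrap_servers={bootstrap_server}"
            ],
            "sparkSubmitParameters":
                f"--conf spark.jars=s3://{s3_bucket}/jars/*.jar",
        }
    }


def submit_job(application_id, execution_role_arn, name, job_driver):
    """Submit the job; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so build_job_driver works without it

    client = boto3.client("emr-serverless")
    response = client.start_job_run(
        applicationId=application_id,
        executionRoleArn=execution_role_arn,
        name=name,
        jobDriver=job_driver,
    )
    return response["jobRunId"]


# Placeholder values; substitute your own bucket and bootstrap server.
driver = build_job_driver("my-bucket", "01_example_console.py", "boot-example:9098")
print(driver["sparkSubmit"]["entryPoint"])
```

Building the job driver as a dictionary avoids the shell-quoting pitfalls of embedding JSON in a command line.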

Switching to the EMR Serverless Application console, you should see the new Spark job you just submitted in one of several job states.
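Instead of refreshing the console, you could poll the job state programmatically. The sketch below assumes the state is fetched through a callable so the loop can be tried without AWS access; `wait_for_job` and `emr_serverless_state_fetcher` are hypothetical helper names, while `get_job_run` is the boto3 `emr-serverless` call that returns the job run's `state`.

```python
import time

# States after which a job run will not change again
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def wait_for_job(fetch_state, poll_seconds=15.0, max_polls=240):
    """Poll `fetch_state()` until it returns a terminal state or polls run out."""
    state = None
    for _ in range(max_polls):
        state = fetch_state()
        if state in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    return state


def emr_serverless_state_fetcher(application_id, job_run_id):
    """Return a callable that reads the job state; requires boto3 and credentials."""
    import boto3  # imported lazily so wait_for_job works without it

    client = boto3.client("emr-serverless")

    def fetch():
        response = client.get_job_run(
            applicationId=application_id, jobRunId=job_run_id
        )
        return response["jobRun"]["state"]

    return fetch


# Dry run with a simulated sequence of states instead of a real job.
states = iter(["SUBMITTED", "PENDING", "RUNNING", "SUCCESS"])
print(wait_for_job(lambda: next(states), poll_seconds=0))  # SUCCESS
```

With a real job, pass `emr_serverless_state_fetcher(application_id, job_run_id)` as `fetch_state` and keep a polling interval of several seconds.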

EMR Studio Serverless Application details console

You can click on the Spark job to get more details. Note the Script arguments and Spark properties passed in from the spark-submit command.

EMR Studio Serverless Application details Job details view

From the Spark job details tab, access the Spark UI, aka Spark Web UI, from a button in the upper right corner of the screen. If you have experience with Spark, you are most likely familiar with the Spark Web UI to monitor and tune Spark jobs.

Spark History Server UI

From the initial screen, the Spark History Server tab, click on the App ID. You can access an enormous amount of Spark-related information about your job and EMR environment from the Spark Web UI.

Spark UI’s Stages tab
Spark UI’s Stages tab showing a Directed acyclic graph (DAG) Visualization
Spark UI’s Environment tab showing environment variables, including versions of Spark, Java, and Scala

The Executors tab will give you access to the Spark job’s output. The output we are most interested in is the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI’s Executors tab

The stderr contains output related to the running Spark job. Below we see an example of Kafka consumer configuration values output to stderr. Several of these values were passed in from the Spark job, including items such as kafka.bootstrap.servers, security.protocol, sasl.mechanism, and sasl.jaas.config.

driver executor’s stderr output to the console

The stdout from the driver executor contains the console output as directed from the Spark job. Below we see the successfully aggregated results of the first Spark job, output to stdout.

+------------------+------+------+
|country           |sales |orders|
+------------------+------+------+
|India             |138.80|20    |
|China             |133.80|20    |
|Mexico            |106.86|14    |
|Japan             |100.86|14    |
|Brazil            |96.87 |13    |
|Russian Federation|94.87 |13    |
|United States     |92.86 |14    |
|Nigeria           |58.93 |7     |
|Philippines       |58.92 |8     |
|South Africa      |46.94 |6     |
|Argentina         |42.93 |7     |
|Germany           |39.96 |4     |
|Indonesia         |38.95 |5     |
|Italy             |35.95 |5     |
|Iran              |33.95 |5     |
|South Korea       |33.94 |6     |
|Poland            |30.97 |3     |
|Pakistan          |25.97 |3     |
|Taiwan            |25.96 |4     |
|Mozambique        |23.97 |3     |
|Ukraine           |23.96 |4     |
|Vietnam           |23.96 |4     |
|Venezuela         |22.97 |3     |
|France            |20.98 |2     |
|Peru              |19.98 |2     |
+------------------+------+------+
only showing top 25 rows
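To sanity-check the aggregation logic independently of Spark, the same per-country totals can be computed in plain Python over a few messages. This sketch uses three of the sample messages shown earlier, abbreviated to the fields the aggregation needs. It uses `Decimal` rather than the job's `FloatType`, which is a choice made here for exact sums, not something the Spark job does.

```python
import json
from collections import defaultdict
from decimal import Decimal

# Three of the sample sales messages, abbreviated to the relevant fields
messages = [
    '{"payment_id":16940,"amount":5.99,"country":"Brazil"}',
    '{"payment_id":16933,"amount":7.99,"country":"Brazil"}',
    '{"payment_id":16050,"amount":4.99,"country":"United States"}',
]


def summarize_sales(lines):
    """Return {country: (total_sales, order_count)} for JSON sales messages."""
    totals = defaultdict(lambda: [Decimal("0"), 0])
    for line in lines:
        # parse_float=Decimal avoids binary floating-point rounding in the sums
        record = json.loads(line, parse_float=Decimal)
        entry = totals[record["country"]]
        entry[0] += record["amount"]
        entry[1] += 1
    return {country: (sales, orders)
            for country, (sales, orders) in totals.items()}


summary = summarize_sales(messages)
# Print in descending order of sales, pipe-delimited
for country, (sales, orders) in sorted(
        summary.items(), key=lambda item: item[1][0], reverse=True):
    print(f"{country}|{sales:.2f}|{orders}")
```

For these three messages, Brazil totals 13.98 across two orders and the United States totals 4.99 for one order.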

Example 2: Kafka Batch Aggregation to CSV in S3

Although the console is useful for development and debugging, it is typically not used in Production. Instead, Spark typically sends results to S3 as CSV, JSON, Parquet, or Avro formatted files, to Kafka, to a database, or to an API endpoint. The second PySpark application, 02_example_csv_s3.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a CSV file in Amazon S3.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads messages from Kafka topicA and writes aggregated messages to CSV file in Amazon S3
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("02-example-csv-s3") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    # batch read of everything currently in the topic, using MSK IAM authentication
    options_read = {
        "kafka.bootstrap.servers": args.bootstrap_servers,
        "subscribe": args.read_topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # same aggregation as the first example, written as pipe-delimited CSV to S3
    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .csv(path=f"s3a://{args.s3_bucket}/output/", header=True, sep="|")


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your second PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first example, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 02-example-csv-s3 \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/02_example_csv_s3.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>",
                "--s3_bucket=<your_s3_bucket>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

If successful, the Spark job should create a single CSV file in the designated Amazon S3 key (directory path) and an empty _SUCCESS indicator file. The presence of an empty _SUCCESS file signifies that the save() operation completed normally.
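This check can be scripted against a listing of the bucket's object keys, for example the `Key` values returned by S3's `list_objects_v2`. The sketch below works on a plain list of keys so it can be tried without AWS access; the helper name `check_csv_output` and the example part-file name are hypothetical.

```python
def check_csv_output(keys, prefix="output/"):
    """Return (succeeded, csv_names) for a list of S3 object keys.

    `succeeded` is True when the _SUCCESS marker is present under `prefix`;
    `csv_names` lists the Spark part files that end in .csv.
    """
    names = [key[len(prefix):] for key in keys if key.startswith(prefix)]
    succeeded = "_SUCCESS" in names
    csv_names = [name for name in names
                 if name.startswith("part-") and name.endswith(".csv")]
    return succeeded, csv_names


# Illustrative keys; the part file's actual suffix is generated by Spark.
keys = ["output/_SUCCESS", "output/part-00000-example-c000.csv"]
print(check_csv_output(keys))
```

Because the job calls coalesce(1) before writing, a successful run should yield exactly one entry in the returned list of CSV names.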

Amazon S3 bucket showing CSV file output by Spark job

Below we see the expected pipe-delimited output from the second Spark job.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5

Example 3: Kafka Batch Aggregation to Kafka

The third PySpark application, 03_example_kafka.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a second Kafka topic, topicB. This job now has both read and write options.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads messages from Kafka topicA and writes aggregated messages to topicB
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("03-example-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    # batch read of everything currently in the topic, using MSK IAM authentication
    options_read = {
        "kafka.bootstrap.servers": args.bootstrap_servers,
        "subscribe": args.read_topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    options_write = {
        "kafka.bootstrap.servers": args.bootstrap_servers,
        "topic": args.write_topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    # same aggregation as before; each row is serialized to JSON as the Kafka message value
    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .select(F.to_json(F.struct("*"))).toDF("value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")
    parser.add_argument("--write_topic", default="topicB", required=False, help="Kafka topic to write to")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
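The read and write option dictionaries in this job repeat the same MSK IAM authentication settings. One possible refactoring, not part of the project's source code, is to build the shared settings once and layer the read- or write-specific keys on top. The helper names below are hypothetical, and the bootstrap server value is a placeholder.

```python
def msk_iam_options(bootstrap_servers):
    """Kafka connection options shared by the Spark reader and writer."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }


def read_options(bootstrap_servers, topic):
    """Options for a batch read of everything currently in `topic`."""
    return {
        **msk_iam_options(bootstrap_servers),
        "subscribe": topic,
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    }


def write_options(bootstrap_servers, topic):
    """Options for writing to `topic`."""
    return {**msk_iam_options(bootstrap_servers), "topic": topic}


# Placeholder bootstrap server for illustration.
print(sorted(write_options("boot-example:9098", "topicB")))
```

The resulting dictionaries can be passed to `.options(**...)` exactly as `options_read` and `options_write` are in the job above.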

To submit your next PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first two examples, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 03-example-kafka \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/03_example_kafka.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

Once the job completes, you can confirm the results by returning to your EC2-based Kafka client. Use the same kafka-console-consumer command you used previously to show messages from topicB.

bin/kafka-console-consumer.sh \
    --topic topicB \
    --from-beginning --max-messages 10 \
    --property print.value=true \
    --property print.offset=true \
    --property print.partition=true \
    --property print.timestamp=true \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --consumer.config config/client.properties

If the Spark job and the Kafka client command worked successfully, you should see aggregated messages similar to the example output below. Note we are not using keys with the Kafka messages, only values for these simple examples.
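One detail worth noting when consuming these messages downstream: because the job applies format_number before to_json, the sales and orders fields arrive as JSON strings rather than numbers. The sketch below shows one way a consumer might convert them back. The helper name `parse_summary` is hypothetical, and the example value is an assumed shape derived from the India row in the earlier output, not a captured message.

```python
import json
from decimal import Decimal


def parse_summary(value):
    """Convert one aggregated message value into typed Python fields."""
    record = json.loads(value)
    return {
        "country": record["country"],
        # format_number output may contain thousands separators
        "sales": Decimal(record["sales"].replace(",", "")),
        "orders": int(record["orders"].replace(",", "")),
    }


# Assumed message shape, based on the India row in the earlier examples.
parsed = parse_summary('{"country":"India","sales":"138.80","orders":"20"}')
print(parsed["country"], parsed["sales"], parsed["orders"])
```

Stripping the commas mirrors the regexp_replace call the Spark job itself uses before sorting by sales.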

Aggregated messages from Kafka topic

Example 4: Spark Structured Streaming

For our final example, we will switch from batch to streaming: from read to readStream and from write to writeStream. Before continuing, I suggest reading the Structured Streaming Programming Guide.

In this example, we will demonstrate how to continuously measure a common business metric: real-time sales volumes. Imagine you sell products globally and want to understand the relationship between the time of day and buying patterns in different geographic regions in real time. For any given window of time (this 15-minute period, this hour, this day, or this week), you want to know the current sales volumes by country. You are not reviewing previous sales periods or examining running sales totals, but real-time sales during a sliding time window.
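To make the idea of a sliding time window concrete, here is a plain-Python sketch of how a single sale can fall into several overlapping windows. The 10-minute window and 5-minute slide are assumptions chosen for illustration, and the function `sliding_windows` is hypothetical. It is similar in spirit to window-based grouping in Spark Structured Streaming, but it is not the code used by the streaming jobs in this post.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, window, slide):
    """Return the [start, end) windows that contain event_time.

    Window starts are aligned to multiples of `slide` since the Unix epoch.
    """
    # Latest aligned window start at or before the event
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    # Walk backwards while the window still covers the event
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


sale_time = datetime(2022, 7, 27, 12, 7, tzinfo=timezone.utc)
for start, end in sliding_windows(
        sale_time, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    print(f"{start:%H:%M} to {end:%H:%M}")
```

A sale at 12:07 counts toward both the 12:00 to 12:10 window and the 12:05 to 12:15 window, which is why sliding windows show current volumes rather than a running total.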

We will use two PySpark jobs running concurrently to simulate this metric. The first application, 04_stream_sales_to_kafka.py, simulates streaming data by continuously writing messages to topicC — 2,000 messages with a 0.5-second delay between messages. In my tests, the job ran for ~28–29 minutes.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Write messages from a CSV file to Kafka topicC
#          to simulate real-time streaming sales data
# Author:  Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse
import time

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("04-stream-sales-to-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, schema, args)
    df_sales.cache()

    write_to_kafka(spark, df_sales, args)


def read_from_csv(spark, schema, args):
    df_sales = spark.read \
        .csv(path=f"s3a://{args.s3_bucket}/sample_data/{args.sample_data_file}",
             schema=schema, header=True, sep="|")

    return df_sales


def write_to_kafka(spark, df_sales, args):
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "topic":
            args.write_topic,
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    # collect the rows to the driver once, rather than once per message
    rows = df_sales.collect()

    for row in rows:
        # wrap the single row in a DataFrame so it can be written to Kafka
        df_message = spark.createDataFrame([row], df_sales.schema)

        # replace the historical payment date with the current timestamp
        df_message = df_message \
            .drop("payment_date") \
            .withColumn("payment_date", F.current_timestamp())

        df_message \
            .selectExpr("CAST(payment_id AS STRING) AS key",
                        "to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        df_message.show(1)

        time.sleep(args.message_delay)


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--write_topic", default="topicC", required=False, help="Kafka topic to write to")
    parser.add_argument("--sample_data_file", default="sales_incremental_large.csv", required=False, help="data file")
    # type=float so a delay passed on the command line is usable by time.sleep()
    parser.add_argument("--message_delay", default=0.5, type=float, required=False, help="message publishing delay")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
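As a rough sanity check on the run time quoted above, the configured delay alone does not account for the full ~28–29 minutes. The sketch below works through the arithmetic using only figures from this post (2,000 messages, a 0.5-second delay, and 28.5 minutes as the midpoint of the observed range). Attributing the remainder to the per-message cost of building and writing a one-row DataFrame is an assumption, not a measurement.

```python
# Back-of-the-envelope run-time estimate for 04_stream_sales_to_kafka.py.
# All inputs come from the post; the "overhead" interpretation is an assumption.
MESSAGE_COUNT = 2_000
MESSAGE_DELAY_SECONDS = 0.5
OBSERVED_RUNTIME_MINUTES = 28.5  # midpoint of the ~28-29 minutes observed


def sleep_time_minutes(count, delay_seconds):
    """Total time spent in time.sleep(), in minutes."""
    return count * delay_seconds / 60


def implied_overhead_seconds(count, delay_seconds, observed_minutes):
    """Average non-sleep time per message implied by the observed run time."""
    return (observed_minutes * 60 - count * delay_seconds) / count


sleep_minutes = sleep_time_minutes(MESSAGE_COUNT, MESSAGE_DELAY_SECONDS)
overhead_seconds = implied_overhead_seconds(
    MESSAGE_COUNT, MESSAGE_DELAY_SECONDS, OBSERVED_RUNTIME_MINUTES)

print(f"Time spent sleeping: {sleep_minutes:.1f} minutes")
print(f"Implied overhead per message: {overhead_seconds:.3f} seconds")
```

In other words, roughly 17 of the ~28–29 minutes are the deliberate delay, and the rest is time spent outside of sleeping.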

Simultaneously, the PySpark application, 05_streaming_kafka.py, continuously consumes the sales transaction messages from the same topic, topicC. Then, Spark aggregates messages over a sliding event-time window and writes the results to the console.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
#          Reads stream of messages from Kafka topicC and
#          writes stream of aggregations over sliding event-time window to console (stdout)
# References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Author:  Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("05-streaming-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    # parse the JSON message value, then aggregate sales by country
    # over a 10-minute window that slides every 5 minutes
    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy("country",
                 F.window("timestamp", "10 minutes", "5 minutes")) \
        .agg(F.sum("amount"), F.count("amount")) \
        .orderBy(F.col("window").desc(),
                 F.col("sum(amount)").desc()) \
        .select("country",
                F.format_number("sum(amount)", 2).alias("sales"),
                F.format_number("count(amount)", 0).alias("orders"),
                "window.start", "window.end") \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_to_console") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 10) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicC", required=False, help="Kafka topic to read from")

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()
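The F.window("timestamp", "10 minutes", "5 minutes") call in the job above places every message in two overlapping windows, because the 10-minute window slides forward every 5 minutes. The plain-Python sketch below illustrates that assignment. The sliding_windows function is a hypothetical helper written for this explanation, not Spark's implementation, and it assumes windows are aligned to the Unix epoch with no start offset.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time,
                    window=timedelta(minutes=10),
                    slide=timedelta(minutes=5)):
    """Return every [start, end) window that contains event_time."""
    # most recent window start at or before the event, aligned to the slide
    latest_start = event_time - ((event_time - EPOCH) % slide)

    windows = []
    start = latest_start
    # step backwards while the window still covers the event
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide

    return sorted(windows)


for start, end in sliding_windows(datetime(2022, 7, 25, 14, 17, 30)):
    print(start, "->", end)
```

A message stamped 14:17:30 lands in both the 14:10 to 14:20 window and the 14:15 to 14:25 window, which is why the same sale can appear in two rows of the console output, once per window.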

To submit the two PySpark jobs to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Again, you will need four values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

# run 04 and 05 simultaneously
aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 04-stream-sales-to-kafka \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/04_stream_sales_to_kafka.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>",
                "--s3_bucket=<your_s3_bucket>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

aws emr-serverless start-job-run \
    --application-id <your_application_id> \
    --execution-role-arn <your_execution_role_arn> \
    --name 05-streaming-kafka \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<your_s3_bucket>/scripts/05_streaming_kafka.py",
            "entryPointArguments": [
                "--bootstrap_servers=<your_bootstrap_server>"
            ],
            "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
        }
    }'

Switching to the EMR Serverless Application console, you should see both Spark jobs you just submitted in one of several job states.

EMR Studio Serverless Application details console

Using the Spark UI again, we can review the output from the second job, 05_streaming_kafka.py.

Spark UI’s Jobs tab

With Spark Structured Streaming jobs, we have an extra tab in the Spark UI, Structured Streaming. This tab displays all running jobs with their latest [micro]batch number, the aggregate rate of data arriving, and the aggregate rate at which Spark is processing data. Unfortunately, with EMR Serverless, AWS doesn’t appear to allow access to the detailed streaming query statistics via the Run ID, which greatly reduces the tab’s value. You receive a 502 error when clicking on the Run ID hyperlink.

Spark UI’s Structured Streaming tab

The output we are most interested in, again, is contained in the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI’s Executors tab

Below we see sample output from stderr. The output shows the results of a micro-batch. According to the Apache Spark documentation, internally, by default, Structured Streaming queries are processed using a micro-batch processing engine. The engine processes data streams as a series of small batch jobs, achieving end-to-end latencies as low as 100ms and exactly-once fault-tolerance guarantees.

22/07/25 14:29:04 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/.10.b017bcdc-f142-4b28-8891-d3f2d471b740.tmp to file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/10
22/07/25 14:29:04 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "bec9640f-ac16-4d00-bd4a-ed8d29b5f768",
"runId" : "a2e00c3a-924c-4728-b25a-56ee855da7da",
"name" : "streaming_to_console",
"timestamp" : "2022-07-25T14:29:00.000Z",
"batchId" : 10,
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"durationMs" : {
"addBatch" : 4359,
"getBatch" : 1,
"latestOffset" : 5,
"queryPlanning" : 37,
"triggerExecution" : 4458,
"walCommit" : 27
},
"eventTime" : {
"avg" : "2022-07-25T14:28:29.947Z",
"max" : "2022-07-25T14:28:59.290Z",
"min" : "2022-07-25T14:28:00.559Z",
"watermark" : "2022-07-25T14:17:59.737Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 476,
"numRowsUpdated" : 132,
"allUpdatesTimeMs" : 319,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 14173,
"memoryUsedBytes" : 262776,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 400,
"numStateStoreInstances" : 400,
"customMetrics" : {
"loadedMapCacheHitCount" : 15600,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 154208
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topicC]]",
"startOffset" : {
"topicC" : {
"2" : 101,
"5" : 96,
"4" : 88,
"1" : 84,
"3" : 112,
"0" : 100
}
},
"endOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"latestOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@52e344bf",
"numOutputRows" : 238
}
}
Example of a Spark Structured Streaming MicroBatch output
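The figures in the progress report above can be cross-checked against each other. The sketch below copies the per-partition offsets, the trigger duration, and the 1-minute trigger interval from this post and recomputes the row count and the two rates. That inputRowsPerSecond is measured over the trigger interval and processedRowsPerSecond over triggerExecution is an interpretation that happens to reproduce the logged values, not something the log itself states.

```python
# Values copied from the micro-batch progress report (batchId 10) above.
start_offsets = {"2": 101, "5": 96, "4": 88, "1": 84, "3": 112, "0": 100}
end_offsets = {"2": 114, "5": 110, "4": 98, "1": 98, "3": 124, "0": 110}
trigger_execution_ms = 4458
trigger_interval_seconds = 60  # from .trigger(processingTime="1 minute")


def rows_in_batch(start, end):
    """Sum the offset advance across all Kafka partitions."""
    return sum(end[partition] - start[partition] for partition in start)


num_input_rows = rows_in_batch(start_offsets, end_offsets)
input_rows_per_second = num_input_rows / trigger_interval_seconds
processed_rows_per_second = num_input_rows / (trigger_execution_ms / 1000)

print(f"numInputRows: {num_input_rows}")
print(f"inputRowsPerSecond: {input_rows_per_second:.4f}")
print(f"processedRowsPerSecond: {processed_rows_per_second:.4f}")
```

The six partitions of topicC advanced by 73 offsets in total, matching numInputRows, and Spark processed the batch far faster than messages arrived, which is consistent with the zero values reported for the offsets-behind-latest metrics.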

The stdout output that accompanies the stderr micro-batch log above is shown below. We see the initial micro-batch results, starting with the first micro-batch, which runs before any messages are streamed to topicC.

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+------+-----+---+
|country|sales|orders|start|end|
+-------+-----+------+-----+---+
+-------+-----+------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+------+-------------------+-------------------+
|country   |sales|orders|start              |end                |
+----------+-----+------+-------------------+-------------------+
|Azerbaijan|7.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Sri Lanka |6.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Iran      |5.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Brazil    |5.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Azerbaijan|7.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Sri Lanka |6.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Brazil    |5.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Iran      |5.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
+----------+-----+------+-------------------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------+-----+------+-------------------+-------------------+
|country           |sales|orders|start              |end                |
+------------------+-----+------+-------------------+-------------------+
|Russian Federation|43.94|6     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|China             |37.94|6     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Mexico            |34.96|4     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|India             |33.95|5     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United States     |26.96|4     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Philippines       |22.97|3     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Nigeria           |22.97|3     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Iran              |14.98|2     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Vietnam           |13.98|2     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United Kingdom    |11.99|1     |2022-07-25 14:20:00|2022-07-25 14:30:00|
+------------------+-----+------+-------------------+-------------------+
only showing top 10 rows
Example of a Spark Structured Streaming MicroBatch results to console

If you are familiar with Spark Structured Streaming, you are likely aware that these Spark jobs run continuously. In other words, the streaming jobs will not stop; they continually await more streaming data.

EMR Studio Serverless Application details console

The first job, 04_stream_sales_to_kafka.py, will run for ~28–29 minutes and stop with a status of Success. However, the second job, 05_streaming_kafka.py, the Spark Structured Streaming job, must be manually canceled.

EMR Studio Serverless Application details console
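Besides the console, the streaming job can also be canceled programmatically. Below is a minimal sketch using the boto3 emr-serverless client's list_job_runs and cancel_job_run operations. The helper name cancel_running_job_runs is hypothetical, the client is passed in so the function can be exercised without AWS access, and for brevity it only inspects the first page of results.

```python
def cancel_running_job_runs(client, application_id, job_name):
    """Cancel RUNNING job runs with the given name; return the cancelled IDs.

    `client` is expected to behave like boto3.client("emr-serverless").
    Only the first page of list_job_runs results is examined.
    """
    cancelled = []
    response = client.list_job_runs(
        applicationId=application_id, states=["RUNNING"])

    for job_run in response.get("jobRuns", []):
        if job_run.get("name") == job_name:
            client.cancel_job_run(
                applicationId=application_id, jobRunId=job_run["id"])
            cancelled.append(job_run["id"])

    return cancelled


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("emr-serverless")
#   cancel_running_job_runs(client, "<your_application_id>", "05-streaming-kafka")
```

The job name matches the --name value used when the job was submitted earlier in this post.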

Cleaning Up

You can delete your resources from the AWS Management Console or AWS CLI. However, to delete your Amazon S3 bucket, all objects (including all object versions and delete markers) in the bucket must be deleted before the bucket itself can be deleted.

# delete application, cluster, and ec2 client
aws kafka delete-cluster --cluster-arn <your_msk_serverless_cluster_arn>
aws emr-serverless delete-application --application-id <your_application_id>
aws ec2 terminate-instances --instance-ids <your_ec2_instance_id>

# all objects (including all object versions and delete markers) in the bucket
# must be deleted before the bucket itself can be deleted.
aws s3api delete-bucket --bucket <your_s3_bucket>
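The delete-bucket command above fails if the bucket still contains objects, object versions, or delete markers. One way to empty it first is boto3's S3 resource API, sketched below. The helper name empty_and_delete_bucket is hypothetical, and it takes a Bucket-like object so it can be tried against a stand-in before being pointed at a real bucket. Note that this permanently removes everything in the bucket.

```python
def empty_and_delete_bucket(bucket):
    """Remove all object versions and delete markers, then the bucket.

    `bucket` is expected to behave like boto3.resource("s3").Bucket(name).
    """
    # batch-deletes every version and delete marker in the bucket
    bucket.object_versions.delete()
    # the bucket is now empty and can be deleted
    bucket.delete()


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   bucket = boto3.resource("s3").Bucket("<your_s3_bucket>")
#   empty_and_delete_bucket(bucket)
```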

Conclusion

In this post, we discovered how easy it is to adopt a serverless approach to analytics on AWS. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with open-source frameworks such as Apache Spark and Apache Hive. With MSK Serverless, you can use Apache Kafka on demand and pay only for the data you stream and retain. In addition, MSK Serverless automatically provisions and scales compute and storage resources. Given suitable analytics use cases, EMR Serverless with MSK Serverless will likely save you time, effort, and expense.


This blog represents my viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.


Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena

Learn how to develop Cloud-native, RESTful Java services that query data in an AWS-based data lake using Amazon Athena’s API

Introduction

AWS provides a collection of fully-managed services that makes building and managing secure data lakes faster and easier, including AWS Lake Formation, AWS Glue, and Amazon S3. Additional analytics services such as Amazon EMR, AWS Glue Studio, and Amazon Redshift allow Data Scientists and Analysts to run high-performance queries on large volumes of semi-structured and structured data quickly and economically.

What is not always as obvious is how teams develop internal and external customer-facing analytics applications built on top of data lakes. For example, imagine sellers on an eCommerce platform, the scenario used in this post, want to make better marketing decisions regarding their products by analyzing sales trends and buyer preferences. Further, suppose the data required for the analysis must be aggregated from multiple systems and data sources, an ideal use case for a data lake.

Example of a personalized sales report generated from the Spring Boot service’s salesbyseller endpoint

In this post, we will explore an example Java Spring Boot RESTful Web Service that allows end-users to query data stored in a data lake on AWS. The RESTful Web Service will access data stored as Apache Parquet in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena. The service will use Spring Boot and the AWS SDK for Java to expose a secure, RESTful Application Programming Interface (API).

High-level AWS architecture demonstrated in this post

Amazon Athena is a serverless, interactive query service based on Presto, used to query data and analyze big data in Amazon S3 using standard SQL. Using Athena functionality exposed by the AWS SDK for Java and Athena API, the Spring Boot service will demonstrate how to access tables, views, prepared statements, and saved queries (aka named queries).

Amazon Athena Query Editor

TL;DR

Do you want to explore the source code for this post’s Spring Boot service or deploy it to Kubernetes before reading the full article? All the source code, Docker, and Kubernetes resources are open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/athena-spring-app.git

A Docker image for the Spring Boot service is also available on Docker Hub.

Spring Boot service image available on Docker Hub

Data Lake Data Source

There are endless data sources to build a demonstration data lake on AWS. This post uses the TICKIT sample database provided by AWS and designed for Amazon Redshift, AWS’s cloud data warehousing service. The database consists of seven tables. Two previous posts and associated videos, Building a Data Lake on AWS with Apache Airflow and Building a Data Lake on AWS, detail the setup of the data lake used in this post using AWS Glue and optionally Apache Airflow with Amazon MWAA.

Those two posts use the data lake pattern of segmenting data as bronze (aka raw), silver (aka refined), and gold (aka aggregated), popularized by Databricks. The data lake simulates a typical scenario where data originating from multiple sources, including an e-commerce platform, a CRM system, and a SaaS provider, must be aggregated and analyzed.

High-level data lake architecture demonstrated in the previous post

Spring Projects with IntelliJ IDE

Although not a requirement, I used JetBrains IntelliJ IDEA 2022 (Ultimate Edition) to develop and test the post’s Spring Boot service. Bootstrapping Spring projects with IntelliJ is easy. Developers can quickly create a Spring project using the Spring Initializr plugin bundled with IntelliJ.

JetBrains IntelliJ IDEA plugin support for Spring projects

The Spring Initializr plugin’s new project creation wizard is based on start.spring.io. The plugin allows you to quickly select the Spring dependencies you want to incorporate into your project.

Adding dependencies to a new Spring project in IntelliJ

Visual Studio Code

There are also several Spring extensions for the popular Visual Studio Code IDE, including Microsoft’s Spring Initializr Java Support extension.

Spring Initializr Java Support extension for Visual Studio Code by Microsoft

Gradle

This post uses Gradle instead of Maven to develop, test, build, package, and deploy the Spring service. Based on the packages selected in the new project setup shown above, the Spring Initializr plugin’s new project creation wizard creates a build.gradle file. Additional packages, such as Lombok, Micrometer, and REST Assured, were added separately.

plugins {
    id 'org.springframework.boot' version '2.7.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id 'io.freefair.lombok' version '6.5.0-rc1'
}

group = 'aws.example'
version = '1.0.0'
sourceCompatibility = '17'

def awsSdkVersion = '2.17.225'
def springBootVersion = '2.7.1'
def restAssuredVersion = '5.1.1'

repositories {
    mavenCentral()
}

dependencies {
    // aws sdk
    runtimeOnly "software.amazon.awssdk:bom:${awsSdkVersion}"
    implementation "software.amazon.awssdk:athena:${awsSdkVersion}"

    // spring
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
    implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
    implementation "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
    developmentOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
    implementation 'org.springdoc:springdoc-openapi-ui:1.6.9'
    implementation 'org.springframework:spring-context:5.3.20'

    // testing
    testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
    testImplementation "io.rest-assured:rest-assured:${restAssuredVersion}"
    testImplementation "io.rest-assured:json-path:${restAssuredVersion}"
    testImplementation "io.rest-assured:xml-path:${restAssuredVersion}"
    testImplementation "io.rest-assured:json-schema-validator:${restAssuredVersion}"

    // monitoring
    implementation 'io.micrometer:micrometer-registry-prometheus:1.9.1'
}

tasks.named('test') {
    useJUnitPlatform()
}

Amazon Corretto

The Spring Boot service is developed for and compiled with the most recent version of Amazon Corretto 17. According to AWS, Amazon Corretto is a no-cost, multiplatform, production-ready distribution of the Open Java Development Kit (OpenJDK). Corretto comes with long-term support that includes performance enhancements and security fixes. Corretto is certified as compatible with the Java SE standard and is used internally at Amazon for many production services.

Source Code

Each API endpoint in the Spring Boot RESTful Web Service has a corresponding POJO data model class, service interface and service implementation class, and controller class. In addition, there are also common classes such as configuration, a client factory, and Athena-specific request/response methods. Lastly, there are additional class dependencies for views and prepared statements.

Java class relationships related to querying the Amazon Athena refined_tickit_public_category table

The project’s source code is arranged in a logical hierarchy by package and class type.

.
└── com
    └── example
        └── athena
            ├── AthenaApplication.java
            ├── common
            │   ├── AthenaClientFactory.java
            │   ├── AthenaClientFactoryImp.java
            │   ├── AthenaCommon.java
            │   ├── NamedQuery.java
            │   ├── PreparedStatement.java
            │   └── View.java
            ├── config
            │   └── ConfigProperties.java
            └── tickit
                ├── controller
                │   ├── BuyerLikesByCategoryController.java
                │   ├── CategoryController.java
                │   ├── DateDetailController.java
                │   ├── EventController.java
                │   ├── ListingController.java
                │   ├── SaleBySellerController.java
                │   ├── SaleController.java
                │   ├── SalesByCategoryController.java
                │   ├── UserController.java
                │   └── VenueController.java
                ├── model
                │   ├── crm
                │   │   └── User.java
                │   ├── ecomm
                │   │   ├── DateDetail.java
                │   │   ├── Listing.java
                │   │   └── Sale.java
                │   ├── resultsets
                │   │   ├── BuyerLikesByCategory.java
                │   │   ├── SaleBySeller.java
                │   │   └── SalesByCategory.java
                │   └── saas
                │       ├── Category.java
                │       ├── Event.java
                │       └── Venue.java
                └── service
                    ├── BuyerLikesByCategoryServiceImp.java
                    ├── BuyersLikesByCategoryService.java
                    ├── CategoryService.java
                    ├── CategoryServiceImp.java
                    ├── DateDetailService.java
                    ├── DateDetailServiceImp.java
                    ├── EventService.java
                    ├── EventServiceImp.java
                    ├── ListingService.java
                    ├── ListingServiceImp.java
                    ├── SaleBySellerService.java
                    ├── SaleBySellerServiceImp.java
                    ├── SaleService.java
                    ├── SaleServiceImp.java
                    ├── SalesByCategoryService.java
                    ├── SalesByCategoryServiceImp.java
                    ├── UserService.java
                    ├── UserServiceImp.java
                    ├── VenueService.java
                    └── VenueServiceImp.java

Amazon Athena Access

There are three standard methods for accessing Amazon Athena with the AWS SDK for Java: 1) the AthenaClient service client, 2) the AthenaAsyncClient service client for accessing Athena asynchronously, and 3) using the JDBC driver with the AWS SDK. The AthenaClient and AthenaAsyncClient service clients are both part of the software.amazon.awssdk.services.athena package. For simplicity, this post’s Spring Boot service uses the AthenaClient service client instead of Java’s asynchronous programming model. AWS supplies basic code samples as part of their documentation as a starting point for writing Athena applications using the SDK. The code samples also use the AthenaClient service client.

POJO-based Data Model Class

For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Plain Old Java Object (POJO). According to Wikipedia, a POJO is an ordinary Java object, not bound by any particular restriction. The POJO class is similar to a JPA Entity, representing persistent data stored in a relational database. In this case, the POJO uses Lombok’s @Data annotation. According to the documentation, this annotation generates getters for all fields, a useful toString method, and hashCode and equals implementations that check all non-transient fields. It also generates setters for all non-final fields and a constructor.

package com.example.athena.tickit.model.saas;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
    private int id;
    private int venueId;
    private int catId;
    private int dateId;
    private String name;
    private LocalDateTime startTime;
}

Each POJO corresponds directly to a ‘silver’ table in the AWS Glue Data Catalog. For example, the Event POJO corresponds to the refined_tickit_public_event table in the tickit_demo Data Catalog database. The POJO defines the Spring Boot service’s data model for data read from the corresponding AWS Glue Data Catalog table.

Glue Data Catalog refined_tickit_public_event table

The Glue Data Catalog table is the interface between the Athena query and the underlying data stored in Amazon S3 object storage. The Athena query targets the table, which returns the underlying data from S3.

Tickit Category data stored as Apache Parquet files in Amazon S3

Service Class

Retrieving data from the data lake via AWS Glue, using Athena, is handled by a service class. For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Service Interface and implementation class. The service implementation class uses Spring Framework’s @Service annotation. According to the documentation, it indicates that an annotated class is a “Service,” initially defined by Domain-Driven Design (Evans, 2003) as “an operation offered as an interface that stands alone in the model, with no encapsulated state.” Most importantly for the Spring Boot service, this annotation serves as a specialization of @Component, allowing for implementation classes to be autodetected through classpath scanning.

package com.example.athena.tickit.service;

import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.saas.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

import static java.lang.Integer.parseInt;

@Service
public class EventServiceImp implements EventService {

    private static final Logger logger = LoggerFactory.getLogger(EventServiceImp.class);

    private final ConfigProperties configProperties;
    private final AthenaClientFactory athenaClientFactory;
    private final AthenaCommon athenaCommon;

    @Autowired
    public EventServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
        this.configProperties = configProperties;
        this.athenaClientFactory = athenaClientFactory;
        this.athenaCommon = athenaCommon;
    }

    public List<Event> findAll(Integer limit, Integer offset) {
        if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
            limit = configProperties.getLimit();
        }

        if (offset == null || offset < 1) {
            offset = 0;
        }

        String whereClause = "WHERE eventid IS NOT NULL";

        String query = String.format("""
                SELECT *
                FROM refined_tickit_public_event
                %s
                ORDER BY eventid
                OFFSET %s
                LIMIT %s;""", whereClause, offset, limit);

        return startQuery(query);
    }

    public Event findById(int id) {
        String query = String.format("""
                SELECT DISTINCT *
                FROM refined_tickit_public_event
                WHERE eventid=%s""", id);

        Event event;
        try {
            event = startQuery(query).get(0);
        } catch (IndexOutOfBoundsException e) {
            logger.error(e.getMessage());
            return null;
        }

        return event;
    }

    private List<Event> startQuery(String query) {
        logger.debug(String.format("Query: %s", query.replace("\n", " ")));

        AthenaClient athenaClient = athenaClientFactory.createClient();
        String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
        athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
        List<Event> events = processResultRows(athenaClient, queryExecutionId);
        athenaClient.close();

        return events;
    }

    private List<Event> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
        List<Event> events = new ArrayList<>();

        try {
            // Max Results can be set but if it's not set,
            // it will choose the maximum page size
            GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
                    .queryExecutionId(queryExecutionId).build();

            GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
            List<Row> rows;
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");

            for (GetQueryResultsResponse result : getQueryResultsResults) {
                rows = result.resultSet().rows();

                for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
                    List<Datum> allData = myRow.data();
                    Event event = new Event();
                    event.setId(parseInt(allData.get(0).varCharValue()));
                    event.setVenueId(parseInt(allData.get(1).varCharValue()));
                    event.setCatId(parseInt(allData.get(2).varCharValue()));
                    event.setDateId(parseInt(allData.get(3).varCharValue()));
                    event.setName(allData.get(4).varCharValue());
                    event.setStartTime(LocalDateTime.parse(allData.get(5).varCharValue(), formatter));
                    events.add(event);
                }
            }
        } catch (AthenaException e) {
            logger.error(e.getMessage());
        }

        return events;
    }
}

Using Spring’s common constructor-based Dependency Injection (DI) method (aka constructor injection), the service auto-wires an instance of the AthenaClientFactory interface. Note that we are auto-wiring the service interface, not the service implementation, allowing us to wire in a different implementation if desired, such as for testing.

The service calls the AthenaClientFactory class’s createClient() method, which returns a connection to Amazon Athena using one of several available authentication methods. The authentication scheme will depend on where the service is deployed and how you want to securely connect to AWS. Some options include environment variables, a local AWS profile, an EC2 instance profile, or a token from a web identity provider.

return AthenaClient.builder()
        .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
        .build();

The service class transforms the payload returned by an instance of GetQueryResultsResponse into an ordered collection (also known as a sequence), List<E>, where E represents a POJO. For example, with the data lake’s refined_tickit_public_event table, the service returns a List<Event>. This pattern repeats itself for tables, views, prepared statements, and named queries. Column data types can be transformed and formatted on the fly, new columns added, and existing columns skipped.

List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
Event event = new Event();
event.setId(parseInt(allData.get(0).varCharValue()));
event.setVenueId(parseInt(allData.get(1).varCharValue()));
event.setCatId(parseInt(allData.get(2).varCharValue()));
event.setDateId(parseInt(allData.get(3).varCharValue()));
event.setName(allData.get(4).varCharValue());
event.setStartTime(LocalDateTime.parse(allData.get(5).varCharValue(), formatter));
events.add(event);
}
}
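
The same row-to-POJO mapping can be tried without the AWS SDK. In the sketch below, each inner list of strings stands in for one Athena Row (whose Datum values arrive as strings), EventSummary is a hypothetical, trimmed-down version of the post's Event class, and the sample values are made up for illustration. It also guards against an empty page, which the subList(1, ...) call would otherwise reject.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class RowMappingSketch {
    // Hypothetical, trimmed-down POJO; the post's Event class has more fields.
    record EventSummary(int id, String name, LocalDateTime startTime) {}

    // Same pattern as the post's formatter.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");

    // Converts string rows into typed objects, skipping the header row.
    static List<EventSummary> toEvents(List<List<String>> rows) {
        List<EventSummary> events = new ArrayList<>();
        int firstDataRow = Math.min(1, rows.size()); // tolerate an empty page
        for (List<String> row : rows.subList(firstDataRow, rows.size())) {
            events.add(new EventSummary(
                    Integer.parseInt(row.get(0)),
                    row.get(1),
                    LocalDateTime.parse(row.get(2), FORMATTER)));
        }
        return events;
    }

    public static void main(String[] args) {
        List<List<String>> rows = List.of(
                List.of("eventid", "eventname", "starttime"),       // column names
                List.of("1217", "Mamma Mia!", "2008-01-25 14:30:00.0")); // made-up sample row
        toEvents(rows).forEach(System.out::println);
    }
}
```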

For each endpoint defined in the Controller class, for example, get(), findAll(), and findById(), there is a corresponding method in the Service class. Below, we see an example of the findAll() method in the SalesByCategoryServiceImp service class. This method corresponds to the identically named method in the SalesByCategoryController controller class. Each of these service methods follows a similar pattern of constructing a dynamic Athena SQL query based on input parameters, which is passed to Athena through the AthenaClient service client using an instance of GetQueryResultsRequest.

public List<SalesByCategory> findAll(String calendarDate, Integer limit, Integer offset) {
if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
limit = configProperties.getLimit();
}
if (offset == null || offset < 1) {
offset = 0;
}
String whereClause = "WHERE caldate IS NOT NULL";
if (calendarDate != null) {
whereClause = whereClause + " AND caldate=date('" + calendarDate + "')";
}
String query = String.format("""
SELECT *
FROM tickit_sales_by_day_and_category
%s
OFFSET %s
LIMIT %s;""", whereClause, offset, limit);
return startQuery(query);
}
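
Because the WHERE clause is assembled by string concatenation, it may be worth validating calendarDate before it reaches the SQL text. The following is a minimal sketch of one way to do that, assuming the date is expected in ISO yyyy-MM-dd form; the class name, the buildQuery() helper, and the hard-coded MAX_LIMIT are hypothetical and stand in for the service method and its ConfigProperties lookup.

```java
import java.time.LocalDate;

final class SalesQuerySketch {
    // Assumption: mirrors the 'limit' value of 25 in the post's configuration.
    static final int MAX_LIMIT = 25;

    static String buildQuery(String calendarDate, Integer limit, Integer offset) {
        int safeLimit = (limit == null || limit < 1 || limit > MAX_LIMIT) ? MAX_LIMIT : limit;
        int safeOffset = (offset == null || offset < 0) ? 0 : offset;

        StringBuilder where = new StringBuilder("WHERE caldate IS NOT NULL");
        if (calendarDate != null) {
            // LocalDate.parse throws DateTimeParseException unless the input is a
            // valid ISO date, so arbitrary text never reaches the SQL string.
            LocalDate date = LocalDate.parse(calendarDate);
            where.append(" AND caldate=date('").append(date).append("')");
        }
        return "SELECT * FROM tickit_sales_by_day_and_category " + where
                + " OFFSET " + safeOffset + " LIMIT " + safeLimit + ";";
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("2008-01-25", 5, 10));
        System.out.println(buildQuery(null, null, null));
    }
}
```

A controller using this approach would need to translate the DateTimeParseException into an HTTP 400 response; that part is left out of the sketch.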

Controller Class

Lastly, there is a corresponding Controller class for each API endpoint in the Spring Boot RESTful Web Service. The controller class uses Spring Framework’s @RestController annotation. According to the documentation, this annotation is a convenience annotation that is itself annotated with @Controller and @ResponseBody. Types that carry this annotation are treated as controllers where @RequestMapping methods assume @ResponseBody semantics by default.

The controller class takes a dependency on the corresponding service class application component using constructor-based Dependency Injection (DI). Like the service example above, we are auto-wiring the service interface, not the service implementation.

package com.example.athena.tickit.controller;
import com.example.athena.tickit.model.saas.Event;
import com.example.athena.tickit.service.EventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping(value = "/events")
public class EventController {
private final EventService service;
@Autowired
public EventController(EventService service) {
this.service = service;
}
@RequestMapping(method = RequestMethod.GET)
public ResponseEntity<List<Event>> findAll(
@RequestParam(required = false) Integer limit,
@RequestParam(required = false) Integer offset
) {
List<Event> events = service.findAll(limit, offset);
if (events.size() == 0) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(null);
}
return ResponseEntity.status(HttpStatus.OK).body(events);
}
@RequestMapping(value = "/{id}", method = RequestMethod.GET)
public ResponseEntity<Event> findById(@PathVariable("id") int id) {
Event event = service.findById(id);
if (event == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(null);
}
return ResponseEntity.status(HttpStatus.OK).body(event);
}
}

The controller is responsible for serializing the ordered collection of POJOs into JSON and returning that JSON payload in the body of the HTTP response to the initial HTTP request.

Querying Views

In addition to querying AWS Glue Data Catalog tables (aka Athena tables), we also query views. According to the documentation, a view in Amazon Athena is a logical table, not a physical table. Therefore, the query that defines a view runs each time the view is referenced in a query.

For convenience, each time the Spring Boot service starts, the main AthenaApplication class calls the View.java class’s CreateView() method to check for the existence of the view, view_tickit_sales_by_day_and_category. If the view does not exist, it is created and becomes accessible to all application end-users. The view is queried through the service’s /salesbycategory endpoint.

Java class relationships related to querying the Amazon Athena view

This confirm-or-create pattern is repeated for the prepared statement in the main AthenaApplication class (detailed in the next section).

package com.example.athena;
import com.example.athena.common.PreparedStatement;
import com.example.athena.common.View;
import com.example.athena.config.ConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@SpringBootApplication
@EnableConfigurationProperties(ConfigProperties.class)
public class AthenaApplication {
private final PreparedStatement preparedStatement;
private final View view;
@Autowired
public AthenaApplication(PreparedStatement preparedStatement, View view) {
this.preparedStatement = preparedStatement;
this.view = view;
}
public static void main(String[] args) {
SpringApplication.run(AthenaApplication.class, args);
}
@Bean
void CreatePreparedStatement() {
preparedStatement.CreatePreparedStatement();
}
@Bean
void createView() {
view.CreateView();
}
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**").allowedOrigins("*");
}
};
}
}

Below, we see the View class called by the service at startup.

package com.example.athena.common;
import com.example.athena.config.ConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.GetTableMetadataRequest;
import software.amazon.awssdk.services.athena.model.GetTableMetadataResponse;
import software.amazon.awssdk.services.athena.model.MetadataException;
@Component
public class View {
private static final Logger logger = LoggerFactory.getLogger(View.class);
private final AthenaClientFactory athenaClientFactoryImp;
private final ConfigProperties configProperties;
private final AthenaCommon athenaCommon;
@Autowired
public View(AthenaClientFactory athenaClientFactoryImp,
ConfigProperties configProperties,
AthenaCommon athenaCommon) {
this.athenaClientFactoryImp = athenaClientFactoryImp;
this.configProperties = configProperties;
this.athenaCommon = athenaCommon;
}
public void CreateView() {
String viewName = "view_tickit_sales_by_day_and_category";
String createViewSqlStatement = String.format("""
CREATE VIEW %s AS
SELECT cast(d.caldate AS DATE) AS caldate,
c.catgroup,
c.catname,
sum(round(cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold, 2)) AS saleamount,
sum(cast(s.commission AS DECIMAL(8,2))) AS commission
FROM refined_tickit_public_sales AS s
LEFT JOIN refined_tickit_public_event AS e ON e.eventid = s.eventid
LEFT JOIN refined_tickit_public_date AS d ON d.dateid = s.dateid
LEFT JOIN refined_tickit_public_category AS c ON c.catid = e.catid
GROUP BY caldate,
catgroup,
catname
ORDER BY caldate,
catgroup,
catname;""", viewName);
try (AthenaClient athenaClient = athenaClientFactoryImp.createClient()) {
try {
GetTableMetadataResponse getTableMetadataResponse = getGetTableMetadataResponse(viewName, athenaClient);
logger.debug(String.format("View already exists: %s", getTableMetadataResponse.tableMetadata().name()));
} catch (MetadataException e) { // View does not exist
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, createViewSqlStatement);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
// Confirm View was created
GetTableMetadataResponse getTableMetadataResponse = getGetTableMetadataResponse(viewName, athenaClient);
logger.debug(String.format("View created successfully: %s", getTableMetadataResponse.tableMetadata().name()));
}
}
}
private GetTableMetadataResponse getGetTableMetadataResponse(String viewName, AthenaClient athenaClient) {
GetTableMetadataRequest getTableMetadataRequest = GetTableMetadataRequest.builder()
.catalogName(configProperties.getCatalog())
.databaseName(configProperties.getDatabase())
.tableName(viewName)
.build();
return athenaClient.getTableMetadata(getTableMetadataRequest);
}
}

Aside from the fact that the /salesbycategory endpoint queries a view, everything else is identical to querying a table. This endpoint uses the same model-service-controller pattern.
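
The confirm-or-create pattern used by the View class at startup can be reduced to a few lines. The sketch below replaces the Glue Data Catalog with a hypothetical in-memory FakeCatalog and uses NoSuchElementException where the real code catches MetadataException; it only illustrates the control flow, not the Athena calls.

```java
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;

final class ConfirmOrCreateSketch {
    // In-memory stand-in for a catalog; purely illustrative.
    static final class FakeCatalog {
        private final Set<String> names = new HashSet<>();
        int createCalls = 0;

        // Throws if the object is missing, like a metadata lookup would.
        String describe(String name) {
            if (!names.contains(name)) {
                throw new NoSuchElementException("not found: " + name);
            }
            return name;
        }

        void create(String name) {
            createCalls++;
            names.add(name);
        }
    }

    // Returns true if the object had to be created, false if it already existed.
    static boolean ensureExists(FakeCatalog catalog, String name) {
        try {
            catalog.describe(name);
            return false;
        } catch (NoSuchElementException e) {
            catalog.create(name);
            catalog.describe(name); // confirm creation, as the post's class does
            return true;
        }
    }

    public static void main(String[] args) {
        FakeCatalog catalog = new FakeCatalog();
        String view = "view_tickit_sales_by_day_and_category";
        System.out.println("created on first start: " + ensureExists(catalog, view));
        System.out.println("created on second start: " + ensureExists(catalog, view));
    }
}
```

The second call finds the existing object and skips creation, which is what makes it safe to run the check on every service start.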

Executing Prepared Statements

According to the documentation, you can use the Athena parameterized query feature to prepare statements for repeated execution of the same query with different query parameters. The prepared statement used by the service, tickit_sales_by_seller, accepts a single parameter, the ID of the seller (sellerid). The prepared statement is executed using the /salesbyseller endpoint. This scenario simulates an end-user of the analytics application who wants to retrieve enriched sales information about their sales.

package com.example.athena.common;
import com.example.athena.config.ConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.CreatePreparedStatementRequest;
import software.amazon.awssdk.services.athena.model.GetPreparedStatementRequest;
import software.amazon.awssdk.services.athena.model.GetPreparedStatementResponse;
import software.amazon.awssdk.services.athena.model.ResourceNotFoundException;
@Component
public class PreparedStatement {
private static final Logger logger = LoggerFactory.getLogger(PreparedStatement.class);
private final AthenaClientFactory athenaClientFactoryImp;
private final ConfigProperties configProperties;
@Autowired
public PreparedStatement(AthenaClientFactory athenaClientFactoryImp, ConfigProperties configProperties) {
this.athenaClientFactoryImp = athenaClientFactoryImp;
this.configProperties = configProperties;
}
public void CreatePreparedStatement() {
String preparedStatementName = "tickit_sales_by_seller";
String preparedStatementSql = """
SELECT cast(d.caldate AS DATE) AS caldate,
s.pricepaid,
s.qtysold,
round(cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold, 2) AS saleamount,
cast(s.commission AS DECIMAL(8,2)) AS commission,
round((cast(s.commission AS DECIMAL(8,2)) / (cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold)) * 100, 2) AS commissionprcnt,
e.eventname,
concat(u1.firstname, ' ', u1.lastname) AS seller,
concat(u2.firstname, ' ', u2.lastname) AS buyer,
c.catgroup,
c.catname
FROM refined_tickit_public_sales AS s
LEFT JOIN refined_tickit_public_listing AS l ON l.listid = s.listid
LEFT JOIN refined_tickit_public_users AS u1 ON u1.userid = s.sellerid
LEFT JOIN refined_tickit_public_users AS u2 ON u2.userid = s.buyerid
LEFT JOIN refined_tickit_public_event AS e ON e.eventid = s.eventid
LEFT JOIN refined_tickit_public_date AS d ON d.dateid = s.dateid
LEFT JOIN refined_tickit_public_category AS c ON c.catid = e.catid
WHERE s.sellerid = ?
ORDER BY caldate,
eventname;""";
try (AthenaClient athenaClient = athenaClientFactoryImp.createClient()) {
try {
GetPreparedStatementResponse getPreparedStatementResponse = getGetPreparedStatementResponse(preparedStatementName, athenaClient);
logger.debug(String.format("Prepared statement already exists: %s", getPreparedStatementResponse.preparedStatement().statementName()));
} catch (ResourceNotFoundException e) { // PreparedStatement does not exist
CreatePreparedStatementRequest createPreparedStatementRequest = CreatePreparedStatementRequest.builder()
.statementName(preparedStatementName)
.description("Returns all sales by seller based on the seller's userid")
.workGroup(configProperties.getWorkGroup())
.queryStatement(preparedStatementSql).build();
athenaClient.createPreparedStatement(createPreparedStatementRequest);
// Confirm PreparedStatement was created
GetPreparedStatementResponse getPreparedStatementResponse = getGetPreparedStatementResponse(preparedStatementName, athenaClient);
logger.debug(String.format("Prepared statement created successfully: %s", getPreparedStatementResponse.preparedStatement().statementName()));
}
}
}
private GetPreparedStatementResponse getGetPreparedStatementResponse(String preparedStatementName, AthenaClient athenaClient) {
GetPreparedStatementRequest getPreparedStatementRequest = GetPreparedStatementRequest.builder()
.statementName(preparedStatementName)
.workGroup(configProperties.getWorkGroup()).build();
return athenaClient.getPreparedStatement(getPreparedStatementRequest);
}
}

The pattern of querying data is similar to tables and views, except instead of using the common SELECT...FROM...WHERE SQL query pattern, we use the EXECUTE...USING pattern.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.tickit.model.resultsets.SaleBySeller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
@Service
public class SaleBySellerServiceImp implements SaleBySellerService {
private static final Logger logger = LoggerFactory.getLogger(SaleBySellerServiceImp.class);
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public SaleBySellerServiceImp(AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<SaleBySeller> find(int id) {
String query = String.format("""
EXECUTE tickit_sales_by_seller USING %s;""", id);
return startQuery(query);
}
private List<SaleBySeller> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<SaleBySeller> saleBySellers = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return saleBySellers;
}
private List<SaleBySeller> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<SaleBySeller> saleBySellers = new ArrayList<>();
try {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
SaleBySeller saleBySeller = new SaleBySeller();
saleBySeller.setCalDate(LocalDate.parse(allData.get(0).varCharValue()));
saleBySeller.setPricePaid(new BigDecimal(allData.get(1).varCharValue()));
saleBySeller.setQtySold(Integer.parseInt(allData.get(2).varCharValue()));
saleBySeller.setSaleAmount(new BigDecimal(allData.get(3).varCharValue()));
saleBySeller.setCommission(new BigDecimal(allData.get(4).varCharValue()));
saleBySeller.setCommissionPrcnt(Double.valueOf(allData.get(5).varCharValue()));
saleBySeller.setEventName(allData.get(6).varCharValue());
saleBySeller.setSeller(allData.get(7).varCharValue());
saleBySeller.setBuyer(allData.get(8).varCharValue());
saleBySeller.setCatGroup(allData.get(9).varCharValue());
saleBySeller.setCatName(allData.get(10).varCharValue());
saleBySellers.add(saleBySeller);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return saleBySellers;
}
}

For example, to execute the prepared statement for a seller with an ID of 3, we would use EXECUTE tickit_sales_by_seller USING 3;. We pass the seller’s ID of 3 as a path parameter similar to other endpoints exposed by the service: /v1/salesbyseller/3.

Sales by seller query results from Athena using the seller’s ID as a parameter for the prepared statement
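
As a small illustration of how the EXECUTE...USING statement is assembled, here is a hedged sketch. The executeStatement() helper and its name check are hypothetical additions, not part of the post's service; the seller ID is typed as an int, which by itself keeps arbitrary text out of the parameter position.

```java
final class ExecuteStatementSketch {
    // Builds an Athena EXECUTE statement for a prepared statement that takes one integer parameter.
    static String executeStatement(String statementName, int sellerId) {
        // Assumption: statement names are restricted here to simple identifiers.
        if (!statementName.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Unexpected statement name: " + statementName);
        }
        return "EXECUTE " + statementName + " USING " + sellerId + ";";
    }

    public static void main(String[] args) {
        // prints: EXECUTE tickit_sales_by_seller USING 3;
        System.out.println(executeStatement("tickit_sales_by_seller", 3));
    }
}
```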

Again, aside from the fact that the /salesbyseller endpoint executes a prepared statement and passes a parameter, everything else is identical to querying a table or a view, using the same model-service-controller pattern.

Working with Named Queries

In addition to tables, views, and prepared statements, Athena has the concept of saved queries, referred to as named queries in the Athena API and when using AWS CloudFormation. You can use the Athena console or API to save, edit, run, rename, and delete queries. The queries are persisted using a NamedQueryId, a unique identifier (UUID) of the query. You must reference the NamedQueryId when working with existing named queries.

Example of saved query (named query) used in this post

There are multiple ways to use and reuse existing named queries programmatically. For this demonstration, I created the named query, buyer_likes_by_category, in advance and then stored the resulting NamedQueryId as an application property, injected at runtime or Kubernetes deployment time through a local environment variable.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.resultsets.BuyerLikesByCategory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class BuyerLikesByCategoryServiceImp implements BuyersLikesByCategoryService {
private static final Logger logger = LoggerFactory.getLogger(BuyerLikesByCategoryServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public BuyerLikesByCategoryServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<BuyerLikesByCategory> get() {
return getNamedQueryResults(configProperties.getNamedQueryId());
}
private List<BuyerLikesByCategory> getNamedQueryResults(String queryId) {
logger.debug(String.format("NamedQueryId: %s", queryId));
AthenaClient athenaClient = athenaClientFactory.createClient();
GetNamedQueryRequest getNamedQueryRequest = GetNamedQueryRequest.builder()
.namedQueryId(queryId)
.build();
GetNamedQueryResponse getNamedQueryResponse = athenaClient.getNamedQuery(getNamedQueryRequest);
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, getNamedQueryResponse.namedQuery().queryString());
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<BuyerLikesByCategory> buyerLikesByCategories = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return buyerLikesByCategories;
}
private List<BuyerLikesByCategory> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<BuyerLikesByCategory> buyerLikesByCategories = new ArrayList<>();
try {
// Max Results can be set but if it's not set,
// it will choose the maximum page size
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
BuyerLikesByCategory buyerLikesByCategory = new BuyerLikesByCategory();
buyerLikesByCategory.setSports(parseInt(allData.get(0).varCharValue()));
buyerLikesByCategory.setTheatre(parseInt(allData.get(1).varCharValue()));
buyerLikesByCategory.setConcerts(parseInt(allData.get(2).varCharValue()));
buyerLikesByCategory.setJazz(parseInt(allData.get(3).varCharValue()));
buyerLikesByCategory.setClassical(parseInt(allData.get(4).varCharValue()));
buyerLikesByCategory.setOpera(parseInt(allData.get(5).varCharValue()));
buyerLikesByCategory.setRock(parseInt(allData.get(6).varCharValue()));
buyerLikesByCategory.setVegas(parseInt(allData.get(7).varCharValue()));
buyerLikesByCategory.setBroadway(parseInt(allData.get(8).varCharValue()));
buyerLikesByCategory.setMusicals(parseInt(allData.get(9).varCharValue()));
buyerLikesByCategories.add(buyerLikesByCategory);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return buyerLikesByCategories;
}
}

Alternatively, you might iterate through the list of named queries at startup to find one that matches by name. However, this method would likely impact service performance, startup time, and cost. Lastly, you could call a method like NamedQuery(), included in the unused NamedQuery class, at startup, similar to the view and prepared statement. That named query’s unique NamedQueryId would be persisted as a system property, referenceable by the service class. The downside is that you would create a duplicate of the named query each time you start the service. Therefore, this method is also not recommended.

Configuration

Two components are responsible for persisting configuration for the Spring Boot service: the application.yml properties file and the ConfigProperties class. The class uses Spring Boot’s @ConfigurationProperties annotation. According to the documentation, this annotation is used for externalized configuration: add it to a class definition or a @Bean method in a @Configuration class if you want to bind and validate some external properties (e.g., from a .properties or .yml file). Binding is performed by calling setters on the annotated class or, if @ConstructorBinding is in use, by binding to the constructor parameters.

The @ConfigurationProperties annotation includes the prefix of athena. This value corresponds to the athena prefix in the application.yml properties file. The fields in the ConfigProperties class are bound to the properties in application.yml. For example, the field namedQueryId is bound to the property athena.named-query-id. Further, that property is bound to an external environment variable, NAMED_QUERY_ID. These values could be supplied from an external configuration system, a Kubernetes secret, or an external secrets management system.

spring:
  profiles:
    active: dev
server:
  port: 8080
  servlet:
    contextPath: /v1
athena:
  region: us-east-1
  workgroup: primary
  catalog: AwsDataCatalog
  database: tickit_demo
  limit: 25
  client-execution-timeout: 100000
  retry-sleep: 1000
  results-bucket: ${RESULTS_BUCKET}
  named-query-id: ${NAMED_QUERY_ID}
---
spring:
  config:
    activate:
      on-profile: dev
logging:
  level:
    root: DEBUG
management:
  endpoints:
    web:
      exposure:
        include: '*'
    jmx:
      exposure:
        include: '*'
---
spring:
  config:
    activate:
      on-profile: prod
logging:
  level:
    root: INFO
management:
  endpoints:
    web:
      exposure:
        include: health, prometheus
    jmx:
      exposure:
        include: health
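
To make the two binding steps concrete, the sketch below imitates them in plain Java: resolving a ${NAME} placeholder from a map of environment variables, and converting a kebab-case key such as named-query-id into the camelCase field name namedQueryId. This is not Spring Boot's implementation; the class and method names are hypothetical, and the value abc-123 is a made-up ID.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ConfigBindingSketch {
    // Matches placeholders of the form ${NAME}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    // Replaces each ${NAME} with the matching entry from env, failing fast if one is missing.
    static String resolve(String value, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String replacement = env.get(matcher.group(1));
            if (replacement == null) {
                throw new IllegalStateException("Missing variable: " + matcher.group(1));
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // Converts a kebab-case property key to a camelCase field name.
    static String kebabToCamel(String key) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : key.toCharArray()) {
            if (c == '-') {
                upperNext = true;
            } else {
                out.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("NAMED_QUERY_ID", "abc-123"); // made-up value
        System.out.println(kebabToCamel("named-query-id") + " = " + resolve("${NAMED_QUERY_ID}", env));
    }
}
```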

AWS IAM: Authentication and Authorization

For the Spring Boot service to interact with Amazon Athena, AWS Glue, and Amazon S3, you need to establish an AWS IAM Role, which the service assumes once authenticated. The Role must be associated with an attached IAM Policy containing the requisite Athena, Glue, and S3 permissions. For development, the service uses a policy similar to the one shown below. Please note this policy is broader than recommended for Production; it does not represent the security best practice of least privilege. In particular, the use of the overly-broad * for Resources should be strictly avoided when creating policies.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:CreatePreparedStatement",
"athena:ListPreparedStatements",
"glue:CreateTable",
"athena:CreateNamedQuery",
"athena:ListNamedQueries",
"athena:GetTableMetadata",
"athena:GetPreparedStatement",
"athena:GetQueryResults",
"athena:GetQueryExecution",
"athena:GetNamedQuery"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:BatchGetPartition",
"glue:GetTable"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject",
"s3:PutBucketPublicAccessBlock"
],
"Resource": [
"arn:aws:s3:::aws-athena-query-results-*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::date-lake-demo-*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:ListAllMyBuckets"
],
"Resource": [
"*"
]
}
]
}

In addition to the authorization granted by the IAM Policy, AWS Lake Formation can be used with Amazon S3, AWS Glue, and Amazon Athena to grant fine-grained database-, table-, column-, and row-level access to datasets.

Swagger UI and the OpenAPI Specification

The easiest way to view and experiment with all the endpoints available through the controller classes is using the Swagger UI, included in the example Spring Boot service, by way of the springdoc-openapi Java library. The Swagger UI is accessed at /v1/swagger-ui/index.html.

Swagger UI showing endpoints exposed by the service’s controller classes

The OpenAPI Specification (formerly Swagger Specification) is an API description format for REST APIs. The /v1/v3/api-docs endpoint allows you to generate an OpenAPI v3 specification file. The OpenAPI file describes the entire API.

Spring Boot service’s OpenAPI v3 specification

The OpenAPI v3 specification can be saved as a file and imported into applications like Postman, the API platform for building and using APIs.

Calling the service’s /users API endpoint using Postman
Running a suite of integration tests against the Spring Boot service using Postman

Integration Tests

Included in the Spring Boot service’s source code is a limited number of example integration tests, not to be confused with unit tests. Each test class uses Spring Boot’s @SpringBootTest annotation. According to the documentation, this annotation can be specified on a test class that runs Spring Boot-based tests. It provides several features over and above the regular Spring TestContext Framework.

package com.example.athena;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import static io.restassured.RestAssured.get;
import static io.restassured.RestAssured.given;
import static io.restassured.http.ContentType.JSON;
@SpringBootTest
class CategoriesResourceTests {
private static final String ResourcePath = "/v1/categories";
private static final int resultsetLimit = 25;
@Test
void findAll() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.greaterThanOrEqualTo(1))
.body("$.size()", Matchers.lessThanOrEqualTo(resultsetLimit));
}
@Test
void findAllWithLimit() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("limit", 3)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.equalTo(3));
}
@Test
void findAllWithOffset() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("offset", 2)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.greaterThanOrEqualTo(1))
.body("$.size()", Matchers.lessThanOrEqualTo(resultsetLimit));
}
@Test
void findAllWithLimitAndOffset() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("limit", 3)
.queryParam("offset", 2)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.equalTo(3));
}
@Test
void findById() {
// Get the first 'id' available
int id = get(ResourcePath + "?limit=1")
.then()
.extract()
.path("[0].id");
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath + "/{id}", id)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("id", Matchers.equalTo(id));
}
}

The integration tests use Rest Assured’s given-when-then pattern of testing, made popular as part of Behavior-Driven Development (BDD). In addition, each test uses JUnit’s @Test annotation. According to the documentation, this annotation signals that the annotated method is a test method. Therefore, methods using this annotation must not be private or static and must not return a value.

@Test
void findById() {
// Get the first 'id' available
int id = get(ResourcePath + "?limit=1")
.then()
.extract()
.path("[0].id");
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath + "/{id}", id)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("id", Matchers.equalTo(id));
}

Run the integration tests using Gradle from the project’s root: ./gradlew clean build test. A detailed ‘Test Summary’ is produced in the project’s build directory as HTML for easy review.

Test Details

Load Testing the Service

In Production, the Spring Boot service will need to handle multiple concurrent users executing queries against Amazon Athena.

Athena’s Recent Queries console shows multiple concurrent queries being queued and executed

We could use various load testing tools to evaluate the service’s ability to handle multiple concurrent users. One of the simplest is my favorite Go-based utility, hey, which sends load to a URL using a provided number of requests at a provided concurrency level and prints stats. It also supports HTTP/2 endpoints. So, for example, we could execute 500 HTTP requests with a concurrency level of 25 against the Spring Boot service’s /users endpoint using hey. The post’s integration tests were run against three Kubernetes replica pods of the service deployed to Amazon EKS.

hey -n 500 -c 25 -T "application/json;charset=UTF-8" \
-h2 https://athena.example-api.com/v1/users

From Athena’s Recent Queries console, we see many simultaneous queries being queued and executed by hey through the Spring Boot service’s endpoint.

Athena’s Recent Queries console shows simultaneous queries being queued and executed
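
Conceptually, the -n and -c flags in the hey command above map to a fixed number of tasks submitted to a fixed-size thread pool. The sketch below shows that idea in plain Java; it is not how hey is implemented, the LoadSketch class is hypothetical, and the request is simulated by a supplier that returns a status code rather than by a real HTTP call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

final class LoadSketch {
    record Summary(int requests, int successes, long elapsedMillis) {}

    // Runs totalRequests calls of 'request' with at most 'concurrency' in flight.
    static Summary run(int totalRequests, int concurrency, IntSupplier request) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        AtomicInteger successes = new AtomicInteger();
        long start = System.nanoTime();
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < totalRequests; i++) {
                tasks.add(() -> {
                    if (request.getAsInt() == 200) { // count HTTP 200 as success
                        successes.incrementAndGet();
                    }
                    return null;
                });
            }
            pool.invokeAll(tasks); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Load run interrupted", e);
        } finally {
            pool.shutdown();
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        return new Summary(totalRequests, successes.get(), elapsedMillis);
    }

    public static void main(String[] args) {
        // Simulated request that always succeeds; swap in a real HTTP call to test a service.
        System.out.println(run(500, 25, () -> 200));
    }
}
```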

Metrics

The Spring Boot service implements the micrometer-registry-prometheus extension. The Micrometer metrics library exposes runtime and application metrics. Micrometer defines a core library, providing a registration mechanism for metrics and core metric types. These metrics are exposed by the service’s /v1/actuator/prometheus endpoint.

Metrics exposed using the Prometheus endpoint

Using the Micrometer extension, metrics exposed by the /v1/actuator/prometheus endpoint can be scraped and visualized by tools such as Prometheus. Conveniently, AWS offers the fully-managed Amazon Managed Service for Prometheus (AMP), which easily integrates with Amazon EKS.
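For a sense of what a scraper receives from such an endpoint, the following Python sketch parses a few lines of the Prometheus text exposition format. The sample text is illustrative rather than captured from the service, and parse_metrics is a hypothetical helper that handles only simple cases (a metric name, optional labels, and a numeric value); a real scraper should rely on Prometheus itself or an official client library.

```python
import re
from typing import Dict, List, NamedTuple


class Sample(NamedTuple):
    name: str
    labels: Dict[str, str]
    value: float


# metric_name{label="value",...} value
_LINE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)'
)
_LABEL = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')

# Illustrative sample only; not actual output from the post's service
SAMPLE_TEXT = """\
# HELP jvm_threads_live_threads The current number of live threads
# TYPE jvm_threads_live_threads gauge
jvm_threads_live_threads 27.0
http_server_requests_seconds_count{method="GET",status="200",uri="/users"} 500.0
"""


def parse_metrics(text: str) -> List[Sample]:
    """Parse metric samples, skipping blank lines and # HELP / # TYPE comments."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        match = _LINE.match(line)
        if match is None:
            continue
        labels = dict(_LABEL.findall(match.group("labels") or ""))
        samples.append(Sample(match.group("name"), labels, float(match.group("value"))))
    return samples


if __name__ == "__main__":
    for sample in parse_metrics(SAMPLE_TEXT):
        print(sample)
```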

Graph of HTTP server requests scraped by Prometheus from the Spring Boot service

Using Prometheus as a datasource, we can build dashboards in Grafana to observe the service’s metrics. Like AMP, AWS also offers the fully-managed Amazon Managed Grafana (AMG).

Grafana dashboard showing metrics from Prometheus for Spring Boot service deployed to Amazon EKS
Grafana dashboard showing JVM metrics from Prometheus for Spring Boot service deployed to Amazon EKS

Conclusion

This post taught us how to create a Spring Boot RESTful Web Service, allowing end-user applications to securely query data stored in a data lake on AWS. The service used AWS SDK for Java to access data stored in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.


End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub

Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms

Introduction

According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.”

Another succinct description of a data catalog’s purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.”

Working with many organizations in the area of Analytics, one of the more common requests I receive regards choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

View of DataHub’s home screen showing a variety of datasources

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Another example of searching for cataloged entities in DataHub’s browser-based UI

Data Catalog Competitors

Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980s. Gartner publishes its Metadata Management (EMM) Solutions Reviews and Ratings and Metadata Management Magic Quadrant. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:

Open Source Software

Commercial

Cloud Service Providers

Data Catalog Features

DataHub describes itself as “a modern data catalog built to enable end-to-end data discovery, data observability, and data governance.” Sorting through vendors’ marketing jargon and hype, standard features of leading data catalogs include:

  • Metadata ingestion
  • Data discovery
  • Data governance
  • Data observability
  • Data lineage
  • Data dictionary
  • Data classification
  • Usage/popularity statistics
  • Sensitive data handling
  • Data fitness (aka data quality or data profiling)
  • Management of both technical and business metadata
  • Business glossary
  • Tagging
  • Natively supported datasource integrations
  • Advanced metadata search
  • Fine-grained authentication and authorization
  • UI- and API-based interaction

Datasources

When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:

  • Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
  • Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
  • NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
  • Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
  • Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
  • Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
  • ETL sources, such as Apache Spark, Apache Airflow, Apache NiFi, and dbt

DataHub on AWS

DataHub’s convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternatively, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.

Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

ArgoCD UI showing DataHub and its dependencies deployed to Amazon EKS

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. The Kubernetes deployment for each component is defined as a sub-chart under the main DataHub Helm chart.

External Storage Layer Dependencies

Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.

Alternatively, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service dependency substitutions include Amazon RDS for MySQL, Amazon OpenSearch Service (formerly Amazon Elasticsearch Service), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using Amazon Neptune as the Graph Index is coming soon.

DataHub CLI and Plug-ins

DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

Available DataHub CLI commands

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.

Example list of DataHub Source plugins installed
Example list of DataHub Sink and Transformer plugins installed

Secure Metadata Ingestion

Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.

In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to Git and push them to a private GitHub repository. Finally, I use GitHub Actions to test the DataHub files.

To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, using IAM Roles for Service Accounts (IRSA), to securely access both the datasources being ingested and the DataHub application.

Schedule and Monitor Pipelines

Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow, using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configurations can be kept secure and not be exposed externally or in plain text.

When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.

SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate data centers require alternate networking and security strategies to access metadata securely.

AWS-based DataHub high-level architecture

Markup or Code?

According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain a source, sink, and transformers configuration section. Markup language-based job automation written in YAML, JSON, or Domain Specific Languages (DSLs) is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database running on AWS.

YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to keep sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and reused for all ingestion jobs. Sensitive database connection information has also been parameterized and stored separately.

# Purpose: DataHub example recipe for PostgreSQL datasource
# Author: Gary A. Stafford
# Date: March 2022
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/postgres
source:
  type: postgres
  config:
    # Coordinates
    host_port: ${DB_HOST_PORT}
    database: tickit
    # Credentials
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    # Options
    profiling:
      enabled: true
    # Environment
    env: DEV
# see https://datahubproject.io/docs/metadata-ingestion/transformers/#adding-a-set-of-tags
transformers:
  - type: "simple_add_dataset_tags"
    config:
      tag_urns:
        - "urn:li:tag:AWS"
        - "urn:li:tag:${ACCOUNT_ID}"
        - "urn:li:tag:us-east-1"
  - type: "pattern_add_dataset_terms"
    config:
      term_pattern:
        rules:
          ".*users.*": ["urn:li:glossaryTerm:Classification.Sensitive"]
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:Database Administrators"
      ownership_type: "DATAOWNER"
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
  type: "datahub-rest"
  config:
    server: ${DATAHUB_REST_ENDPOINT}
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/reporting_telemetry/
pipeline_name: "postgres-pipeline-tickit"
reporting:
  - type: "datahub"
    config:
      datahub_api:
        server: ${DATAHUB_REST_ENDPOINT}
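The environment variable expansion used in the recipe above can be illustrated with a few lines of Python. This is only a sketch of the general technique using the standard library’s os.path.expandvars, not DataHub’s own implementation. The expand_recipe helper and the endpoint hostname are hypothetical. The helper also fails fast when a referenced variable is unset, which is an added safeguard of this sketch rather than documented DataHub behavior.

```python
import os
import re

# Fragment of the sink section, with a ${...} placeholder
RECIPE_FRAGMENT = """\
sink:
  type: "datahub-rest"
  config:
    server: ${DATAHUB_REST_ENDPOINT}
"""

_UNRESOLVED = re.compile(r"\$\{(\w+)\}")


def expand_recipe(text: str) -> str:
    """Replace ${VAR} placeholders with values from the environment."""
    # expandvars leaves references to unset variables unchanged
    expanded = os.path.expandvars(text)
    missing = sorted(set(_UNRESOLVED.findall(expanded)))
    if missing:
        raise KeyError(f"Unset environment variables: {', '.join(missing)}")
    return expanded


if __name__ == "__main__":
    # Hypothetical endpoint, set here only for demonstration
    os.environ["DATAHUB_REST_ENDPOINT"] = "http://datahub-gms.example.internal:8080"
    print(expand_recipe(RECIPE_FRAGMENT))
```

Keeping the placeholder in the file and the value in the environment means the same recipe can be committed to source control and reused across environments without exposing endpoints or credentials.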

Using Python

As an alternative to YAML, you can configure and run a pipeline entirely from within a custom Python script using DataHub’s Python API. The two Python examples that follow are nearly identical to the YAML recipe above. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit testing, and notification. The first example is a basic pipeline. The code is functional but not very Pythonic, secure, scalable, or Production-ready.

# Purpose: Simple programmatic DataHub pipeline example
# Author: Gary A. Stafford
# Date: March 2022
# Reference: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/programatic_pipeline.py
from datahub.ingestion.run.pipeline import Pipeline
# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
    {
        "run_id": "postgres-run",
        "source": {
            "type": "postgres",
            "config": {
                "host_port": "demo-instance.abcd1234.us-east-1.rds.amazonaws.com:5432",
                "database": "tickit",
                "username": "datahub",
                "password": "My5up3r53cr3tPa55w0rd",
                "env": "DEV",
                "profiling": {
                    "enabled": "true"
                }
            }
        },
        "transformers": [
            {
                "type": "simple_add_dataset_tags",
                "config": {
                    "tag_urns": [
                        "urn:li:tag:AWS",
                        "urn:li:tag:111222333444",
                        "urn:li:tag:us-east-1"
                    ]
                }
            },
            {
                "type": "pattern_add_dataset_terms",
                "config": {
                    "term_pattern": {
                        "rules": {
                            ".*users.*": [
                                "urn:li:glossaryTerm:Classification.Sensitive"
                            ]
                        }
                    }
                }
            },
            {
                "type": "simple_add_dataset_ownership",
                "config": {
                    "owner_urns": [
                        "urn:li:corpuser:Database Administrators"
                    ],
                    "ownership_type": "DATAOWNER"
                }
            }
        ],
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": "http://192.168.111.222:33333"
            }
        }
    }
)
# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

The second version of the same pipeline is closer to Production-ready. The code is more Pythonic and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. As with recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration, such as the AWS Account ID and AWS Region, is pulled from the environment. The script loads these values at runtime.

# Purpose: Programmatic DataHub pipeline example
# Author: Gary A. Stafford
# Date: March 2022
import json
import logging

import boto3
from botocore.exceptions import ClientError
from datahub.ingestion.run.pipeline import Pipeline

logging.basicConfig(
    format="[%(asctime)s] %(levelname)s - %(message)s", level=logging.INFO
)


def main():
    sts_client = boto3.client("sts")
    params = get_parameters()
    params["owner"] = "Database Administrators"
    params["environment"] = "DEV"
    params["database"] = "tickit"
    params["region"] = sts_client.meta.region_name
    params["account"] = sts_client.get_caller_identity()["Account"]
    logging.info(f"Params: {json.dumps(params, indent=4, sort_keys=True)}")
    ingestion_pipeline = create_pipeline(params)
    run_pipeline(ingestion_pipeline)


def create_pipeline(params) -> Pipeline:
    """Constructs a Pipeline for a PostgreSQL Source and a DataHub Sink
    :return: instance of datahub.ingestion.run.pipeline.Pipeline
    """
    pipeline = Pipeline.create(
        {
            "run_id": "postgres-run",
            "source": {
                "type": "postgres",
                "config": {
                    "host_port": params.get("/datahub_demo/postgres_host_port_tickit"),
                    "database": params.get("database"),
                    "username": params.get("/datahub_demo/postgres_username_tickit"),
                    "password": params.get("/datahub_demo/postgres_password_tickit"),
                    "profiling": {
                        "enabled": "true"
                    },
                    "env": params.get("environment"),
                }
            },
            "transformers": [
                {
                    "type": "simple_add_dataset_tags",
                    "config": {
                        "tag_urns": [
                            f"urn:li:tag:{params.get('account')}",
                            f"urn:li:tag:{params.get('region')}"
                        ]
                    }
                },
                {