Posts Tagged Analytics
Building Data Lakes on AWS with Kafka Connect, Debezium, Apicurio Registry, and Apache Hudi
Posted by Gary A. Stafford in Analytics, AWS, Cloud on February 28, 2023
Learn how to build a near real-time transactional data lake on AWS using a combination of Open Source Software (OSS) and AWS Services
Introduction
In the following post, we will explore one possible architecture for building a near real-time transactional data lake on AWS. The data lake will be built using a combination of open source software (OSS) and fully-managed AWS services. Red Hat’s Debezium, Apache Kafka, and Kafka Connect will be used for change data capture (CDC). In addition, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer will be used to manage the data lake. To complete our architecture, we will use several fully-managed AWS services, including Amazon RDS, Amazon MKS, Amazon EKS, AWS Glue, and Amazon EMR.

Source Code
The source code, configuration files, and a list of commands shown in this post are open-sourced and available on GitHub.
Kafka
According to the Apache Kafka documentation, “Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.” For this post, we will use Apache Kafka as the core of our change data capture (CDC) process. According to Wikipedia, “change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. CDC is an approach to data integration that is based on the identification, capture, and delivery of the changes made to enterprise data sources.” We will discuss CDC in greater detail later in the post.
There are several options for Apache Kafka on AWS. We will use AWS’s fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK) service. Alternatively, you could choose industry-leading SaaS providers, such as Confluent, Aiven, Redpanda, or Instaclustr. Lastly, you could choose to self-manage Apache Kafka on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS).
Kafka Connect
According to the Apache Kafka documentation, “Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency.”
There are multiple options for Kafka Connect on AWS. You can use AWS’s fully-managed, serverless Amazon MSK Connect. Alternatively, you could choose a SaaS provider or self-manage Kafka Connect yourself on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). I am not a huge fan of Amazon MSK Connect during development. In my opinion, iterating on the configuration for a new source and sink connector, especially with transforms and external registry dependencies, can be painfully slow and time-consuming with MSK Connect. I find it much faster to develop and fine-tune my sink and source connectors using a self-managed version of Kafka Connect. For Production workloads, you can easily port the configuration from the native Kafka Connect connector to MSK Connect. I am using a self-managed version of Kafka Connect for this post, running in Amazon EKS.

Debezium
According to the Debezium documentation, “Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases.” Regarding Kafka Connect, according to the Debezium documentation, “Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.”
Source Connectors
We will use Kafka Connect along with three Debezium connectors, MySQL, PostgreSQL, and SQL Server, to connect to three corresponding Amazon Relational Database Service (RDS) databases and perform CDC. Changes from the three databases will be represented as messages in separate Kafka topics. In Kafka Connect terminology, these are referred to as Source Connectors. According to Confluent.io, a leader in the Kafka community, “source connectors ingest entire databases and stream table updates to Kafka topics.”
Sink Connector
We will stream the data from the Kafka topics into Amazon S3 using a sink connector. Again, according to Confluent.io, “sink connectors deliver data from Kafka topics to secondary indexes, such as Elasticsearch, or batch systems such as Hadoop for offline analysis.” We will use Confluent’s Amazon S3 Sink Connector for Confluent Platform. We can use Confluent’s sink connector without depending on the entire Confluent platform.
There is also an option to use the Hudi Sink Connector for Kafka, which promises to greatly simplify the processes described in this post. However, the RFC for this Hudi feature appears to be stalled. Last updated in August 2021, the RFC is still in the initial “Under Discussion” phase. Therefore, I would not recommend the connectors use in Production until it is GA (General Availability) and gains broader community support.
Securing Database Credentials
Whether using Amazon MSK Connect or self-managed Kafka Connect, you should ensure your database, Kafka, and schema registry credentials, and other sensitive configuration values are secured. Both MSK Connect and self-managed Kafka Connect can integrate with configuration providers that implement the ConfigProvider class interface, such as AWS Secrets Manager, HashiCorp Vault, Microsoft Azure Key Vault, and Google Cloud Secrets Manager.

For self-managed Kafka Connect, I prefer Jeremy Custenborder’s kafka-config-provider-aws plugin. This plugin provides integration with AWS Secrets Manager. A complete list of Jeremy’s providers can be found on GitHub. Below is a snippet of the Secrets Manager configuration from the connect-distributed.properties
files, which is read by Apache Kafka Connect at startup.
https://garystafford.medium.com/media/db6ff589c946fcd014546c56d0255fe5
Apache Avro
The message in the Kafka topic and corresponding objects in Amazon S3 will be stored in Apache Avro format by the CDC process. The Apache Avro documentation states, “Apache Avro is the leading serialization format for record data, and the first choice for streaming data pipelines.” Avro provides rich data structures and a compact, fast, binary data format.
Again, according to the Apache Avro documentation, “Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.” When Avro data is stored in a file, its schema can be stored with it so any program may process files later.
Alternatively, the schema can be stored separately in a schema registry. According to Apicurio Registry, “in the messaging and event streaming world, data that are published to topics and queues often must be serialized or validated using a Schema (e.g. Apache Avro, JSON Schema, or Google protocol buffers). Schemas can be packaged in each application, but it is often a better architectural pattern to instead register them in an external system [schema registry] and then referenced from each application.”
Schema Registry
Several leading open-source and commercial schema registries exist, including Confluent Schema Registry, AWS Glue Schema Registry, and Red Hat’s open-source Apicurio Registry. In this post, we use a self-managed version of Apicurio Registry running on Amazon EKS. You can relatively easily substitute AWS Glue Schema Registry if you prefer a fully-managed AWS service.
Apicurio Registry
According to the documentation, Apicurio Registry supports adding, removing, and updating OpenAPI, AsyncAPI, GraphQL, Apache Avro, Google protocol buffers, JSON Schema, Kafka Connect schema, WSDL, and XML Schema (XSD) artifact types. Furthermore, content evolution can be controlled by enabling content rules, including validity and compatibility. Lastly, the registry can be configured to store data in various backend storage systems depending on the use case, including Kafka (e.g., Amazon MSK), PostgreSQL (e.g., Amazon RDS), and Infinispan (embedded).

Data Lake Table Formats
Three leading open-source, transactional data lake storage frameworks enable building data lake and data lakehouse architectures: Apache Iceberg, Linux Foundation Delta Lake, and Apache Hudi. They offer comparable features, such as ACID-compliant transactional guarantees, time travel, rollback, and schema evolution. Using any of these three data lake table formats will allow us to store and perform analytics on the latest version of the data in the data lake originating from the three Amazon RDS databases.
Apache Hudi
According to the Apache Hudi documentation, “Apache Hudi is a transactional data lake platform that brings database and data warehouse capabilities to the data lake.” The specifics of how the data is laid out as files in your data lake depends on the Hudi table type you choose, either Copy on Write (CoW) or Merge On Read (MoR).
Like Apache Iceberg and Delta Lake, Apache Hudi is partially supported by AWS’s Analytics services, including AWS Glue, Amazon EMR, Amazon Athena, Amazon Redshift Spectrum, and AWS Lake Formation. In general, CoW has broader support on AWS than MoR. It is important to understand the limitations of Apache Hudi with each AWS analytics service before choosing a table format.
If you are looking for a fully-managed Cloud data lake service built on Hudi, I recommend Onehouse. Born from the roots of Apache Hudi and founded by its original creator, Vinoth Chandar (PMC chair of the Apache Hudi project), the Onehouse product and its services leverage OSS Hudi to offer a data lake platform similar to what companies like Uber have built.
Hudi DeltaStreamer
Hudi also offers DeltaStreamer (aka HoodieDeltaStreamer) for streaming ingestion of CDC data. DeltaStreamer can be run once or continuously, using Apache Spark, similar to an Apache Spark Structured Streaming job, to capture changes to the raw CDC data and write that data to a different part of our data lake.
DeltaStreamer also works with Amazon S3 Event Notifications instead of running continuously. According to DeltaStreamer’s documentation, Amazon S3 object storage provides an event notification service to post notifications when certain events happen in your S3 bucket. AWS will put these events in Amazon Simple Queue Service (Amazon SQS). Apache Hudi provides an S3EventsSource
that can read from Amazon SQS to trigger and process new or changed data as soon as it is available on Amazon S3.
Sample Data for the Data Lake
The data used in this post is from the TICKIT sample database. The TICKIT database represents the backend data store for a platform that brings buyers and sellers of tickets to entertainment events together. It was initially designed to demonstrate Amazon Redshift. The TICKIT database is a small database with approximately 425K rows of data across seven tables: Category, Event, Venue, User, Listing, Sale, and Date.
A data lake most often contains data from multiple sources, each with different storage formats, protocols, and connection methods. To simulate these data sources, I have separated the TICKIT database tables to represent three typical enterprise systems, including a commercial off-the-shelf (COTS) E-commerce platform, a custom Customer Relationship Management (CRM) platform, and a SaaS-based Event Management System (EMS). Each simulated system uses a different Amazon RDS database engine, including MySQL, PostgreSQL, and SQL Server.

Enabling CDC for Amazon RDS
To use Debezium for CDC with Amazon RDS databases, minor changes to the default database configuration for each database engine are required.
CDC for PostgreSQL
Debezium has detailed instructions regarding configuring CDC for Amazon RDS for PostgreSQL. Database parameters specify how the database is configured. According to the Debezium documentation, for PostgreSQL, set the instance parameter rds.logical_replication
to 1
and verify that the wal_level
parameter is set to logical. It is automatically changed when the rds.logical_replication
parameter is set to 1
. This parameter is adjusted using an Amazon RDS custom parameter group. According to the AWS documentation, “with Amazon RDS, you manage database configuration by associating your DB instances and Multi-AZ DB clusters with parameter groups. Amazon RDS defines parameter groups with default settings. You can also define your own parameter groups with customized settings.”

CDC for MySQL
Similarly, Debezium has detailed instructions regarding configuring CDC for MySQL. Like PostgreSQL, MySQL requires a custom DB parameter group.



CDC for SQL Server
Lastly, Debezium has detailed instructions for configuring CDC with Microsoft SQL Server. Enabling CDC requires enabling CDC on the SQL Server database and table(s) and creating a new filegroup and associated file. Debezium recommends locating change tables in a different filegroup than you use for source tables. CDC is only supported with Microsoft SQL Server Standard Edition and higher; Express and Web Editions are not supported.
Kafka Connect Source Connectors
There is a Kafka Connect source connector for each of the three Amazon RDS databases, all of which use Debezium. CDC is performed, moving changes from the three databases into separate Kafka topics in Apache Avro format using the source connectors. The connector configuration is nearly identical whether you are using Amazon MSK Connect or a self-managed version of Kafka Connect.

As shown above, I am using the UI for Apache Kafka by Provectus, self-managed on Amazon EKS, in this post.
PostgreSQL Source Connector
The source_connector_postgres_kafka_avro_tickit
source connector captures all changes to the three tables in the Amazon RDS for PostgreSQL ticket
database’s ems
schema: category
, event
, and venue
. These changes are written to three corresponding Kafka topics as Avro-format messages: tickit.ems.category
, tickit.ems.event
, and tickit.ems.venue
. The messages are transformed by the connector using Debezium’s unwrap
transform. Schemas for the messages are written to Apicurio Registry.
MySQL Source Connector
The source_connector_mysql_kafka_avro_tickit
source connector captures all changes to the three tables in the Amazon RDS for PostgreSQL ecomm
database: date
, listing
, and sale
. These changes are written to three corresponding Kafka topics as Avro-format messages: tickit.ecomm.date
, tickit.ecomm.listing
, and tickit.ecomm.sale
.
SQL Server Source Connector
Lastly, the source_connector_mssql_kafka_avro_tickit
source connector captures all changes to the single user
table in the Amazon RDS for SQL Server ticket
database’s crm
schema. These changes are written to a corresponding Kafka topic as Avro-format messages: tickit.crm.user
.
If using a self-managed version of Kafka Connect, we can deploy, manage, and monitor the source and sink connectors using Kafka Connect’s RESTful API (Connect API).

Once the three source connectors are running, we should see seven new Kafka topics corresponding to the seven TICKIT database tables from the three Amazon RDS database sources. There will be approximately 425K messages consuming 147 MB of space in Avro’s binary format.

Avro’s binary format and the use of a separate schema ensure the messages consume minimal Kafka storage space. For example, the average message Value (payload) size in the tickit.ecomm.sale
topic is a minuscule 98 Bytes. Resource management is critical when you are dealing with hundreds of millions or often billions of daily Kafka messages as a result of the CDC process.

From within the Apicurio Registry UI, we should now see Value schema artifacts for each of the seven Kafka topics.

Also from within the Apicurio Registry UI, we can examine details of individual Value schema artifacts.

Kafka Connect Sink Connector
In addition to the three source connectors, there is a single Kafka Connect sink connector, sink_connector_kafka_s3_avro_tickit
. The sink connector copies messages from the seven Kafka topics, all prefixed with tickit
, to the Bronze area of our Amazon S3-based data lake.

The connector uses Confluent’s S3 Sink Connector (S3SinkConnector
). Like Kafka, the connector writes all changes to Amazon S3 in Apache Avro format, with the schemas already stored in Apicurio Registry.
Once the sink connector and the three source connectors are running with Kafka Connect, we should see a series of seven subdirectories within the Bronze area of the data lake.

Confluent offers a choice of data partitioning strategies. The sink connector we have deployed uses the Daily Partitioner. According to the documentation, the io.confluent.connect.storage.partitioner.DailyPartitioner
is equivalent to the TimeBasedPartitioner
with path.format='year'=YYYY/'month'=MM/'day'=dd
and partition.duration.ms=86400000
(one day for one S3 object in each daily directory). Below is an example of Avro files (S3 objects) containing changes to the sale
table. The objects are partitioned by the year
, month
, and day
they were written to the S3 bucket.

Message Transformation
Previously, while discussing the Kafka Connect source connectors, I mentioned that the connector transforms the messages using the unwrap
transform. By default, the changes picked up by Debezium during the CDC process contain several additional Debezium-related fields of data. An example of an untransformed message generated by Debezium from the PostgreSQL sale
table is shown below.
When the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a consistent snapshot of each database schema. All existing records are captured. Unlike an UPDATE
("op" : "u"
) or a DELETE
("op" : "d"
), these initial snapshot messages, like the one shown below, represent a READ
operation ("op" : "r"
). As a result, there is no before
data only after
.
According to Debezium’s documentation, “Debezium provides a single message transformation [SMT] that crosses the bridge between the complex and simple formats, the UnwrapFromEnvelope SMT.” Below, we see the results of Debezium’s unwrap
transform of the same message. The unwrap
transform flattens the nested JSON structure of the original message, adds the __deleted
field, and includes fields based on the list included in the source connector’s configuration. These fields are prefixed with a double underscore (e.g., __table
).
In the next section, we examine Apache Hudi will manage the data lake. An example of the same message, managed with Hudi, is shown below. Note the five additional Hudi data fields, prefixed with _hoodie_
.
Apache Hudi
With database changes flowing into the Bronze area of our data lake, we are ready to use Apache Hudi to provide data lake capabilities such as ACID transactional guarantees, time travel, rollback, and schema evolution. Using Hudi allows us to store and perform analytics on a specific time-bound version of the data in the data lake. Without Hudi, a query for a single row of data could return multiple results, such as an initial CREATE
record, multiple UPDATE
records, and a DELETE
record.
DeltaStreamer
Using Hudi’s DeltaStreamer, we will continuously stream changes from the Bronze area of the data lake to a Silver area, which Hudi manages. We will run DeltaStreamer continuously, similar to an Apache Spark Structured Streaming job, using Apache Spark on Amazon EMR. Running DeltaStreamer requires using a spark-submit
command that references a series of configuration files. There is a common base set of configuration items and a group of configuration items specific to each database table. Below, we see the base configuration, base.properties
.
The base configuration is referenced by each of the table-specific configuration files. For example, below, we see the tickit.ecomm.sale.properties
configuration file for DeltaStreamer.
To run DeltaStreamer, you can submit an EMR Step or use EMR’s master node to run a spark-submit
command on the cluster. Below, we see an example of the DeltaStreamer spark-submit
command using the Merge on Read (MoR) Hudi table type.
Next, we see the same example of the DeltaStreamer spark-submit
command using the Copy on Write (CoW) Hudi table type.
For this post’s demonstration, we will run a single-table DeltaStreamer Spark job, with one process for each table using CoW. Alternately, Hudi offers HoodieMultiTableDeltaStreamer
, a wrapper on top of HoodieDeltaStreamer
, which enables the ingestion of multiple tables at a single time into Hudi datasets. Currently, HoodieMultiTableDeltaStreamer
only supports sequential processing of tables to be ingested and COPY_ON_WRITE
storage type.
Below, we see an example of three DeltaStreamer Spark jobs running continuously for the date
, listing
, and sale
tables.

Once DeltaStreamer is up and running, we should see a series of subdirectories within the Silver area of the data lake, all managed by Apache Hudi.

Within each subdirectory, partitioned by the table name, is a series of Apache Parquet files, along with other Apache Hudi-specific files. The specific folder structure and files depend on MoR or Cow.

AWS Glue Data Catalog
For this post, we are using an AWS Glue Data Catalog, an Apache Hive-compatible metastore, to persist technical metadata stored in the Silver area of the data lake, managed by Apache Hudi. The AWS Glue Data Catalog database, tickit_cdc_hudi
, will be automatically created the first time DeltaStreamer runs.
Using DeltaStreamer with the table type of MERGE_ON_READ
, there would be two tables in the AWS Glue Data Catalog database for each original table. According to Amazon’s EMR documentation, “Merge on Read (MoR) — Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.” Hudi creates two tables in the Hive metastore for MoR, a table with the name that you specified, which is a read-optimized view (appended with _ro
), and a table with the same name appended with _rt
, which is a real-time view. You can query both tables.

According to Amazon’s EMR documentation, “Copy on Write (CoW) — Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write. CoW is the default storage type.” Using COPY_ON_WRITE
with DeltaStreamer, there is only a single Hudi table in the AWS Glue Data Catalog database for each corresponding database table.

Examining an individual table in the AWS Glue Data Catalog database, we can see details like the table schema, location of underlying data in S3, input/output formats, Serde serialization library, and table partitions.

Database Changes and Data Lake
We are using Kafka Connect, Apache Hudi, and Hudi’s DeltaStreamer to maintain data parity between our databases and the data lake. Let’s look at an example of how a simple data change is propagated from a database to Kafka, then to the Bronze area of the data lake, and finally to the Hudi-managed Silver area of the data lake, written using the CoW table format.
First, we will make a simple update to a single record in the MySQL database’s sale
table with the salesid
= 200
.
Almost immediately, we can see the change picked up by the Kafka Connect Debezium MySQL source connector.

Almost immediately, in the Bronze area of the data lake, we can see we have a new Avro-formatted file (S3 object) containing the updated record in the partitioned sale
subdirectory.

If we examine the new object in the Bronze area of the data lake, we will see a row representing the updated record with the salesid
= 200
. Note how the operation is now an UPDATE
versus a READ
("op" : "u"
).
Next, in the corresponding Silver area of the data lake, managed by Hudi, we should also see a new Parquet file that contains a record of the change. In this example, the file was written approximately 26 seconds after the original change to the database. This is the end-to-end time from database change to when the updated data is queryable in the data lake.

Similarly, if we examine the new object in the Silver area of the data lake, we will see a row representing the updated record with the salesid
= 200
. The record was committed approximately 15 seconds after the original database change.
Querying Hudi Tables with Amazon EMR
Using an EMR Notebook, we can query the updated database record stored in Amazon S3 using Hudi’s CoW table format. First, running a simple Spark SQL query for the record with salesid
= 200
, returns the latest record, reflecting the changes as indicated by the UPDATE
operation value (u
) and the _hoodie_commit_time
= 2023-02-27 03:17:13.915
UTC.

Hudi Time Travel Query
We can also run a Hudi Time Travel Query, with an as.of.instance
set to some arbitrary point in the future. Again, the latest record is returned, reflecting the changes as indicated by the UPDATE
operation value (u
) and the _hoodie_commit_time
= 2023-02-27 03:17:13.915
UTC.

We can run the same Hudi Time Travel Query with an as.of.instance
set to on or after the original records were written by DeltaStreamer (_hoodie_commit_time
= 2023-02-27 03:13:50.642
), but some arbitrary time before the updated record was written (_hoodie_commit_time
= 2023-02-27 03:17:13.915
). This time, the original record is returned as indicated by the READ
operation value (r
) and the _hoodie_commit_time
= 2023-02-27 03:13:50.642
. This is one of the strengths of Apache Hudi, the ability to query data in the present or at any point in the past.

Hudi Point in Time Query
We can also run a Hudi Point in Time Query, with a time range set from the beginning of time until some arbitrary point in the future. Again, the latest record is returned, reflecting the changes as indicated by the UPDATE
operation value (u
) and the _hoodie_commit_time
= 2023-02-27 03:17:13.915
.

We can run the same Hudi Point in Time Query but change the time range to two arbitrary time values, both before the updated record was written (_hoodie_commit_time
= 2023-02-26 22:39:07.203
). This time, the original record is returned as indicated by the READ
operation value (r
) and the _hoodie_commit_time
of 2023-02-27 03:13:50.642
. Again, this is one of the strengths of Apache Hudi, the ability to query data in the present or at any point in the past.

Querying Hudi Tables with Amazon Athena
We can also use Amazon Athena and run a SQL query against the AWS Glue Data Catalog database’s sale
table to view the updated database record stored in Amazon S3 using Hudi’s CoW table format. The operation (op
) column’s value now indicates an UPDATE
(u
).

How are Deletes Handled?
In this post’s architecture, deleted database records are signified with an operation ("op"
) field value of "d"
for a DELETE
and a __deleted
field value of true
.
https://itnext.io/media/f7c4ad62dcd45543fdf63834814a5aab
Back to our previous Jupyter notebook example, when rerunning the Spark SQL query, the latest record is returned, reflecting the changes as indicated by the DELETE
operation value (d
) and the _hoodie_commit_time
= 2023-02-27 03:52:13.689
.

Alternatively, we could use an additional Spark SQL filter statement to prevent deleted records from being returned (e.g., df.__op != "d"
).

Since we only did what is referred to as a soft delete in Hudi terminology, we can run a time travel query with an as.of.instance
set to some arbitrary time before the deleted record was written (_hoodie_commit_time
= 2023-02-27 03:52:13.689
). This time, the original record is returned as indicated by the READ
operation value (r
) and the _hoodie_commit_time
= 2023-02-27 03:13:50.642
. We could also use a later as.of.instance
to return the version of the record reflecting the the UPDATE
operation value (u
). This also applies to other query types such as point-in-time queries.

Conclusion
In this post, we learned how to build a near real-time transactional data lake on AWS using one possible architecture. The data lake was built using a combination of open source software (OSS) and fully-managed AWS services. Red Hat’s Debezium, Apache Kafka, and Kafka Connect were used for change data capture (CDC). In addition, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer were used to manage the data lake. To complete our architecture, we used several fully-managed AWS services, including Amazon RDS, Amazon MKS, Amazon EKS, AWS Glue, and Amazon EMR.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Serverless Analytics on AWS: Getting Started with Amazon EMR Serverless and Amazon MSK Serverless
Utilizing the recently released Amazon EMR Serverless and Amazon MSK Serverless for batch and streaming analytics with Apache Spark and Apache Kafka
Introduction
Amazon EMR Serverless
AWS recently announced the general availability (GA) of Amazon EMR Serverless on June 1, 2022. EMR Serverless is a new serverless deployment option in Amazon EMR, in addition to EMR on EC2, EMR on EKS, and EMR on AWS Outposts. EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. According to AWS, with EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.
Amazon MSK Serverless
Similarly, on April 28, 2022, AWS announced the general availability of Amazon MSK Serverless. According to AWS, Amazon MSK Serverless is a cluster type for Amazon MSK that makes it easy to run Apache Kafka without managing and scaling cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources, so you can use Apache Kafka on demand and only pay for the data you stream and retain.
Serverless Analytics
In the following post, we will learn how to use these two new, powerful, cost-effective, and easy-to-operate serverless technologies to perform batch and streaming analytics. The PySpark examples used in this post are similar to those featured in two earlier posts, which featured non-serverless alternatives Amazon EMR on EC2 and Amazon MSK: Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR and Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on AWS using Amazon MSK and EMR.
Source Code
All the source code demonstrated in this post is open-source and available on GitHub.
git clone --depth 1 -b main \
https://github.com/garystafford/emr-msk-serverless-demo.git
Architecture
The post’s high-level architecture consists of an Amazon EMR Serverless Application, Amazon MSK Serverless Cluster, and Amazon EC2 Kafka client instance. To support these three resources, we will need two Amazon Virtual Private Clouds (VPCs), a minimum of three subnets, an AWS Internet Gateway (IGW) or equivalent, an Amazon S3 Bucket, multiple AWS Identity and Access Management (IAM) Roles and Policies, Security Groups, and Route Tables, and a VPC Gateway Endpoint for S3. All resources are constrained to a single AWS account and a single AWS Region, us-east-1
.

Prerequisites
As a prerequisite for this post, you will need to create the following resources:
- (1) Amazon EMR Serverless Application;
- (1) Amazon MSK Serverless Cluster;
- (1) Amazon S3 Bucket;
- (1) VPC Endpoint for S3;
- (3) Apache Kafka topics;
- PySpark applications, related JAR dependencies, and sample data files uploaded to Amazon S3 Bucket;
Let’s walk through each of these prerequisites.
Amazon EMR Serverless Application
Before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon EMR Serverless, especially, What is Amazon EMR Serverless? Create a new EMR Serverless Application by following the AWS documentation, Getting started with Amazon EMR Serverless. The creation of the EMR Serverless Application includes the following resources:
- Amazon S3 bucket for storage of Spark resources;
- Amazon VPC with at least two private subnets and associated Security Group(s);
- EMR Serverless runtime AWS IAM Role and associated IAM Policy;
- Amazon EMR Serverless Application;
For this post, use the latest version of EMR available in the EMR Studio Serverless Application console, the newly released version 6.7.0, to create a Spark application.

Keep the default pre-initialized capacity, application limits, and application behavior settings.

Since we are connecting to MSK Serverless from EMR Serverless, we need to configure VPC access. Select the new VPC and at least two private subnets in different Availability Zones (AZs).

According to the documentation, the subnets selected for EMR Serverless must be private subnets. The associated route tables for the subnets should not contain direct routes to the Internet.


Amazon MSK Serverless Cluster
Similarly, before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon MSK Serverless, especially MSK Serverless. Create a new MSK Serverless Cluster by following the AWS documentation, Getting started using MSK Serverless clusters. The creation of the MSK Serverless Cluster includes the following resources:
- AWS IAM Role and associated IAM Policy for the Amazon EC2 Kafka client instance;
- VPC with at least one public subnet and associated Security Group(s);
- Amazon EC2 instance used as Apache Kafka client, provisioned in the public subnet of the above VPC;
- Amazon MSK Serverless Cluster;

Associate the new MSK Serverless Cluster with the EMR Serverless Application’s VPC and two private subnets. Also, associate the cluster with the EC2-based Kafka client instance’s VPC and its public subnet.


According to the AWS documentation, Amazon MSK does not support all AZs. For example, I tried to use a subnet in us-east-1e
threw an error. If this happens, choose an alternative AZ.


VPC Endpoint for S3
To access the Spark resource in Amazon S3 from EMR Serverless running in the two private subnets, we need a VPC Endpoint for S3. Specifically, a Gateway Endpoint, which sends traffic to Amazon S3 or DynamoDB using private IP addresses. A gateway endpoint for Amazon S3 enables you to use private IP addresses to access Amazon S3 without exposure to the public Internet. EMR Serverless does not require public IP addresses, and you don’t need an internet gateway (IGW), a NAT device, or a virtual private gateway in your VPC to connect to S3.

Create the VPC Endpoint for S3 (Gateway Endpoint) and add the route table for the two EMR Serverless private subnets. You can add additional routes to that route table, such as VPC peering connections to data sources such as Amazon Redshift or Amazon RDS. However, do not add routes that provide direct Internet access.

Kafka Topics and Sample Messages
Once the MSK Serverless Cluster and EC2-based Kafka client instance are provisioned and running, create the three required Kafka topics using the EC2-based Kafka client instance. I recommend using AWS Systems Manager Session Manager to connect to the client instance as the ec2-user
user. Session Manager provides secure and auditable node management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys. Alternatively, you can SSH into the client instance.

Before creating the topics, use a utility like telnet
to confirm connectivity between the Kafka client and the MSK Serverless Cluster. Verifying connectivity will save you a lot of frustration with potential security and networking issues.
With MSK Serverless Cluster connectivity confirmed, create the three Kafka topics: topicA
, topicB
, and topicC
. I am using the default partitioning and replication settings from the AWS Getting Started Tutorial.
To create some quick sample data, we will copy and paste 250 messages from a file included in the GitHub project, sample_data/sales_messages.txt
, into topicA
. The messages are simple mock sales transactions.
Use the kafka-console-producer
Shell script to publish the messages to the Kafka topic. Use the kafka-console-consumer
Shell script to validate the messages made it to the topic by consuming a few messages.
The output should look similar to the following example.

Spark Resources in Amazon S3
To submit and run the five Spark Jobs included in the project, you will need to copy the following resources to your Amazon S3 bucket: (5) Apache Spark jobs, (5) related JAR dependencies, and (2) sample data files.
PySpark Applications
To start, copy the five PySpark applications to a scripts/
subdirectory within your Amazon S3 bucket.

Sample Data
Next, copy the two sample data files to a sample_data/
subdirectory within your Amazon S3 bucket. The large file contains 2,000 messages, while the small file contains 600 messages. These two files can be used interchangeably with the post’s final streaming example.

PySpark Dependencies
Lastly, the PySpark applications have a handful of JAR dependencies that must be available when the job runs, which are not on the EMR Serverless classpath by default. If you are unsure which JARs are already on the EMR Serverless classpath, you can check the Spark UI’s Environment tab’s Classpath Entries section. Accessing the Spark UI is demonstrated in the first PySpark application example, below.

It is critical to choose the correct version of each JAR dependency based on the version of libraries used with the EMR and MSK. Using the wrong version or inconsistent versions, especially Scala, can result in job failures. Specifically, we are targeting Spark 3.2.1 and Scala 2.12 (EMR v6.7.0: Amazon’s Spark 3.2.1, Scala 2.12.15, Amazon Corretto 8 version of OpenJDK), and Apache Kafka 2.8.1 (MSK Serverless: Kafka 2.8.1).
Download the seven JAR files locally, then copy them to a jars/
subdirectory within your Amazon S3 bucket.

PySpark Applications Examples
With the EMR Serverless Application, MSK Serverless Cluster, Kafka topics, and sample data created, and the Spark resources uploaded to Amazon S3, we are ready to explore four different Spark examples.
Example 1: Kafka Batch Aggregation to the Console
The first PySpark application, 01_example_console.py
, reads the same 250 sample sales messages from topicA
you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to the console (stdout).
There are no hard-coded values in any of the PySpark application examples. All required environment-specific variables, such as your MSK Serverless bootstrap server (host and port) and Amazon S3 bucket name, will be passed to the running Spark jobs as arguments from the spark-submit
command.
To submit your first PySpark job to the EMR Serverless Application, use the emr-serverless
API from the AWS CLI. You will need (4) values: 1) your EMR Serverless Application’s application-id
, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.
Switching to the EMR Serverless Application console, you should see the new Spark job you just submitted in one of several job states.

You can click on the Spark job to get more details. Note the Script arguments and Spark properties passed in from the spark-submit
command.

From the Spark job details tab, access the Spark UI, aka Spark Web UI, from a button in the upper right corner of the screen. If you have experience with Spark, you are most likely familiar with the Spark Web UI to monitor and tune Spark jobs.

From the initial screen, the Spark History Server tab, click on the App ID. You can access an enormous amount of Spark-related information about your job and EMR environment from the Spark Web UI.



The Executors tab will give you access to the Spark job’s output. The output we are most interested in is the driver
executor’s stderr
and stdout
(first row of the second table, shown below).

The stderr
contains output related to the running Spark job. Below we see an example of Kafka consumer configuration values output to stderr
. Several of these values were passed in from the Spark job, including items such as kafka.bootstrap.servers
, security.protocol
, sasl.mechanism
, and sasl.jaas.config
.

The stdout
from the driver
executor contains the console output as directed from the Spark job. Below we see the successfully aggregated results of the first Spark job, output to stdout
.
Example 2: Kafka Batch Aggregation to CSV in S3
Although the console is useful for development and debugging, it is typically not used in Production. Instead, Spark typically sends results to S3 as CSV, JSON, Parquet, or Arvo formatted files, to Kafka, to a database, or to an API endpoint. The second PySpark application, 02_example_csv_s3.py
, reads the same 250 sample sales messages from topicA
you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a CSV file in Amazon S3.
To submit your second PySpark job to the EMR Serverless Application, use the emr-serverless
API from the AWS CLI. Similar to the first example, you will need (4) values: 1) your EMR Serverless Application’s application-id
, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.
If successful, the Spark job should create a single CSV file in the designated Amazon S3 key (directory path) and an empty _SUCCESS
indicator file. The presence of an empty _SUCCESS
file signifies that the save()
operation completed normally.

Below we see the expected pipe-delimited output from the second Spark job.
Example 3: Kafka Batch Aggregation to Kafka
The third PySpark application, 03_example_kafka.py
, reads the same 250 sample sales messages from topicA
you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a second Kafka topic, topicB
. This job now has both read and write options.
To submit your next PySpark job to the EMR Serverless Application, use the emr-serverless
API from the AWS CLI. Similar to the first two examples, you will need (4) values: 1) your EMR Serverless Application’s application-id
, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.
Once the job completes, you can confirm the results by returning to your EC2-based Kafka client. Use the same kafka-console-consumer
command you used previously to show messages from topicB
.
If the Spark job and the Kafka client command worked successfully, you should see aggregated messages similar to the example output below. Note we are not using keys with the Kafka messages, only values for these simple examples.

Example 4: Spark Structured Streaming
For our final example, we will switch from batch to streaming — from read
to readstream
and from write
to writestream
. Before continuing, I suggest reading the Structured Streaming Programming Guide.
In this example, we will demonstrate how to continuously measure a common business metric — real-time sales volumes. Imagine you are sell products globally and want to understand the relationship between the time of day and buying patterns in different geographic regions in real-time. For any given window of time — this 15-minute period, this hour, this day, or this week— you want to know the current sales volumes by country. You are not reviewing previous sales periods or examing running sales totals, but real-time sales during a sliding time window.
We will use two PySpark jobs running concurrently to simulate this metric. The first application, 04_stream_sales_to_kafka.py
, simulates streaming data by continuously writing messages to topicC
— 2,000 messages with a 0.5-second delay between messages. In my tests, the job ran for ~28–29 minutes.
Simultaneously, the PySpark application, 05_streaming_kafka.py
, continuously consumes the sales transaction messages from the same topic, topicC
. Then, Spark aggregates messages over a sliding event-time window and writes the results to the console.
To submit the two PySpark jobs to the EMR Serverless Application, use the emr-serverless
API from the AWS CLI. Again, you will need (4) values: 1) your EMR Serverless Application’s application-id
, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.
Switching to the EMR Serverless Application console, you should see both Spark jobs you just submitted in one of several job states.

Using the Spark UI again, we can review the output from the second job, 05_streaming_kafka.py
.

With Spark Structured Streaming jobs, we have an extra tab in the Spark UI, Structured Streaming. This tab displays all running jobs with their latest [micro]batch number, the aggregate rate of data arriving, and the aggregate rate at which Spark is processing data. Unfortunately, with MSK Serverless, AWS doesn’t appear to allow access to the detailed streaming query statistics via the Run ID, which greatly reduces its value. You receive a 502 error when clicking on the Run ID hyperlink.

The output we are most interested in, again, is contained in the driver
executor’s stderr
and stdout
(first row of the second table, shown below).

Below we see sample output from stderr
. The output shows the results of a micro-batch. According to the Apache Spark documentation, internally, by default, Structured Streaming queries are processed using a micro-batch processing engine. The engine processes data streams as a series of small batch jobs, achieving end-to-end latencies as low as 100ms and exactly-once fault-tolerance guarantees.
The corresponding output to the micro-batch output above is shown below. We see the initial micro-batch results, starting with the first micro-batch before any messages are streamed to topicC
.
If you are familiar with Spark Structured Streaming, you are likely aware that these Spark jobs run continuously. In other words, the streaming jobs will not stop; they continually await more streaming data.

The first job, 04_stream_sales_to_kafka.py
, will run for ~28–29 minutes and stop with a status of Sucess
. However, the second job, 05_streaming_kafka.py
, the Spark Structured Streaming job, must be manually canceled.

Cleaning Up
You can delete your resources from the AWS Management Console or AWS CLI. However, to delete your Amazon S3 bucket, all objects (including all object versions and delete markers) in the bucket must be deleted before the bucket itself can be deleted.
Conclusion
In this post, we discovered how easy it is to adopt a serverless approach to Analytics on AWS. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks. With MSK Serverless, you can use Apache Kafka on demand and pay for the data you stream and retain. In addition, MSK Serverless automatically provisions and scales compute and storage resources. Given suitable analytics use cases, EMR Serverless with MSK Serverless will likely save you time, effort, and expense.
This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.
Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena
Posted by Gary A. Stafford in Analytics, AWS, Big Data, Cloud, Java Development, Software Development on June 26, 2022
Learn how to develop Cloud-native, RESTful Java services that query data in an AWS-based data lake using Amazon Athena’s API
Introduction
AWS provides a collection of fully-managed services that makes building and managing secure data lakes faster and easier, including AWS Lake Formation, AWS Glue, and Amazon S3. Additional analytics services such as Amazon EMR, AWS Glue Studio, and Amazon Redshift allow Data Scientists and Analysts to run high-performance queries on large volumes of semi-structured and structured data quickly and economically.
What is not always as obvious is how teams develop internal and external customer-facing analytics applications built on top of data lakes. For example, imagine sellers on an eCommerce platform, the scenario used in this post, want to make better marketing decisions regarding their products by analyzing sales trends and buyer preferences. Further, suppose the data required for the analysis must be aggregated from multiple systems and data sources; the ideal use case for a data lake.

salesbyseller
endpointIn this post, we will explore an example Java Spring Boot RESTful Web Service that allows end-users to query data stored in a data lake on AWS. The RESTful Web Service will access data stored as Apache Parquet in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena. The service will use Spring Boot and the AWS SDK for Java to expose a secure, RESTful Application Programming Interface (API).

Amazon Athena is a serverless, interactive query service based on Presto, used to query data and analyze big data in Amazon S3 using standard SQL. Using Athena functionality exposed by the AWS SDK for Java and Athena API, the Spring Boot service will demonstrate how to access tables, views, prepared statements, and saved queries (aka named queries).

TL;DR
Do you want to explore the source code for this post’s Spring Boot service or deploy it to Kubernetes before reading the full article? All the source code, Docker, and Kubernetes resources are open-source and available on GitHub.
git clone --depth 1 -b main \ https://github.com/garystafford/athena-spring-app.git
A Docker image for the Spring Boot service is also available on Docker Hub.

Data Lake Data Source
There are endless data sources to build a demonstration data lake on AWS. This post uses the TICKIT sample database provided by AWS and designed for Amazon Redshift, AWS’s cloud data warehousing service. The database consists of seven tables. Two previous posts and associated videos, Building a Data Lake on AWS with Apache Airflow and Building a Data Lake on AWS, detail the setup of the data lake used in this post using AWS Glue and optionally Apache Airflow with Amazon MWAA.
Those two posts use the data lake pattern of segmenting data as bronze (aka raw), silver (aka refined), and gold (aka aggregated), popularized by Databricks. The data lake simulates a typical scenario where data originates from multiple sources, including an e-commerce platform, a CRM system, and a SaaS provider must be aggregated and analyzed.

Spring Projects with IntelliJ IDE
Although not a requirement, I used JetBrains IntelliJ IDEA 2022 (Ultimate Edition) to develop and test the post’s Spring Boot service. Bootstrapping Spring projects with IntelliJ is easy. Developers can quickly create a Spring project using the Spring Initializr plugin bundled with the IntelliJ.

The Spring Initializr plugin’s new project creation wizard is based on start.spring.io. The plugin allows you to quickly select the Spring dependencies you want to incorporate into your project.

Visual Studio Code
There are also several Spring extensions for the popular Visual Studio Code IDE, including Microsoft’s Spring Initializr Java Support extension.

Gradle
This post uses Gradle instead of Maven to develop, test, build, package, and deploy the Spring service. Based on the packages selected in the new project setup shown above, the Spring Initializr plugin’s new project creation wizard creates a build.gradle
file. Additional packages, such as Lombak, Micrometer, and Rest Assured, were added separately.
Amazon Corretto
The Spring boot service is developed for and compiled with the most recent version of Amazon Corretto 17. According to AWS, Amazon Corretto is a no-cost, multiplatform, production-ready distribution of the Open Java Development Kit (OpenJDK). Corretto comes with long-term support that includes performance enhancements and security fixes. Corretto is certified as compatible with the Java SE standard and is used internally at Amazon for many production services.
Source Code
Each API endpoint in the Spring Boot RESTful Web Service has a corresponding POJO data model class, service interface and service implementation class, and controller class. In addition, there are also common classes such as configuration, a client factory, and Athena-specific request/response methods. Lastly, there are additional class dependencies for views and prepared statements.

refined_tickit_public_category
tableThe project’s source code is arranged in a logical hierarchy by package and class type.
Amazon Athena Access
There are three standard methods for accessing Amazon Athena with the AWS SDK for Java: 1) the AthenaClient
service client, 2) the AthenaAsyncClient
service client for accessing Athena asynchronously, and 3) using the JDBC driver with the AWS SDK. The AthenaClient
and AthenaAsyncClient
service clients are both parts of the software.amazon.awssdk.services.athena
package. For simplicity, this post’s Spring Boot service uses the AthenaClient
service client instead of Java’s asynchronously programming model. AWS supplies basic code samples as part of their documentation as a starting point for writing Athena applications using the SDK. The code samples also use the AthenaClient
service client.
POJO-based Data Model Class
For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Plain Old Java Object (POJO). According to Wikipedia, a POGO is an ordinary Java object, not bound by any particular restriction. The POJO class is similar to a JPA Entity, representing persistent data stored in a relational database. In this case, the POJO uses Lombok’s @Data
annotation. According to the documentation, this annotation generates getters for all fields, a useful toString
method, and hashCode
and equals
implementations that check all non-transient fields. It also generates setters for all non-final fields and a constructor.
Each POJO corresponds directly to a ‘silver’ table in the AWS Glue Data Catalog. For example, the Event
POJO corresponds to the refined_tickit_public_event
table in the tickit_demo
Data Catalog database. The POJO defines the Spring Boot service’s data model for data read from the corresponding AWS Glue Data Catalog table.

The Glue Data Catalog table is the interface between the Athena query and the underlying data stored in Amazon S3 object storage. The Athena query targets the table, which returns the underlying data from S3.

Service Class
Retrieving data from the data lake via AWS Glue, using Athena, is handled by a service class. For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Service Interface and implementation class. The service implementation class uses Spring Framework’s @Service
annotation. According to the documentation, it indicates that an annotated class is a “Service,” initially defined by Domain-Driven Design (Evans, 2003) as “an operation offered as an interface that stands alone in the model, with no encapsulated state.” Most importantly for the Spring Boot service, this annotation serves as a specialization of @Component
, allowing for implementation classes to be autodetected through classpath scanning.
Using Spring’s common constructor-based Dependency Injection (DI) method (aka constructor injection), the service auto-wires an instance of the AthenaClientFactory
interface. Note that we are auto-wiring the service interface, not the service implementation, allowing us to wire in a different implementation if desired, such as for testing.
The service calls the AthenaClientFactory
class’s createClient()
method, which returns a connection to Amazon Athena using one of several available authentication methods. The authentication scheme will depend on where the service is deployed and how you want to securely connect to AWS. Some options include environment variables, local AWS profile, EC2 instance profile, or token from the web identity provider.
The service class transforms the payload returned by an instance of GetQueryResultsResponse
into an ordered collection (also known as a sequence), List<E>
, where E
represents a POJO. For example, with the data lake’srefined_tickit_public_event
table, the service returns a List<Event>
. This pattern repeats itself for tables, views, prepared statements, and named queries. Column data types can be transformed and formatted on the fly, new columns added, and existing columns skipped.
For each endpoint defined in the Controller class, for example, get()
, findAll()
, and FindById()
, there is a corresponding method in the Service class. Below, we see an example of the findAll()
method in the SalesByCategoryServiceImp
service class. This method corresponds to the identically named method in the SalesByCategoryController
controller class. Each of these service methods follows a similar pattern of constructing a dynamic Athena SQL query based on input parameters, which is passed to Athena through the AthenaClient
service client using an instance of GetQueryResultsRequest
.
Controller Class
Lastly, there is a corresponding Controller class for each API endpoint in the Spring Boot RESTful Web Service. The controller class uses Spring Framework’s @RestController
annotation. According to the documentation, this annotation is a convenience annotation that is itself annotated with @Controller
and @ResponseBody
. Types that carry this annotation are treated as controllers where @RequestMapping
methods assume @ResponseBody
semantics by default.
The controller class takes a dependency on the corresponding service class application component using constructor-based Dependency Injection (DI). Like the service example above, we are auto-wiring the service interface, not the service implementation.
The controller is responsible for serializing the ordered collection of POJOs into JSON and returning that JSON payload in the body of the HTTP response to the initial HTTP request.
Querying Views
In addition to querying AWS Glue Data Catalog tables (aka Athena tables), we also query views. According to the documentation, a view in Amazon Athena is a logical table, not a physical table. Therefore, the query that defines a view runs each time the view is referenced in a query.
For convenience, each time the Spring Boot service starts, the main AthenaApplication
class calls the View.java
class’s CreateView()
method to check for the existence of the view, view_tickit_sales_by_day_and_category
. If the view does not exist, it is created and becomes accessible to all application end-users. The view is queried through the service’s /salesbycategory
endpoint.

This confirm-or-create pattern is repeated for the prepared statement in the main AthenaApplication
class (detailed in the next section).
Below, we see the View
class called by the service at startup.
Aside from the fact the /salesbycategory
endpoint queries a view, everything else is identical to querying a table. This endpoint uses the same model-service-controller pattern.
Executing Prepared Statements
According to the documentation, you can use the Athena parameterized query feature to prepare statements for repeated execution of the same query with different query parameters. The prepared statement used by the service, tickit_sales_by_seller
, accepts a single parameter, the ID of the seller (sellerid
). The prepared statement is executed using the /salesbyseller
endpoint. This scenario simulates an end-user of the analytics application who wants to retrieve enriched sales information about their sales.
The pattern of querying data is similar to tables and views, except instead of using the common SELECT...FROM...WHERE
SQL query pattern, we use the EXECUTE...USING
pattern.
For example, to execute the prepared statement for a seller with an ID of 3, we would use EXECUTE tickit_sales_by_seller USING 3;
. We pass the seller’s ID of 3 as a path parameter similar to other endpoints exposed by the service: /v1/salesbyseller/3
.

Again, aside from the fact the /salesbyseller
endpoint executes a prepared statement and passes a parameter; everything else is identical to querying a table or a view, using the same model-service-controller pattern.
Working with Named Queries
In addition to tables, views, and prepared statements, Athena has the concept of saved queries, referred to as named queries in the Athena API and when using AWS CloudFormation. You can use the Athena console or API to save, edit, run, rename, and delete queries. The queries are persisted using a NamedQueryId
, a unique identifier (UUID) of the query. You must reference the NamedQueryId
when working with existing named queries.

There are multiple ways to use and reuse existing named queries programmatically. For this demonstration, I created the named query, buyer_likes_by_category
, in advance and then stored the resulting NamedQueryId
as an application property, injected at runtime or kubernetes deployment time through a local environment variable.