
Building Data Lakes on AWS with Kafka Connect, Debezium, Apicurio Registry, and Apache Hudi

Learn how to build a near real-time transactional data lake on AWS using a combination of Open Source Software (OSS) and AWS Services

Introduction

In the following post, we will explore one possible architecture for building a near real-time transactional data lake on AWS. The data lake will be built using a combination of open source software (OSS) and fully-managed AWS services. Red Hat’s Debezium, Apache Kafka, and Kafka Connect will be used for change data capture (CDC). In addition, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer will be used to manage the data lake. To complete our architecture, we will use several fully-managed AWS services, including Amazon RDS, Amazon MSK, Amazon EKS, AWS Glue, and Amazon EMR.

The data lake architecture used in this post’s demonstration

Source Code

The source code, configuration files, and a list of commands shown in this post are open-sourced and available on GitHub.

Kafka

According to the Apache Kafka documentation, “Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.” For this post, we will use Apache Kafka as the core of our change data capture (CDC) process. According to Wikipedia, “change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. CDC is an approach to data integration that is based on the identification, capture, and delivery of the changes made to enterprise data sources.” We will discuss CDC in greater detail later in the post.

There are several options for Apache Kafka on AWS. We will use AWS’s fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK) service. Alternatively, you could choose industry-leading SaaS providers, such as Confluent, Aiven, Redpanda, or Instaclustr. Lastly, you could choose to self-manage Apache Kafka on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS).

Kafka Connect

According to the Apache Kafka documentation, “Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency.”

There are multiple options for Kafka Connect on AWS. You can use AWS’s fully-managed, serverless Amazon MSK Connect. Alternatively, you could choose a SaaS provider or self-manage Kafka Connect on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). I am not a huge fan of Amazon MSK Connect during development. In my opinion, iterating on the configuration for a new source and sink connector, especially with transforms and external registry dependencies, can be painfully slow and time-consuming with MSK Connect. I find it much faster to develop and fine-tune my sink and source connectors using a self-managed version of Kafka Connect. For Production workloads, you can easily port the configuration from the native Kafka Connect connector to MSK Connect. I am using a self-managed version of Kafka Connect for this post, running in Amazon EKS.

Example Production-ready architecture using Amazon services for CDC

Debezium

According to the Debezium documentation, “Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases.” Regarding Kafka Connect, according to the Debezium documentation, “Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.”

Source Connectors

We will use Kafka Connect along with three Debezium connectors, MySQL, PostgreSQL, and SQL Server, to connect to three corresponding Amazon Relational Database Service (RDS) databases and perform CDC. Changes from the three databases will be represented as messages in separate Kafka topics. In Kafka Connect terminology, these are referred to as Source Connectors. According to Confluent.io, a leader in the Kafka community, “source connectors ingest entire databases and stream table updates to Kafka topics.”

Sink Connector

We will stream the data from the Kafka topics into Amazon S3 using a sink connector. Again, according to Confluent.io, “sink connectors deliver data from Kafka topics to secondary indexes, such as Elasticsearch, or batch systems such as Hadoop for offline analysis.” We will use Confluent’s Amazon S3 Sink Connector for Confluent Platform. We can use Confluent’s sink connector without depending on the entire Confluent platform.

There is also an option to use the Hudi Sink Connector for Kafka, which promises to greatly simplify the processes described in this post. However, the RFC for this Hudi feature appears to be stalled. Last updated in August 2021, the RFC is still in the initial “Under Discussion” phase. Therefore, I would not recommend the connector’s use in Production until it is GA (General Availability) and gains broader community support.

Securing Database Credentials

Whether using Amazon MSK Connect or self-managed Kafka Connect, you should ensure your database, Kafka, and schema registry credentials, and other sensitive configuration values are secured. Both MSK Connect and self-managed Kafka Connect can integrate with configuration providers that implement the ConfigProvider class interface, such as AWS Secrets Manager, HashiCorp Vault, Microsoft Azure Key Vault, and Google Cloud Secrets Manager.

Example of database credentials safely stored in AWS Secrets Manager

For self-managed Kafka Connect, I prefer Jeremy Custenborder’s kafka-config-provider-aws plugin. This plugin provides integration with AWS Secrets Manager. A complete list of Jeremy’s providers can be found on GitHub. Below is a snippet of the Secrets Manager configuration from the connect-distributed.properties file, which is read by Apache Kafka Connect at startup.

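A minimal sketch of that configuration is shown below. The provider alias matches the ${secretManager:...} references used in the connector configurations later in this post; the class name and region parameter are taken from the plugin’s documentation and should be verified against the version you install.

# connect-distributed.properties (snippet)
config.providers=secretManager
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers.secretManager.param.aws.region=us-east-1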

Apache Avro

The messages in the Kafka topics and the corresponding objects in Amazon S3 will be stored in Apache Avro format by the CDC process. The Apache Avro documentation states, “Apache Avro is the leading serialization format for record data, and the first choice for streaming data pipelines.” Avro provides rich data structures and a compact, fast, binary data format.

Again, according to the Apache Avro documentation, “Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.” When Avro data is stored in a file, its schema can be stored with it so any program may process files later.
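For illustration, a trimmed-down Avro schema for a sale record might look like the following; the field names follow the TICKIT sale table, while the schemas Debezium actually registers contain additional fields and metadata.

{
  "type": "record",
  "name": "Value",
  "namespace": "tickit.ecomm.sale",
  "fields": [
    { "name": "salesid", "type": "int" },
    { "name": "qtysold", "type": "int" },
    { "name": "pricepaid", "type": "double" },
    { "name": "saletime", "type": [ "null", "string" ], "default": null }
  ]
}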

Alternatively, the schema can be stored separately in a schema registry. According to Apicurio Registry, “in the messaging and event streaming world, data that are published to topics and queues often must be serialized or validated using a Schema (e.g. Apache Avro, JSON Schema, or Google protocol buffers). Schemas can be packaged in each application, but it is often a better architectural pattern to instead register them in an external system [schema registry] and then referenced from each application.”

Schema Registry

Several leading open-source and commercial schema registries exist, including Confluent Schema Registry, AWS Glue Schema Registry, and Red Hat’s open-source Apicurio Registry. In this post, we use a self-managed version of Apicurio Registry running on Amazon EKS. You can relatively easily substitute AWS Glue Schema Registry if you prefer a fully-managed AWS service.

Apicurio Registry

According to the documentation, Apicurio Registry supports adding, removing, and updating OpenAPI, AsyncAPI, GraphQL, Apache Avro, Google protocol buffers, JSON Schema, Kafka Connect schema, WSDL, and XML Schema (XSD) artifact types. Furthermore, content evolution can be controlled by enabling content rules, including validity and compatibility. Lastly, the registry can be configured to store data in various backend storage systems depending on the use case, including Kafka (e.g., Amazon MSK), PostgreSQL (e.g., Amazon RDS), and Infinispan (embedded).

Post’s architecture using Amazon MSK, self-managed Kafka Connect, and Apicurio Registry for CDC

Data Lake Table Formats

Three leading open-source, transactional data lake storage frameworks enable building data lake and data lakehouse architectures: Apache Iceberg, Linux Foundation Delta Lake, and Apache Hudi. They offer comparable features, such as ACID-compliant transactional guarantees, time travel, rollback, and schema evolution. Using any of these three data lake table formats will allow us to store and perform analytics on the latest version of the data in the data lake originating from the three Amazon RDS databases.

Apache Hudi

According to the Apache Hudi documentation, “Apache Hudi is a transactional data lake platform that brings database and data warehouse capabilities to the data lake.” The specifics of how the data is laid out as files in your data lake depends on the Hudi table type you choose, either Copy on Write (CoW) or Merge On Read (MoR).

Like Apache Iceberg and Delta Lake, Apache Hudi is partially supported by AWS’s Analytics services, including AWS Glue, Amazon EMR, Amazon Athena, Amazon Redshift Spectrum, and AWS Lake Formation. In general, CoW has broader support on AWS than MoR. It is important to understand the limitations of Apache Hudi with each AWS analytics service before choosing a table format.

If you are looking for a fully-managed Cloud data lake service built on Hudi, I recommend Onehouse. Born from the roots of Apache Hudi and founded by its original creator, Vinoth Chandar (PMC chair of the Apache Hudi project), the Onehouse product and its services leverage OSS Hudi to offer a data lake platform similar to what companies like Uber have built.

Hudi DeltaStreamer

Hudi also offers DeltaStreamer (aka HoodieDeltaStreamer) for streaming ingestion of CDC data. DeltaStreamer can be run once or continuously, using Apache Spark, similar to an Apache Spark Structured Streaming job, to capture changes to the raw CDC data and write that data to a different part of our data lake.

DeltaStreamer also works with Amazon S3 Event Notifications instead of running continuously. According to DeltaStreamer’s documentation, Amazon S3 object storage provides an event notification service to post notifications when certain events happen in your S3 bucket. AWS will put these events in Amazon Simple Queue Service (Amazon SQS). Apache Hudi provides an S3EventsSource that can read from Amazon SQS to trigger and process new or changed data as soon as it is available on Amazon S3.

Sample Data for the Data Lake

The data used in this post is from the TICKIT sample database. The TICKIT database represents the backend data store for a platform that brings buyers and sellers of tickets to entertainment events together. It was initially designed to demonstrate Amazon Redshift. The TICKIT database is a small database with approximately 425K rows of data across seven tables: Category, Event, Venue, User, Listing, Sale, and Date.

A data lake most often contains data from multiple sources, each with different storage formats, protocols, and connection methods. To simulate these data sources, I have separated the TICKIT database tables to represent three typical enterprise systems, including a commercial off-the-shelf (COTS) E-commerce platform, a custom Customer Relationship Management (CRM) platform, and a SaaS-based Event Management System (EMS). Each simulated system uses a different Amazon RDS database engine, including MySQL, PostgreSQL, and SQL Server.

Breakdown of TICKIT tables and database engines

Enabling CDC for Amazon RDS

To use Debezium for CDC with Amazon RDS databases, minor changes to the default database configuration for each database engine are required.

CDC for PostgreSQL

Debezium has detailed instructions regarding configuring CDC for Amazon RDS for PostgreSQL. Database parameters specify how the database is configured. According to the Debezium documentation, for PostgreSQL, set the instance parameter rds.logical_replication to 1 and verify that the wal_level parameter is set to logical. It is automatically changed when the rds.logical_replication parameter is set to 1. This parameter is adjusted using an Amazon RDS custom parameter group. According to the AWS documentation, “with Amazon RDS, you manage database configuration by associating your DB instances and Multi-AZ DB clusters with parameter groups. Amazon RDS defines parameter groups with default settings. You can also define your own parameter groups with customized settings.”

Example Amazon RDS Parameter groups with CDC configuration
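As a sketch, the custom parameter group could be created and modified with the AWS CLI as shown below; the parameter group name and family are placeholders you would adjust to your RDS PostgreSQL version.

aws rds create-db-parameter-group \
    --db-parameter-group-name postgres-cdc-params \
    --db-parameter-group-family postgres14 \
    --description "Custom parameter group enabling logical replication for Debezium"

aws rds modify-db-parameter-group \
    --db-parameter-group-name postgres-cdc-params \
    --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"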

CDC for MySQL

Similarly, Debezium has detailed instructions regarding configuring CDC for MySQL. Like PostgreSQL, MySQL requires a custom DB parameter group.

Example Amazon RDS Parameter groups with CDC configuration
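In broad strokes, Debezium’s MySQL connector relies on row-based binary logging and sufficient binlog retention. A minimal sketch using the AWS CLI and the RDS-specific stored procedure follows; the parameter group name is a placeholder, and the retention period should match your recovery needs.

aws rds modify-db-parameter-group \
    --db-parameter-group-name mysql-cdc-params \
    --parameters \
        "ParameterName=binlog_format,ParameterValue=ROW,ApplyMethod=immediate" \
        "ParameterName=binlog_row_image,ParameterValue=full,ApplyMethod=immediate"

-- run from a MySQL client: retain binary logs for 24 hours (Amazon RDS-specific procedure)
CALL mysql.rds_set_configuration('binlog retention hours', 24);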

CDC for SQL Server

Lastly, Debezium has detailed instructions for configuring CDC with Microsoft SQL Server. Enabling CDC requires enabling CDC on the SQL Server database and table(s) and creating a new filegroup and associated file. Debezium recommends locating change tables in a different filegroup than you use for source tables. CDC is only supported with Microsoft SQL Server Standard Edition and higher; Express and Web Editions are not supported.

-- enable the database for CDC
EXEC msdb.dbo.rds_cdc_enable_db 'tickit'

USE tickit
GO

SELECT * FROM sys.filegroups;
SELECT * FROM sys.database_files;

-- add new filegroup to database
ALTER DATABASE tickit ADD FILEGROUP CDC_FG1;

-- add new file to filegroup
ALTER DATABASE tickit
ADD FILE
(
    NAME = cdc_data1,
    FILENAME = 'D:\rdsdbdata\DATA\sqldba_data1.ndf',
    SIZE = 500 MB,
    FILEGROWTH = 50 MB
) TO FILEGROUP CDC_FG1;

-- enable CDC for a SQL Server table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'crm',
    @source_name = N'user',
    @role_name = N'admin',
    @filegroup_name = N'CDC_FG1',
    @supports_net_changes = 0
GO

Kafka Connect Source Connectors

There is a Kafka Connect source connector for each of the three Amazon RDS databases, all of which use Debezium. CDC is performed, moving changes from the three databases into separate Kafka topics in Apache Avro format using the source connectors. The connector configuration is nearly identical whether you are using Amazon MSK Connect or a self-managed version of Kafka Connect.

UI for Apache Kafka showing the three default Kafka Connect topics

As shown above, I am using the UI for Apache Kafka by Provectus, self-managed on Amazon EKS, in this post.

PostgreSQL Source Connector

The source_connector_postgres_kafka_avro_tickit source connector captures all changes to the three tables in the Amazon RDS for PostgreSQL tickit database’s ems schema: category, event, and venue. These changes are written to three corresponding Kafka topics as Avro-format messages: tickit.ems.category, tickit.ems.event, and tickit.ems.venue. The messages are transformed by the connector using Debezium’s unwrap transform. Schemas for the messages are written to Apicurio Registry.

{
"name": "source_connector_postgres_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "tickit",
"database.hostname": "${secretManager:demo/postgres:host}",
"database.password": "${secretManager:demo/postgres:password}",
"database.port": 5432,
"database.server.name": "tickit",
"database.user": "${secretManager:demo/postgres:username}",
"decimal.handling.mode": "double",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"plugin.name": "pgoutput",
"schema.name.adjustment.mode": "avro",
"table.include.list": "ems.category,ems.event,ems.venue",
"tasks.max": 1,
"topic.prefix": "tickit",
"transforms": "unwrap",
"transforms.unwrap.add.fields": "name,op,db,table,schema,lsn,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

MySQL Source Connector

The source_connector_mysql_kafka_avro_tickit source connector captures all changes to the three tables in the Amazon RDS for MySQL ecomm database: date, listing, and sale. These changes are written to three corresponding Kafka topics as Avro-format messages: tickit.ecomm.date, tickit.ecomm.listing, and tickit.ecomm.sale.

{
"name": "source_connector_mysql_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "${secretManager:demo/mysql:host}",
"database.include.list": "ecomm",
"database.password": "${secretManager:demo/mysql:password}",
"database.port": 3306,
"database.server.id": 184054,
"database.user": "${secretManager:demo/mysql:username}",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"schema.name.adjustment.mode": "avro",
"schema.history.internal.kafka.bootstrap.servers": "${secretManager:demo/mysql:bootstrap.servers}",
"schema.history.internal.kafka.topic": "schemahistory.tickit",
"tasks.max": 1,
"topic.prefix": "tickit",
"transforms": "unwrap",
"transforms.unwrap.add.fields": "name,op,db,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

SQL Server Source Connector

Lastly, the source_connector_mssql_kafka_avro_tickit source connector captures all changes to the single user table in the Amazon RDS for SQL Server tickit database’s crm schema. These changes are written to a corresponding Kafka topic as Avro-format messages: tickit.crm.user.

{
"name": "source_connector_mssql_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.encrypt": "false",
"database.hostname": "${secretManager:demo/mssql:host}",
"database.names": "tickit",
"database.password": "${secretManager:demo/mssql:password}",
"database.port": 1433,
"database.user": "${secretManager:demo/mssql:username}",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"schema.name.adjustment.mode": "avro",
"schema.history.internal.kafka.bootstrap.servers": "${secretManager:demo/mssql:bootstrap.servers}",
"schema.history.internal.kafka.topic": "schemahistory.tickit",
"table.include.list": "crm.user",
"tasks.max": 1,
"topic.prefix": "tickit",
"transforms": "unwrap",
"transforms.unwrap.add.fields": "name,op,db,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
}

If using a self-managed version of Kafka Connect, we can deploy, manage, and monitor the source and sink connectors using Kafka Connect’s RESTful API (Connect API).
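For example, a few representative calls against the standard Connect REST API, assuming Kafka Connect’s default listener on localhost:8083 and a connector configuration saved locally as a JSON file:

# deploy a connector from a JSON configuration file
curl -s -X POST -H "Content-Type: application/json" \
    --data @source_connector_postgres_kafka_avro_tickit.json \
    http://localhost:8083/connectors

# list all deployed connectors
curl -s http://localhost:8083/connectors

# check the status of a connector and its tasks
curl -s http://localhost:8083/connectors/source_connector_postgres_kafka_avro_tickit/status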

Using Kafka Connect’s RESTful API to confirm all connectors are running

Once the three source connectors are running, we should see seven new Kafka topics corresponding to the seven TICKIT database tables from the three Amazon RDS database sources. There will be approximately 425K messages consuming 147 MB of space in Avro’s binary format.

Seven Kafka topics representing each table within the three databases

Avro’s binary format and the use of a separate schema ensure the messages consume minimal Kafka storage space. For example, the average message Value (payload) size in the tickit.ecomm.sale topic is a minuscule 98 Bytes. Resource management is critical when you are dealing with hundreds of millions or often billions of daily Kafka messages as a result of the CDC process.

Analysis results of the tickit.ecomm.sale topic

From within the Apicurio Registry UI, we should now see Value schema artifacts for each of the seven Kafka topics.

Apicurio Registry UI showing the Kafka topic’s Avro message’s value schemas

Also from within the Apicurio Registry UI, we can examine details of individual Value schema artifacts.

Apicurio Registry UI showing the tickit.ecomm.sale topic’s value schemas

Kafka Connect Sink Connector

In addition to the three source connectors, there is a single Kafka Connect sink connector, sink_connector_kafka_s3_avro_tickit. The sink connector copies messages from the seven Kafka topics, all prefixed with tickit, to the Bronze area of our Amazon S3-based data lake.

Sink connector’s Kafka consumer, shown in the Kafka UI, consuming messages from seven topics

The connector uses Confluent’s S3 Sink Connector (S3SinkConnector). As with the Kafka topics, the connector writes all changes to Amazon S3 in Apache Avro format, with the schemas already stored in Apicurio Registry.

{
"name": "sink_connector_kafka_s3_avro_tickit",
"config": {
"behavior.on.null.values": "ignore",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": 10000,
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"locale": "en-US",
"partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
"rotate.schedule.interval.ms": 60000,
"s3.bucket.name": "<your_data_lake_s3_bucket>",
"s3.part.size": 5242880,
"s3.region": "us-east-1",
"schema.compatibility": "NONE",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"table.name.format": "${topic}",
"tasks.max": 1,
"timestamp.extractor": "Wallclock",
"timezone": "UTC",
"topics.dir": "cdc_hudi_data_lake/bronze",
"topics.regex": "tickit.(.*)",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2&quot;
}
}

Once the sink connector and the three source connectors are running with Kafka Connect, we should see a series of seven subdirectories within the Bronze area of the data lake.

Raw CDC data in the Bronze area of the data lake

Confluent offers a choice of data partitioning strategies. The sink connector we have deployed uses the Daily Partitioner. According to the documentation, the io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day for one S3 object in each daily directory). Below is an example of Avro files (S3 objects) containing changes to the sale table. The objects are partitioned by the year, month, and day they were written to the S3 bucket.

Partitioned Avro-formatted data in the data lakes
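For reference, the resulting object keys follow a pattern along these lines; the file names shown are illustrative, as they depend on the topic partition and starting offset of each flushed file.

cdc_hudi_data_lake/bronze/tickit.ecomm.sale/year=2023/month=02/day=26/tickit.ecomm.sale+0+0000000000.avro
cdc_hudi_data_lake/bronze/tickit.ecomm.sale/year=2023/month=02/day=27/tickit.ecomm.sale+0+0000172500.avro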

Message Transformation

Previously, while discussing the Kafka Connect source connectors, I mentioned that the connectors transform the messages using Debezium’s unwrap transform. By default, the changes picked up by Debezium during the CDC process contain several additional Debezium-related fields of data. An example of an untransformed message generated by Debezium from the MySQL sale table is shown below.

When a Debezium connector first connects to a particular database, it starts by performing a consistent snapshot of each captured schema. All existing records are captured. Unlike an UPDATE ("op" : "u") or a DELETE ("op" : "d"), these initial snapshot messages, like the one shown below, represent a READ operation ("op" : "r"). As a result, there is no before data, only after data.

{
"before" : null,
"after" : {
"full.ecomm.sale.Value" : {
"salesid" : 1,
"listid" : 1,
"sellerid" : 36861,
"buyerid" : 21191,
"eventid" : 7872,
"dateid" : 1875,
"qtysold" : 4,
"pricepaid" : 728.0,
"commission" : 109.2,
"saletime" : "2/18/2020 02:36:48"
}
},
"source" : {
"version" : "2.1.2.Final",
"connector" : "mysql",
"name" : "full",
"ts_ms" : 1677415227000,
"snapshot" : {
"string" : "first_in_data_collection"
},
"db" : "ecomm",
"sequence" : null,
"table" : {
"string" : "sale"
},
"server_id" : 0,
"gtid" : null,
"file" : "mysql-bin-changelog.074478",
"pos" : 154,
"row" : 0,
"thread" : null,
"query" : null
},
"op" : "r",
"ts_ms" : {
"long" : 1677415227078
},
"transaction" : null
}

According to Debezium’s documentation, “Debezium provides a single message transformation [SMT] that crosses the bridge between the complex and simple formats, the UnwrapFromEnvelope SMT.” Below, we see the results of Debezium’s unwrap transform of the same message. The unwrap transform flattens the nested JSON structure of the original message, adds the __deleted field, and includes fields based on the list included in the source connector’s configuration. These fields are prefixed with a double underscore (e.g., __table).

{
"salesid" : 1,
"listid" : 1,
"sellerid" : 36861,
"buyerid" : 21191,
"eventid" : 7872,
"dateid" : 1875,
"qtysold" : 4,
"pricepaid" : 728.0,
"commission" : 109.2,
"saletime" : "2/18/2020 02:36:48",
"__name" : {
"string" : "tickit"
},
"__op" : {
"string" : "r"
},
"__db" : {
"string" : "ecomm"
},
"__table" : {
"string" : "sale"
},
"__source_ts_ms" : {
"long" : 1677379962000
},
"__deleted" : {
"string" : "false"
}
}

In the next section, we examine how Apache Hudi will manage the data lake. An example of the same message, managed with Hudi, is shown below. Note the five additional Hudi data fields, prefixed with _hoodie_.

{
"_hoodie_commit_time": "20230226135257126",
"_hoodie_commit_seqno": "20230226135257126_0_69229",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "__table=sale",
"_hoodie_file_name": "773bbbb7-b6ec-4ade-9f08-942726584e42-0_0-35-50_20230226135257126.parquet",
"salesid": 1,
"listid": 1,
"sellerid": 36861,
"buyerid": 21191,
"eventid": 7872,
"dateid": 1875,
"qtysold": 4,
"pricepaid": 728,
"commission": 109.2,
"saletime": "2/18/2020 02:36:48",
"__name": "tickit",
"__op": "r",
"__db": "ecomm",
"__table": "sale",
"__source_ts_ms": 1677418032000,
"__deleted": "false"
}

Apache Hudi

With database changes flowing into the Bronze area of our data lake, we are ready to use Apache Hudi to provide data lake capabilities such as ACID transactional guarantees, time travel, rollback, and schema evolution. Using Hudi allows us to store and perform analytics on a specific time-bound version of the data in the data lake. Without Hudi, a query for a single row of data could return multiple results, such as an initial CREATE record, multiple UPDATE records, and a DELETE record.

DeltaStreamer

Using Hudi’s DeltaStreamer, we will continuously stream changes from the Bronze area of the data lake to a Silver area, which Hudi manages. We will run DeltaStreamer continuously, similar to an Apache Spark Structured Streaming job, using Apache Spark on Amazon EMR. Running DeltaStreamer requires using a spark-submit command that references a series of configuration files. There is a common base set of configuration items and a group of configuration items specific to each database table. Below, we see the base configuration, base.properties.

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=tickit_cdc_hudi
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.write.hive_style_partitioning=true
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
hoodie.index.type=GLOBAL_BLOOM
hoodie.bloom.index.update.partition.path=true

The base configuration is referenced by each of the table-specific configuration files. For example, below, we see the tickit.ecomm.sale.properties configuration file for DeltaStreamer.

include=base.properties
hoodie.datasource.hive_sync.partition_fields=__table
hoodie.datasource.hive_sync.table=sale
hoodie.datasource.write.partitionpath.field=__table
hoodie.datasource.write.recordkey.field=salesid
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_host>:<port>/apis/ccompat/v6/subjects/tickit.ecomm.sale-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_s3_bucket>/cdc_hudi_data_lake/bronze/tickit.ecomm.sale/

To run DeltaStreamer, you can submit an EMR Step or use EMR’s master node to run a spark-submit command on the cluster. Below, we see an example of the DeltaStreamer spark-submit command using the Merge on Read (MoR) Hudi table type.

DATA_LAKE_BUCKET="<your_data_lake_s3_bucket>"
TARGET_TABLE="tickit.ecomm.sale"
spark-submit \
--name ${TARGET_TABLE} \
--jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle.jar` \
--props file://${PWD}/${TARGET_TABLE}.properties \
--table-type MERGE_ON_READ \
--source-ordering-field __source_ts_ms \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--target-table ${TARGET_TABLE} \
--target-base-path s3://${DATA_LAKE_BUCKET}/cdc_hudi_data_lake/silver/${TARGET_TABLE}/ \
--enable-sync \
--continuous \
--op UPSERT \
> ${TARGET_TABLE}.log 2>&1 &

Next, we see the same example of the DeltaStreamer spark-submit command using the Copy on Write (CoW) Hudi table type.

DATA_LAKE_BUCKET="<your_data_lake_s3_bucket>"
TARGET_TABLE="tickit.ecomm.sale"
spark-submit \
--name ${TARGET_TABLE} \
--jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle.jar` \
--props file://${PWD}/${TARGET_TABLE}.properties \
--table-type COPY_ON_WRITE \
--source-ordering-field __source_ts_ms \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--target-table ${TARGET_TABLE} \
--target-base-path s3://${DATA_LAKE_BUCKET}/cdc_hudi_data_lake/silver/${TARGET_TABLE}/ \
--enable-sync \
--continuous \
--op UPSERT \
> ${TARGET_TABLE}.log 2>&1 &

For this post’s demonstration, we will run a single-table DeltaStreamer Spark job, with one process for each table, using CoW. Alternatively, Hudi offers HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, which enables the ingestion of multiple tables at once into Hudi datasets. Currently, HoodieMultiTableDeltaStreamer only supports sequential processing of the tables to be ingested and the COPY_ON_WRITE table type.

Below, we see an example of three DeltaStreamer Spark jobs running continuously for the date, listing, and sale tables.

Using the Spark UI (History Server) to view active DeltaStreamer [Spark] jobs

Once DeltaStreamer is up and running, we should see a series of subdirectories within the Silver area of the data lake, all managed by Apache Hudi.

Silver area of the data lake, managed by Apache Hudi

Within each subdirectory, partitioned by the table name, is a series of Apache Parquet files, along with other Apache Hudi-specific files. The specific folder structure and files depend on whether the table type is MoR or CoW.

Example of Apache Parquet files managed by Hudi in Amazon S3

AWS Glue Data Catalog

For this post, we are using an AWS Glue Data Catalog, an Apache Hive-compatible metastore, to persist technical metadata stored in the Silver area of the data lake, managed by Apache Hudi. The AWS Glue Data Catalog database, tickit_cdc_hudi, will be automatically created the first time DeltaStreamer runs.
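To confirm that DeltaStreamer created the database and its tables, we could, for example, list them with the AWS CLI; the output will vary depending on the table type and which DeltaStreamer jobs have run.

aws glue get-tables \
    --database-name tickit_cdc_hudi \
    --query "TableList[].Name"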

If we were using DeltaStreamer with the MERGE_ON_READ table type, there would be two tables in the AWS Glue Data Catalog database for each original table. According to Amazon’s EMR documentation, “Merge on Read (MoR) — Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.” Hudi creates two tables in the Hive metastore for MoR: a table with the name that you specified, which is a read-optimized view (appended with _ro), and a table with the same name appended with _rt, which is a real-time view. You can query both tables.

AWS Glue Data Catalog showing the Apache Hudi Merge on Read (MoR) tables

According to Amazon’s EMR documentation, “Copy on Write (CoW) — Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write. CoW is the default storage type.” Using COPY_ON_WRITE with DeltaStreamer, there is only a single Hudi table in the AWS Glue Data Catalog database for each corresponding database table.

AWS Glue Data Catalog showing the Apache Hudi Copy on Write (CoW) tables

Examining an individual table in the AWS Glue Data Catalog database, we can see details like the table schema, location of underlying data in S3, input/output formats, Serde serialization library, and table partitions.

AWS Glue Data Catalog database showing the details of the sale table

Database Changes and Data Lake

We are using Kafka Connect, Apache Hudi, and Hudi’s DeltaStreamer to maintain data parity between our databases and the data lake. Let’s look at an example of how a simple data change is propagated from a database to Kafka, then to the Bronze area of the data lake, and finally to the Hudi-managed Silver area of the data lake, written using the CoW table format.

First, we will make a simple update to a single record in the MySQL database’s sale table with the salesid = 200.

UPDATE ecomm.sale s
SET s.buyerid = 11694,
    s.qtysold = 3,
    s.pricepaid = 600.00,
    s.commission = 90.00
WHERE s.salesid = 200;
Database update made at 2023–02–27 03:16:59 UTC

Almost immediately, we can see the change picked up by the Kafka Connect Debezium MySQL source connector.

Database update logged by source connector at 2023–02–27 03:16:59 UTC

Almost immediately, in the Bronze area of the data lake, we can see we have a new Avro-formatted file (S3 object) containing the updated record in the partitioned sale subdirectory.

New S3 object containing updated record created at 2023–02–27 03:17:07 UTC

If we examine the new object in the Bronze area of the data lake, we will see a row representing the updated record with the salesid = 200. Note how the operation is now an UPDATE versus a READ ("op" : "u").

{
"salesid" : 200,
"listid" : 214,
"sellerid" : 31484,
"buyerid" : 11694,
"eventid" : 3272,
"dateid" : 1891,
"qtysold" : 4,
"pricepaid" : 600.0,
"commission" : 90.0,
"saletime" : "3/6/2020 02:18:11",
"__name" : {
"string" : "tickit"
},
"__op" : {
"string" : "u"
},
"__db" : {
"string" : "ecomm"
},
"__table" : {
"string" : "sale"
},
"__source_ts_ms" : {
"long" : 1677467818000
},
"__deleted" : {
"string" : "false"
}
}

Next, in the corresponding Silver area of the data lake, managed by Hudi, we should also see a new Parquet file that contains a record of the change. In this example, the file was written approximately 26 seconds after the original change to the database. This is the end-to-end time from database change to when the updated data is queryable in the data lake.

New Hudi-managed S3 object containing updated record created at 2023–02–27 03:17:25 UTC

Similarly, if we examine the new object in the Silver area of the data lake, we will see a row representing the updated record with the salesid = 200. The record was committed approximately 15 seconds after the original database change.

{
"_hoodie_commit_time": "20230227031713915",
"_hoodie_commit_seqno": "20230227031713915_0_2168",
"_hoodie_record_key": "200",
"_hoodie_partition_path": "__table=sale",
"_hoodie_file_name": "cce62f4e-a111-4aae-bfd1-2c5af1c6bdeb-0_0-82-84_20230227031713915.parquet",
"salesid": 200,
"listid": 214,
"sellerid": 31484,
"buyerid": 11694,
"eventid": 3272,
"dateid": 1891,
"qtysold": 4,
"pricepaid": 600,
"commission": 90,
"saletime": "3/6/2020 02:18:11",
"__name": "tickit",
"__op": "u",
"__db": "ecomm",
"__table": "sale",
"__source_ts_ms": 1677467818000,
"__deleted": "false"
}
New Hudi-managed S3 object containing updated record created at 2023–02–27 03:17:13.915 UTC

Querying Hudi Tables with Amazon EMR

Using an EMR Notebook, we can query the updated database record stored in Amazon S3 using Hudi’s CoW table format. First, running a simple Spark SQL query for the record with salesid = 200 returns the latest record, reflecting the changes, as indicated by the UPDATE operation value (u) and the _hoodie_commit_time = 2023-02-27 03:17:13.915 UTC.
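A minimal sketch of such a query in PySpark, reading the Hudi table directly from its path in the Silver area; the bucket name is a placeholder, and the column selection is illustrative.

spark.read.format("hudi") \
    .load("s3://<your_data_lake_s3_bucket>/cdc_hudi_data_lake/silver/tickit.ecomm.sale/") \
    .createOrReplaceTempView("sale_hudi")

spark.sql("""
    SELECT _hoodie_commit_time, salesid, qtysold, pricepaid, commission, __op
    FROM sale_hudi
    WHERE salesid = 200
""").show(truncate=False)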

Hudi Time Travel Query

We can also run a Hudi Time Travel Query, with as.of.instant set to some arbitrary point in the future. Again, the latest record is returned, reflecting the changes as indicated by the UPDATE operation value (u) and the _hoodie_commit_time = 2023-02-27 03:17:13.915 UTC.

We can run the same Hudi Time Travel Query with as.of.instant set to a time on or after the original records were written by DeltaStreamer (_hoodie_commit_time = 2023-02-27 03:13:50.642), but some arbitrary time before the updated record was written (_hoodie_commit_time = 2023-02-27 03:17:13.915). This time, the original record is returned, as indicated by the READ operation value (r) and the _hoodie_commit_time = 2023-02-27 03:13:50.642. This is one of the strengths of Apache Hudi: the ability to query data in the present or at any point in the past.
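A sketch of a time travel query in PySpark, assuming the same table path; the as.of.instant value is an arbitrary example, and Hudi accepts several timestamp formats for this option.

spark.read.format("hudi") \
    .option("as.of.instant", "2023-02-27 03:15:00.000") \
    .load("s3://<your_data_lake_s3_bucket>/cdc_hudi_data_lake/silver/tickit.ecomm.sale/") \
    .filter("salesid = 200") \
    .show(truncate=False)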

Hudi Point in Time Query

We can also run a Hudi Point in Time Query, with a time range set from the beginning of time until some arbitrary point in the future. Again, the latest record is returned, reflecting the changes as indicated by the UPDATE operation value (u) and the _hoodie_commit_time = 2023-02-27 03:17:13.915.

We can run the same Hudi Point in Time Query but change the time range to two arbitrary time values, both before the updated record was written (_hoodie_commit_time = 2023-02-26 22:39:07.203). This time, the original record is returned as indicated by the READ operation value (r) and the _hoodie_commit_time of 2023-02-27 03:13:50.642. Again, this is one of the strengths of Apache Hudi, the ability to query data in the present or at any point in the past.
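For reference, a point in time query is expressed as an incremental query with begin and end instant times; the timestamps below are arbitrary examples in Hudi’s commit-time format.

point_in_time_options = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.begin.instanttime": "000",
    "hoodie.datasource.read.end.instanttime": "20230227031400000"
}

spark.read.format("hudi") \
    .options(**point_in_time_options) \
    .load("s3://<your_data_lake_s3_bucket>/cdc_hudi_data_lake/silver/tickit.ecomm.sale/") \
    .filter("salesid = 200") \
    .show(truncate=False)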

Querying Hudi Tables with Amazon Athena

We can also use Amazon Athena and run a SQL query against the AWS Glue Data Catalog database’s sale table to view the updated database record stored in Amazon S3 using Hudi’s CoW table format. The operation (op) column’s value now indicates an UPDATE (u).

Amazon Athena query showing the database update reflected in the data lake managed by Hudi
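A sketch of the Athena query, assuming the CoW table was synced to the Glue database as tickit_cdc_hudi.sale:

SELECT _hoodie_commit_time,
       salesid,
       qtysold,
       pricepaid,
       commission,
       __op
FROM tickit_cdc_hudi.sale
WHERE salesid = 200;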

How are Deletes Handled?

In this post’s architecture, deleted database records are signified with an operation ("op") field value of "d" for a DELETE and a __deleted field value of true.

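For illustration, a transformed (unwrapped) delete event for the same record would look something like the following; the metadata fields shown are what the earlier connector configuration produces, while the remaining column values (omitted here) depend on the connector’s delete handling mode.

{
  "salesid" : 200,
  "__name" : { "string" : "tickit" },
  "__op" : { "string" : "d" },
  "__db" : { "string" : "ecomm" },
  "__table" : { "string" : "sale" },
  "__deleted" : { "string" : "true" }
}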

Returning to our previous Jupyter notebook example, rerunning the Spark SQL query returns the latest record, reflecting the changes as indicated by the DELETE operation value (d) and the _hoodie_commit_time = 2023-02-27 03:52:13.689.

Alternatively, we could use an additional Spark SQL filter statement to prevent deleted records from being returned (e.g., df.__op != "d").

Since we only performed what is referred to as a soft delete in Hudi terminology, we can run a time travel query with as.of.instant set to some arbitrary time before the deleted record was written (_hoodie_commit_time = 2023-02-27 03:52:13.689). This time, the original record is returned, as indicated by the READ operation value (r) and the _hoodie_commit_time = 2023-02-27 03:13:50.642. We could also use a later as.of.instant to return the version of the record reflecting the UPDATE operation value (u). This also applies to other query types, such as point-in-time queries.

Conclusion

In this post, we learned how to build a near real-time transactional data lake on AWS using one possible architecture. The data lake was built using a combination of open source software (OSS) and fully-managed AWS services. Red Hat’s Debezium, Apache Kafka, and Kafka Connect were used for change data capture (CDC). In addition, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer were used to manage the data lake. To complete our architecture, we used several fully-managed AWS services, including Amazon RDS, Amazon MSK, Amazon EKS, AWS Glue, and Amazon EMR.

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Serverless Analytics on AWS: Getting Started with Amazon EMR Serverless and Amazon MSK Serverless

Utilizing the recently released Amazon EMR Serverless and Amazon MSK Serverless for batch and streaming analytics with Apache Spark and Apache Kafka

Introduction

Amazon EMR Serverless

AWS recently announced the general availability (GA) of Amazon EMR Serverless on June 1, 2022. EMR Serverless is a new serverless deployment option in Amazon EMR, in addition to EMR on EC2, EMR on EKS, and EMR on AWS Outposts. EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. According to AWS, with EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks.

Amazon MSK Serverless

Similarly, on April 28, 2022, AWS announced the general availability of Amazon MSK Serverless. According to AWS, Amazon MSK Serverless is a cluster type for Amazon MSK that makes it easy to run Apache Kafka without managing and scaling cluster capacity. MSK Serverless automatically provisions and scales compute and storage resources, so you can use Apache Kafka on demand and only pay for the data you stream and retain.

Serverless Analytics

In the following post, we will learn how to use these two new, powerful, cost-effective, and easy-to-operate serverless technologies to perform batch and streaming analytics. The PySpark examples used in this post are similar to those featured in two earlier posts, which used the non-serverless alternatives, Amazon EMR on EC2 and Amazon MSK: Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, and Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on AWS using Amazon MSK and EMR.

Source Code

All the source code demonstrated in this post is open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/emr-msk-serverless-demo.git

Architecture

The post’s high-level architecture consists of an Amazon EMR Serverless Application, Amazon MSK Serverless Cluster, and Amazon EC2 Kafka client instance. To support these three resources, we will need two Amazon Virtual Private Clouds (VPCs), a minimum of three subnets, an AWS Internet Gateway (IGW) or equivalent, an Amazon S3 Bucket, multiple AWS Identity and Access Management (IAM) Roles and Policies, Security Groups, and Route Tables, and a VPC Gateway Endpoint for S3. All resources are constrained to a single AWS account and a single AWS Region, us-east-1.

High-level AWS serverless analytics architecture used in this post

Prerequisites

As a prerequisite for this post, you will need to create the following resources:

  1. (1) Amazon EMR Serverless Application;
  2. (1) Amazon MSK Serverless Cluster;
  3. (1) Amazon S3 Bucket;
  4. (1) VPC Endpoint for S3;
  5. (3) Apache Kafka topics;
  6. PySpark applications, related JAR dependencies, and sample data files uploaded to Amazon S3 Bucket;

Let’s walk through each of these prerequisites.

Amazon EMR Serverless Application

Before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon EMR Serverless, especially, What is Amazon EMR Serverless? Create a new EMR Serverless Application by following the AWS documentation, Getting started with Amazon EMR Serverless. The creation of the EMR Serverless Application includes the following resources:

  1. Amazon S3 bucket for storage of Spark resources;
  2. Amazon VPC with at least two private subnets and associated Security Group(s);
  3. EMR Serverless runtime AWS IAM Role and associated IAM Policy;
  4. Amazon EMR Serverless Application;

For this post, use the latest version of EMR available in the EMR Studio Serverless Application console, the newly released version 6.7.0, to create a Spark application.

EMR Studio Serverless Application creation console

Keep the default pre-initialized capacity, application limits, and application behavior settings.

EMR Studio Serverless Application creation console

Since we are connecting to MSK Serverless from EMR Serverless, we need to configure VPC access. Select the new VPC and at least two private subnets in different Availability Zones (AZs).

EMR Studio Serverless Application creation console

According to the documentation, the subnets selected for EMR Serverless must be private subnets. The associated route tables for the subnets should not contain direct routes to the Internet.

Error resulting from trying to associate a public subnet with EMR Serverless
EMR Studio Serverless Application details console showing new Application

Amazon MSK Serverless Cluster

Similarly, before continuing, I suggest familiarizing yourself with the AWS documentation for Amazon MSK Serverless, especially MSK Serverless. Create a new MSK Serverless Cluster by following the AWS documentation, Getting started using MSK Serverless clusters. The creation of the MSK Serverless Cluster includes the following resources:

  1. AWS IAM Role and associated IAM Policy for the Amazon EC2 Kafka client instance;
  2. VPC with at least one public subnet and associated Security Group(s);
  3. Amazon EC2 instance used as Apache Kafka client, provisioned in the public subnet of the above VPC;
  4. Amazon MSK Serverless Cluster;
Amazon MSK Serverless Create cluster console

Associate the new MSK Serverless Cluster with the EMR Serverless Application’s VPC and two private subnets. Also, associate the cluster with the EC2-based Kafka client instance’s VPC and its public subnet.

Amazon MSK Serverless Create cluster console — VPC 1
Amazon MSK Serverless Create cluster console — VPC 2

According to the AWS documentation, Amazon MSK does not support all AZs. For example, trying to use a subnet in us-east-1e threw an error. If this happens, choose an alternative AZ.

Error resulting from using an unsupported AZ
Successfully created Amazon MSK Serverless Cluster

VPC Endpoint for S3

To access the Spark resource in Amazon S3 from EMR Serverless running in the two private subnets, we need a VPC Endpoint for S3. Specifically, a Gateway Endpoint, which sends traffic to Amazon S3 or DynamoDB using private IP addresses. A gateway endpoint for Amazon S3 enables you to use private IP addresses to access Amazon S3 without exposure to the public Internet. EMR Serverless does not require public IP addresses, and you don’t need an internet gateway (IGW), a NAT device, or a virtual private gateway in your VPC to connect to S3.

VPC Endpoint for S3 associated with route table for private subnets

Create the VPC Endpoint for S3 (Gateway Endpoint) and add the route table for the two EMR Serverless private subnets. You can add additional routes to that route table, such as VPC peering connections to data sources such as Amazon Redshift or Amazon RDS. However, do not add routes that provide direct Internet access.

Route table for private subnets showing VPC Endpoint to S3 route (first route shown)

Kafka Topics and Sample Messages

Once the MSK Serverless Cluster and EC2-based Kafka client instance are provisioned and running, create the three required Kafka topics using the EC2-based Kafka client instance. I recommend using AWS Systems Manager Session Manager to connect to the client instance as the ec2-user user. Session Manager provides secure and auditable node management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys. Alternatively, you can SSH into the client instance.
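For example, a session can be started from the AWS CLI; the instance ID below is a placeholder.

aws ssm start-session --target i-0abcd1234efgh5678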

Before creating the topics, use a utility like telnet to confirm connectivity between the Kafka client and the MSK Serverless Cluster. Verifying connectivity will save you a lot of frustration with potential security and networking issues.

sudo yum install telnet -y
telnet <your_bootstrap_server_host> 9098
# > Trying 192.168.XX.XX…
# > Connected to boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com.
# > Escape character is '^]'.

With MSK Serverless Cluster connectivity confirmed, create the three Kafka topics: topicA, topicB, and topicC. I am using the default partitioning and replication settings from the AWS Getting Started Tutorial.

cd kafka_2.12-2.8.1
# *** CHANGE ME ***
export BOOTSTRAP_SERVER=<your_bootstrap_server> # e.g., boot-12ab34cd.c2.kafka-serverless.us-east-1.amazonaws.com:9098
bin/kafka-topics.sh --create --topic topicA \
--partitions 6 \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties
bin/kafka-topics.sh --create --topic topicB \
--partitions 6 \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties
bin/kafka-topics.sh --create --topic topicC \
--partitions 6 \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties
# list topics to confirm creation
bin/kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP_SERVER \
--command-config config/client.properties

To create some quick sample data, we will copy and paste 250 messages from a file included in the GitHub project, sample_data/sales_messages.txt, into topicA. The messages are simple mock sales transactions.

{"payment_id":16940,"customer_id":130,"amount":5.99,"payment_date":"2021-05-08 21:21:56.996577 +00:00","city":"guas Lindas de Gois","district":"Gois","country":"Brazil"}
{"payment_id":16406,"customer_id":459,"amount":5.99,"payment_date":"2021-05-08 21:22:59.996577 +00:00","city":"Qomsheh","district":"Esfahan","country":"Iran"}
{"payment_id":16315,"customer_id":408,"amount":6.99,"payment_date":"2021-05-08 21:32:05.996577 +00:00","city":"Jaffna","district":"Northern","country":"Sri Lanka"}
{"payment_id":16185,"customer_id":333,"amount":7.99,"payment_date":"2021-05-08 21:33:07.996577 +00:00","city":"Baku","district":"Baki","country":"Azerbaijan"}
{"payment_id":17097,"customer_id":222,"amount":9.99,"payment_date":"2021-05-08 21:33:47.996577 +00:00","city":"Jaroslavl","district":"Jaroslavl","country":"Russian Federation"}
{"payment_id":16579,"customer_id":549,"amount":3.99,"payment_date":"2021-05-08 21:36:33.996577 +00:00","city":"Santiago de Compostela","district":"Galicia","country":"Spain"}
{"payment_id":16050,"customer_id":269,"amount":4.99,"payment_date":"2021-05-08 21:40:19.996577 +00:00","city":"Salinas","district":"California","country":"United States"}
{"payment_id":17126,"customer_id":239,"amount":7.99,"payment_date":"2021-05-08 22:00:12.996577 +00:00","city":"Ciomas","district":"West Java","country":"Indonesia"}
{"payment_id":16933,"customer_id":126,"amount":7.99,"payment_date":"2021-05-08 22:29:06.996577 +00:00","city":"Po","district":"So Paulo","country":"Brazil"}
{"payment_id":16297,"customer_id":399,"amount":8.99,"payment_date":"2021-05-08 22:30:47.996577 +00:00","city":"Okara","district":"Punjab","country":"Pakistan"}

Use the kafka-console-producer shell script to publish the messages to the Kafka topic. Then, use the kafka-console-consumer shell script to validate that the messages made it to the topic by consuming a few of them.

bin/kafka-console-producer.sh \
--topic topicA \
--bootstrap-server $BOOTSTRAP_SERVER \
--producer.config config/client.properties
# copy and paste contents of 'sales_messages.txt' and then Ctrl+C to exit
# check for messages in topic
bin/kafka-console-consumer.sh \
--topic topicA \
--from-beginning --max-messages 5 \
--property print.value=true \
--property print.offset=true \
--property print.partition=true \
--property print.timestamp=true \
--bootstrap-server $BOOTSTRAP_SERVER \
--consumer.config config/client.properties

The output should look similar to the following example.

Sample message output from Kafka topic

Spark Resources in Amazon S3

To submit and run the five Spark Jobs included in the project, you will need to copy the following resources to your Amazon S3 bucket: (5) Apache Spark jobs, (5) related JAR dependencies, and (2) sample data files.

PySpark Applications
To start, copy the five PySpark applications to a scripts/ subdirectory within your Amazon S3 bucket.

PySpark applications uploaded to the Amazon S3 bucket

Sample Data
Next, copy the two sample data files to a sample_data/ subdirectory within your Amazon S3 bucket. The large file contains 2,000 messages, while the small file contains 600 messages. These two files can be used interchangeably with the post’s final streaming example.

Sample sales data uploaded to the Amazon S3 bucket

PySpark Dependencies
Lastly, the PySpark applications have a handful of JAR dependencies that must be available when the job runs, which are not on the EMR Serverless classpath by default. If you are unsure which JARs are already on the EMR Serverless classpath, you can check the Spark UI’s Environment tab’s Classpath Entries section. Accessing the Spark UI is demonstrated in the first PySpark application example, below.

Spark UI’s Environment tab showing Classpath Entries

It is critical to choose the correct version of each JAR dependency based on the version of libraries used with the EMR and MSK. Using the wrong version or inconsistent versions, especially Scala, can result in job failures. Specifically, we are targeting Spark 3.2.1 and Scala 2.12 (EMR v6.7.0: Amazon’s Spark 3.2.1, Scala 2.12.15, Amazon Corretto 8 version of OpenJDK), and Apache Kafka 2.8.1 (MSK Serverless: Kafka 2.8.1).

Download the seven JAR files locally, then copy them to a jars/ subdirectory within your Amazon S3 bucket.

Dependency JARs uploaded to the Amazon S3 bucket
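
For reference, all three uploads can be scripted with the AWS CLI. The following is a sketch only; the local directory names are assumptions, and <your_s3_bucket> must be replaced with your bucket name.

# copy the PySpark scripts, sample data files, and JAR dependencies to Amazon S3
# (local paths are assumptions; adjust them to match your copy of the project)
aws s3 cp ./scripts/ s3://<your_s3_bucket>/scripts/ --recursive
aws s3 cp ./sample_data/ s3://<your_s3_bucket>/sample_data/ --recursive
aws s3 cp ./jars/ s3://<your_s3_bucket>/jars/ --recursive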

PySpark Applications Examples

With the EMR Serverless Application, MSK Serverless Cluster, Kafka topics, and sample data created, and the Spark resources uploaded to Amazon S3, we are ready to explore four different Spark examples.

Example 1: Kafka Batch Aggregation to the Console

The first PySpark application, 01_example_console.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to the console (stdout).

There are no hard-coded values in any of the PySpark application examples. All required environment-specific variables, such as your MSK Serverless bootstrap server (host and port) and Amazon S3 bucket name, will be passed to the running Spark jobs as arguments from the spark-submit command.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and write aggregated messages to the console (stdout)
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("01-example-console") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .format("console") \
        .option("numRows", 25) \
        .option("truncate", False) \
        .save()


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your first PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. You will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
  --application-id <your_application_id> \
  --execution-role-arn <your_execution_role_arn> \
  --name 01-example-console \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://<your_s3_bucket>/scripts/01_example_console.py",
      "entryPointArguments": [
        "--bootstrap_servers=<your_bootstrap_server>"
      ],
      "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
    }
  }'

Switching to the EMR Serverless Application console, you should see the new Spark job you just submitted in one of several job states.

EMR Studio Serverless Application details console
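
If you prefer the CLI to the console, you can also check the job's state with the get-job-run API. A minimal sketch follows; the application ID and job run ID placeholders come from the output of the start-job-run command above.

# check the state of the submitted Spark job from the CLI
aws emr-serverless get-job-run \
  --application-id <your_application_id> \
  --job-run-id <your_job_run_id> \
  --query 'jobRun.state'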

You can click on the Spark job to get more details. Note the Script arguments and Spark properties passed in from the spark-submit command.

EMR Studio Serverless Application details Job details view

From the Spark job details tab, access the Spark UI, aka Spark Web UI, from a button in the upper right corner of the screen. If you have experience with Spark, you are most likely familiar with the Spark Web UI to monitor and tune Spark jobs.

Spark History Server UI

From the initial screen, the Spark History Server tab, click on the App ID. You can access an enormous amount of Spark-related information about your job and EMR environment from the Spark Web UI.

Spark UI’s Stages tab
Spark UI’s Stages tab showing a Directed acyclic graph (DAG) Visualization
Spark UI’s Environment tab showing environment variables, including versions of Spark, Java, and Scala

The Executors tab will give you access to the Spark job’s output. The output we are most interested in is the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI's Executors tab

The stderr contains output related to the running Spark job. Below we see an example of Kafka consumer configuration values output to stderr. Several of these values were passed in from the Spark job, including items such as kafka.bootstrap.servers, security.protocol, sasl.mechanism, and sasl.jaas.config.

driver executor’s stderr output to the console

The stdout from the driver executor contains the console output as directed from the Spark job. Below we see the successfully aggregated results of the first Spark job, output to stdout.

+------------------+------+------+
|country           |sales |orders|
+------------------+------+------+
|India             |138.80|20    |
|China             |133.80|20    |
|Mexico            |106.86|14    |
|Japan             |100.86|14    |
|Brazil            |96.87 |13    |
|Russian Federation|94.87 |13    |
|United States     |92.86 |14    |
|Nigeria           |58.93 |7     |
|Philippines       |58.92 |8     |
|South Africa      |46.94 |6     |
|Argentina         |42.93 |7     |
|Germany           |39.96 |4     |
|Indonesia         |38.95 |5     |
|Italy             |35.95 |5     |
|Iran              |33.95 |5     |
|South Korea       |33.94 |6     |
|Poland            |30.97 |3     |
|Pakistan          |25.97 |3     |
|Taiwan            |25.96 |4     |
|Mozambique        |23.97 |3     |
|Ukraine           |23.96 |4     |
|Vietnam           |23.96 |4     |
|Venezuela         |22.97 |3     |
|France            |20.98 |2     |
|Peru              |19.98 |2     |
+------------------+------+------+
only showing top 25 rows

Example 2: Kafka Batch Aggregation to CSV in S3

Although the console is useful for development and debugging, it is typically not used in Production. Instead, Spark typically sends results to Amazon S3 as CSV, JSON, Parquet, or Avro formatted files, to Kafka, to a database, or to an API endpoint. The second PySpark application, 02_example_csv_s3.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a CSV file in Amazon S3.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and write aggregated messages to CSV file in Amazon S3
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("02-example-csv-s3") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .write \
        .csv(path=f"s3a://{args.s3_bucket}/output/", header=True, sep="|")


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your second PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first example, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
  --application-id <your_application_id> \
  --execution-role-arn <your_execution_role_arn> \
  --name 02-example-csv-s3 \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://<your_s3_bucket>/scripts/02_example_csv_s3.py",
      "entryPointArguments": [
        "--bootstrap_servers=<your_bootstrap_server>",
        "--s3_bucket=<your_s3_bucket>"
      ],
      "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
    }
  }'

If successful, the Spark job should create a single CSV file in the designated Amazon S3 key (directory path) and an empty _SUCCESS indicator file. The presence of an empty _SUCCESS file signifies that the save() operation completed normally.

Amazon S3 bucket showing CSV file output by Spark job
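
You can also verify the output from the command line. A minimal sketch, assuming the same output/ prefix the job writes to:

# list the CSV part file and the _SUCCESS marker written by the job
aws s3 ls s3://<your_s3_bucket>/output/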

Below we see the expected pipe-delimited output from the second Spark job.

country|sales|orders
India|138.80|20
China|133.80|20
Mexico|106.86|14
Japan|100.86|14
Brazil|96.87|13
Russian Federation|94.87|13
United States|92.86|14
Nigeria|58.93|7
Philippines|58.92|8
South Africa|46.94|6
Argentina|42.93|7
Germany|39.96|4
Indonesia|38.95|5
Italy|35.95|5
Iran|33.95|5

Example 3: Kafka Batch Aggregation to Kafka

The third PySpark application, 03_example_kafka.py, reads the same 250 sample sales messages from topicA you published earlier, aggregates the messages, and writes the total sales and quantity of orders by country to a second Kafka topic, topicB. This job now has both read and write options.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and write aggregated messages to topicB
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("03-example-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales, args)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "endingOffsets":
            "latest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .read \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales, args):
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "topic":
            args.write_topic,
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    window = Window.partitionBy("country").orderBy("amount")
    window_agg = Window.partitionBy("country")

    df_sales \
        .selectExpr("CAST(value AS STRING)") \
        .select(F.from_json("value", schema=schema).alias("data")) \
        .select("data.*") \
        .withColumn("row", F.row_number().over(window)) \
        .withColumn("orders", F.count(F.col("amount")).over(window_agg)) \
        .withColumn("sales", F.sum(F.col("amount")).over(window_agg)) \
        .filter(F.col("row") == 1).drop("row") \
        .select("country",
                F.format_number("sales", 2).alias("sales"),
                F.format_number("orders", 0).alias("orders")) \
        .coalesce(1) \
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False) \
        .select(F.to_json(F.struct("*"))).toDF("value") \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicA", required=False, help="Kafka topic to read from")
    parser.add_argument("--write_topic", default="topicB", required=False, help="Kafka topic to write to")
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit your next PySpark job to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Similar to the first two examples, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

aws emr-serverless start-job-run \
  --application-id <your_application_id> \
  --execution-role-arn <your_execution_role_arn> \
  --name 03-example-kafka \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://<your_s3_bucket>/scripts/03_example_kafka.py",
      "entryPointArguments": [
        "--bootstrap_servers=<your_bootstrap_server>"
      ],
      "sparkSubmitParameters": "--conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
    }
  }'

Once the job completes, you can confirm the results by returning to your EC2-based Kafka client. Use the same kafka-console-consumer command you used previously to show messages from topicB.

bin/kafka-console-consumer.sh \
  --topic topicB \
  --from-beginning --max-messages 10 \
  --property print.value=true \
  --property print.offset=true \
  --property print.partition=true \
  --property print.timestamp=true \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --consumer.config config/client.properties

If the Spark job and the Kafka client command worked successfully, you should see aggregated messages similar to the example output below. Note we are not using keys with the Kafka messages, only values for these simple examples.

Aggregated messages from Kafka topic

Example 4: Spark Structured Streaming

For our final example, we will switch from batch to streaming — from read to readStream and from write to writeStream. Before continuing, I suggest reading the Structured Streaming Programming Guide.

In this example, we will demonstrate how to continuously measure a common business metric — real-time sales volumes. Imagine you sell products globally and want to understand the relationship between the time of day and buying patterns in different geographic regions in real-time. For any given window of time — this 15-minute period, this hour, this day, or this week — you want to know the current sales volumes by country. You are not reviewing previous sales periods or examining running sales totals, but real-time sales during a sliding time window.

We will use two PySpark jobs running concurrently to simulate this metric. The first application, 04_stream_sales_to_kafka.py, simulates streaming data by continuously writing messages to topicC — 2,000 messages with a 0.5-second delay between messages. In my tests, the job ran for ~28–29 minutes.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Write messages from a CSV file to Kafka topicC
# to simulate real-time streaming sales data
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments

import argparse
import time

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("04-stream-sales-to-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, schema, args)
    df_sales.cache()

    write_to_kafka(spark, df_sales, args)


def read_from_csv(spark, schema, args):
    df_sales = spark.read \
        .csv(path=f"s3a://{args.s3_bucket}/sample_data/{args.sample_data_file}",
             schema=schema, header=True, sep="|")

    return df_sales


def write_to_kafka(spark, df_sales, args):
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "topic":
            args.write_topic,
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    sales_count = df_sales.count()

    for r in range(0, sales_count):
        row = df_sales.collect()[r]
        df_message = spark.createDataFrame([row], df_sales.schema)

        df_message = df_message \
            .drop("payment_date") \
            .withColumn("payment_date", F.current_timestamp())

        df_message \
            .selectExpr("CAST(payment_id AS STRING) AS key",
                        "to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .options(**options_write) \
            .save()

        df_message.show(1)

        time.sleep(args.message_delay)


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
    parser.add_argument("--write_topic", default="topicC", required=False, help="Kafka topic to write to")
    parser.add_argument("--sample_data_file", default="sales_incremental_large.csv", required=False, help="data file")
    parser.add_argument("--message_delay", default=0.5, required=False, help="message publishing delay")
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

Simultaneously, the PySpark application, 05_streaming_kafka.py, continuously consumes the sales transaction messages from the same topic, topicC. Then, Spark aggregates messages over a sliding event-time window and writes the results to the console.

# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads stream of messages from Kafka topicC and
# writes stream of aggregations over sliding event-time window to console (stdout)
# References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument

import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType


def main():
    args = parse_args()

    spark = SparkSession \
        .builder \
        .appName("05-streaming-kafka") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("INFO")

    df_sales = read_from_kafka(spark, args)

    summarize_sales(df_sales)


def read_from_kafka(spark, args):
    options_read = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "subscribe":
            args.read_topic,
        "startingOffsets":
            "earliest",
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    df_sales = spark \
        .readStream \
        .format("kafka") \
        .options(**options_read) \
        .load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    ds_sales = df_sales \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .select(F.from_json("value", schema=schema).alias("data"), "timestamp") \
        .select("data.*", "timestamp") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy("country",
                 F.window("timestamp", "10 minutes", "5 minutes")) \
        .agg(F.sum("amount"), F.count("amount")) \
        .orderBy(F.col("window").desc(),
                 F.col("sum(amount)").desc()) \
        .select("country",
                F.format_number("sum(amount)", 2).alias("sales"),
                F.format_number("count(amount)", 0).alias("orders"),
                "window.start", "window.end") \
        .coalesce(1) \
        .writeStream \
        .queryName("streaming_to_console") \
        .trigger(processingTime="1 minute") \
        .outputMode("complete") \
        .format("console") \
        .option("numRows", 10) \
        .option("truncate", False) \
        .start()

    ds_sales.awaitTermination()


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    parser.add_argument("--read_topic", default="topicC", required=False, help="Kafka topic to read from")
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    main()

To submit the two PySpark jobs to the EMR Serverless Application, use the emr-serverless API from the AWS CLI. Again, you will need (4) values: 1) your EMR Serverless Application’s application-id, 2) the ARN of your EMR Serverless Application’s execution IAM Role, 3) your MSK Serverless bootstrap server (host and port), and 4) the name of your Amazon S3 bucket containing the Spark resources.

# run 04 and 05 simultaneously
aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 04-stream-sales-to-kafka \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/04_stream_sales_to_kafka.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>",
"–s3_bucket=<your_s3_bucket>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'
aws emr-serverless start-job-run \
–application-id <your_application_id> \
–execution-role-arn <your_execution_role_arn> \
–name 05-streaming-kafka \
–job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<your_s3_bucket>/scripts/05_streaming_kafka.py",
"entryPointArguments": [
"–bootstrap_servers=<your_bootstrap_server>"
],
"sparkSubmitParameters": "–conf spark.jars=s3://<your_s3_bucket>/jars/*.jar"
}
}'

Switching to the EMR Serverless Application console, you should see both Spark jobs you just submitted in one of several job states.

EMR Studio Serverless Application details console

Using the Spark UI again, we can review the output from the second job, 05_streaming_kafka.py.

Spark UI’s Jobs tab

With Spark Structured Streaming jobs, we have an extra tab in the Spark UI, Structured Streaming. This tab displays all running jobs with their latest [micro]batch number, the aggregate rate of data arriving, and the aggregate rate at which Spark is processing data. Unfortunately, with MSK Serverless, AWS doesn’t appear to allow access to the detailed streaming query statistics via the Run ID, which greatly reduces its value. You receive a 502 error when clicking on the Run ID hyperlink.

Spark UI’s Structured Streaming tab

The output we are most interested in, again, is contained in the driver executor’s stderr and stdout (first row of the second table, shown below).

Spark UI’s Executors tab

Below we see sample output from stderr. The output shows the results of a micro-batch. According to the Apache Spark documentation, internally, by default, Structured Streaming queries are processed using a micro-batch processing engine. The engine processes data streams as a series of small batch jobs, achieving end-to-end latencies as low as 100ms and exactly-once fault-tolerance guarantees.

22/07/25 14:29:04 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/.10.b017bcdc-f142-4b28-8891-d3f2d471b740.tmp to file:/tmp/temporary-1b3adb9c-766a-4aec-97a9-decfd7be10e7/commits/10
22/07/25 14:29:04 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "bec9640f-ac16-4d00-bd4a-ed8d29b5f768",
"runId" : "a2e00c3a-924c-4728-b25a-56ee855da7da",
"name" : "streaming_to_console",
"timestamp" : "2022-07-25T14:29:00.000Z",
"batchId" : 10,
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"durationMs" : {
"addBatch" : 4359,
"getBatch" : 1,
"latestOffset" : 5,
"queryPlanning" : 37,
"triggerExecution" : 4458,
"walCommit" : 27
},
"eventTime" : {
"avg" : "2022-07-25T14:28:29.947Z",
"max" : "2022-07-25T14:28:59.290Z",
"min" : "2022-07-25T14:28:00.559Z",
"watermark" : "2022-07-25T14:17:59.737Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 476,
"numRowsUpdated" : 132,
"allUpdatesTimeMs" : 319,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 14173,
"memoryUsedBytes" : 262776,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 400,
"numStateStoreInstances" : 400,
"customMetrics" : {
"loadedMapCacheHitCount" : 15600,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 154208
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topicC]]",
"startOffset" : {
"topicC" : {
"2" : 101,
"5" : 96,
"4" : 88,
"1" : 84,
"3" : 112,
"0" : 100
}
},
"endOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"latestOffset" : {
"topicC" : {
"2" : 114,
"5" : 110,
"4" : 98,
"1" : 98,
"3" : 124,
"0" : 110
}
},
"numInputRows" : 73,
"inputRowsPerSecond" : 1.2166666666666666,
"processedRowsPerSecond" : 16.375056078959172,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@52e344bf",
"numOutputRows" : 238
}
}
Example of a Spark Structured Streaming MicroBatch output

The console output corresponding to the micro-batch progress above is shown below. We see the initial micro-batch results, starting with the first micro-batch before any messages are streamed to topicC.

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+------+-----+---+
|country|sales|orders|start|end|
+-------+-----+------+-----+---+
+-------+-----+------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+------+-------------------+-------------------+
|country   |sales|orders|start              |end                |
+----------+-----+------+-------------------+-------------------+
|Azerbaijan|7.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Sri Lanka |6.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Iran      |5.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Brazil    |5.99 |1     |2022-07-25 14:15:00|2022-07-25 14:25:00|
|Azerbaijan|7.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Sri Lanka |6.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Brazil    |5.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
|Iran      |5.99 |1     |2022-07-25 14:10:00|2022-07-25 14:20:00|
+----------+-----+------+-------------------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------+-----+------+-------------------+-------------------+
|country           |sales|orders|start              |end                |
+------------------+-----+------+-------------------+-------------------+
|Russian Federation|43.94|6     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|China             |37.94|6     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Mexico            |34.96|4     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|India             |33.95|5     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United States     |26.96|4     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Philippines       |22.97|3     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Nigeria           |22.97|3     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Iran              |14.98|2     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|Vietnam           |13.98|2     |2022-07-25 14:20:00|2022-07-25 14:30:00|
|United Kingdom    |11.99|1     |2022-07-25 14:20:00|2022-07-25 14:30:00|
+------------------+-----+------+-------------------+-------------------+
only showing top 10 rows
Example of a Spark Structured Streaming MicroBatch results to console

If you are familiar with Spark Structured Streaming, you are likely aware that these Spark jobs run continuously. In other words, the streaming jobs will not stop; they continually await more streaming data.

EMR Studio Serverless Application details console

The first job, 04_stream_sales_to_kafka.py, will run for ~28–29 minutes and stop with a status of Success. However, the second job, 05_streaming_kafka.py, the Spark Structured Streaming job, must be manually canceled.

EMR Studio Serverless Application details console
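
The streaming job can be stopped from the console or with the cancel-job-run API. A minimal sketch follows; the job run ID placeholder is the one returned when the job was started.

# stop the long-running Spark Structured Streaming job
aws emr-serverless cancel-job-run \
  --application-id <your_application_id> \
  --job-run-id <your_job_run_id>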

Cleaning Up

You can delete your resources from the AWS Management Console or AWS CLI. However, to delete your Amazon S3 bucket, all objects (including all object versions and delete markers) in the bucket must be deleted before the bucket itself can be deleted.

# delete application, cluster, and EC2 client
aws kafka delete-cluster --cluster-arn <your_msk_serverless_cluster_arn>
aws emr-serverless delete-application --application-id <your_application_id>
aws ec2 terminate-instances --instance-ids <your_ec2_instance_id>

# all objects (including all object versions and delete markers) in the bucket
# must be deleted before the bucket itself can be deleted.
aws s3api delete-bucket --bucket <your_s3_bucket>
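
For a non-versioned bucket, emptying it first can be as simple as the sketch below; a versioned bucket also requires removing object versions and delete markers (for example, with the s3api delete-objects command) before delete-bucket will succeed.

# empty the bucket, then delete it (sufficient for non-versioned buckets)
aws s3 rm s3://<your_s3_bucket> --recursive
aws s3api delete-bucket --bucket <your_s3_bucket>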

Conclusion

In this post, we discovered how easy it is to adopt a serverless approach to Analytics on AWS. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks. With MSK Serverless, you can use Apache Kafka on demand and pay for the data you stream and retain. In addition, MSK Serverless automatically provisions and scales compute and storage resources. Given suitable analytics use cases, EMR Serverless with MSK Serverless will likely save you time, effort, and expense.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.



Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena

Learn how to develop Cloud-native, RESTful Java services that query data in an AWS-based data lake using Amazon Athena’s API

Introduction

AWS provides a collection of fully-managed services that makes building and managing secure data lakes faster and easier, including AWS Lake Formation, AWS Glue, and Amazon S3. Additional analytics services such as Amazon EMR, AWS Glue Studio, and Amazon Redshift allow Data Scientists and Analysts to run high-performance queries on large volumes of semi-structured and structured data quickly and economically.

What is not always as obvious is how teams develop internal and external customer-facing analytics applications built on top of data lakes. For example, imagine sellers on an eCommerce platform, the scenario used in this post, who want to make better marketing decisions regarding their products by analyzing sales trends and buyer preferences. Further, suppose the data required for the analysis must be aggregated from multiple systems and data sources; this is an ideal use case for a data lake.

Example of a personalized sales report generated from the Spring Boot service’s salesbyseller endpoint

In this post, we will explore an example Java Spring Boot RESTful Web Service that allows end-users to query data stored in a data lake on AWS. The RESTful Web Service will access data stored as Apache Parquet in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena. The service will use Spring Boot and the AWS SDK for Java to expose a secure, RESTful Application Programming Interface (API).

High-level AWS architecture demonstrated in this post

Amazon Athena is a serverless, interactive query service based on Presto, used to query data and analyze big data in Amazon S3 using standard SQL. Using Athena functionality exposed by the AWS SDK for Java and Athena API, the Spring Boot service will demonstrate how to access tables, views, prepared statements, and saved queries (aka named queries).

Amazon Athena Query Editor

TL;DR

Do you want to explore the source code for this post’s Spring Boot service or deploy it to Kubernetes before reading the full article? All the source code, Docker, and Kubernetes resources are open-source and available on GitHub.

git clone --depth 1 -b main \
    https://github.com/garystafford/athena-spring-app.git

A Docker image for the Spring Boot service is also available on Docker Hub.

Spring Boot service image available on Docker Hub

Data Lake Data Source

There are endless data sources to build a demonstration data lake on AWS. This post uses the TICKIT sample database provided by AWS and designed for Amazon Redshift, AWS’s cloud data warehousing service. The database consists of seven tables. Two previous posts and associated videos, Building a Data Lake on AWS with Apache Airflow and Building a Data Lake on AWS, detail the setup of the data lake used in this post using AWS Glue and optionally Apache Airflow with Amazon MWAA.

Those two posts use the data lake pattern of segmenting data as bronze (aka raw), silver (aka refined), and gold (aka aggregated), popularized by Databricks. The data lake simulates a typical scenario where data originating from multiple sources, including an e-commerce platform, a CRM system, and a SaaS provider, must be aggregated and analyzed.

High-level data lake architecture demonstrated in the previous post

Spring Projects with IntelliJ IDE

Although not a requirement, I used JetBrains IntelliJ IDEA 2022 (Ultimate Edition) to develop and test the post's Spring Boot service. Bootstrapping Spring projects with IntelliJ is easy. Developers can quickly create a Spring project using the Spring Initializr plugin bundled with IntelliJ.

JetBrains IntelliJ IDEA plugin support for Spring projects

The Spring Initializr plugin’s new project creation wizard is based on start.spring.io. The plugin allows you to quickly select the Spring dependencies you want to incorporate into your project.

Adding dependencies to a new Spring project in IntelliJ

Visual Studio Code

There are also several Spring extensions for the popular Visual Studio Code IDE, including Microsoft’s Spring Initializr Java Support extension.

Spring Initializr Java Support extension for Visual Studio Code by Microsoft

Gradle

This post uses Gradle instead of Maven to develop, test, build, package, and deploy the Spring service. Based on the packages selected in the new project setup shown above, the Spring Initializr plugin's new project creation wizard creates a build.gradle file. Additional packages, such as Lombok, Micrometer, and Rest Assured, were added separately.

plugins {
    id 'org.springframework.boot' version '2.7.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id 'io.freefair.lombok' version '6.5.0-rc1'
}

group = 'aws.example'
version = '1.0.0'
sourceCompatibility = '17'

def awsSdkVersion = '2.17.225'
def springBootVersion = '2.7.1'
def restAssuredVersion = '5.1.1'

repositories {
    mavenCentral()
}

dependencies {
    // aws sdk
    runtimeOnly "software.amazon.awssdk:bom:${awsSdkVersion}"
    implementation "software.amazon.awssdk:athena:${awsSdkVersion}"

    // spring
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
    implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
    implementation "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
    developmentOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
    implementation 'org.springdoc:springdoc-openapi-ui:1.6.9'
    implementation 'org.springframework:spring-context:5.3.20'

    // testing
    testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
    testImplementation "io.rest-assured:rest-assured:${restAssuredVersion}"
    testImplementation "io.rest-assured:json-path:${restAssuredVersion}"
    testImplementation "io.rest-assured:xml-path:${restAssuredVersion}"
    testImplementation "io.rest-assured:json-schema-validator:${restAssuredVersion}"

    // monitoring
    implementation 'io.micrometer:micrometer-registry-prometheus:1.9.1'
}

tasks.named('test') {
    useJUnitPlatform()
}

Amazon Corretto

The Spring Boot service is developed for and compiled with the most recent version of Amazon Corretto 17. According to AWS, Amazon Corretto is a no-cost, multiplatform, production-ready distribution of the Open Java Development Kit (OpenJDK). Corretto comes with long-term support that includes performance enhancements and security fixes. Corretto is certified as compatible with the Java SE standard and is used internally at Amazon for many production services.

Source Code

Each API endpoint in the Spring Boot RESTful Web Service has a corresponding POJO data model class, service interface and service implementation class, and controller class. In addition, there are also common classes such as configuration, a client factory, and Athena-specific request/response methods. Lastly, there are additional class dependencies for views and prepared statements.

Java class relationships related to querying the Amazon Athena refined_tickit_public_category table

The project’s source code is arranged in a logical hierarchy by package and class type.

.
└── com
    └── example
        └── athena
            ├── AthenaApplication.java
            ├── common
            │   ├── AthenaClientFactory.java
            │   ├── AthenaClientFactoryImp.java
            │   ├── AthenaCommon.java
            │   ├── NamedQuery.java
            │   ├── PreparedStatement.java
            │   └── View.java
            ├── config
            │   └── ConfigProperties.java
            └── tickit
                ├── controller
                │   ├── BuyerLikesByCategoryController.java
                │   ├── CategoryController.java
                │   ├── DateDetailController.java
                │   ├── EventController.java
                │   ├── ListingController.java
                │   ├── SaleBySellerController.java
                │   ├── SaleController.java
                │   ├── SalesByCategoryController.java
                │   ├── UserController.java
                │   └── VenueController.java
                ├── model
                │   ├── crm
                │   │   └── User.java
                │   ├── ecomm
                │   │   ├── DateDetail.java
                │   │   ├── Listing.java
                │   │   └── Sale.java
                │   ├── resultsets
                │   │   ├── BuyerLikesByCategory.java
                │   │   ├── SaleBySeller.java
                │   │   └── SalesByCategory.java
                │   └── saas
                │       ├── Category.java
                │       ├── Event.java
                │       └── Venue.java
                └── service
                    ├── BuyerLikesByCategoryServiceImp.java
                    ├── BuyersLikesByCategoryService.java
                    ├── CategoryService.java
                    ├── CategoryServiceImp.java
                    ├── DateDetailService.java
                    ├── DateDetailServiceImp.java
                    ├── EventService.java
                    ├── EventServiceImp.java
                    ├── ListingService.java
                    ├── ListingServiceImp.java
                    ├── SaleBySellerService.java
                    ├── SaleBySellerServiceImp.java
                    ├── SaleService.java
                    ├── SaleServiceImp.java
                    ├── SalesByCategoryService.java
                    ├── SalesByCategoryServiceImp.java
                    ├── UserService.java
                    ├── UserServiceImp.java
                    ├── VenueService.java
                    └── VenueServiceImp.java

Amazon Athena Access

There are three standard methods for accessing Amazon Athena with the AWS SDK for Java: 1) the AthenaClient service client, 2) the AthenaAsyncClient service client for accessing Athena asynchronously, and 3) using the JDBC driver with the AWS SDK. The AthenaClient and AthenaAsyncClient service clients are both part of the software.amazon.awssdk.services.athena package. For simplicity, this post's Spring Boot service uses the AthenaClient service client instead of Java's asynchronous programming model. AWS supplies basic code samples as part of their documentation as a starting point for writing Athena applications using the SDK. The code samples also use the AthenaClient service client.
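
The same submit-wait-fetch flow that the service implements with the AthenaClient can be traced by hand with the AWS CLI. The sketch below is illustrative only; the table and database names come from the post, while the results bucket is a placeholder you must supply.

# submit a query against the data lake, check its state, then fetch the results
QUERY_EXECUTION_ID=$(aws athena start-query-execution \
  --query-string "SELECT * FROM refined_tickit_public_event LIMIT 10;" \
  --query-execution-context Database=tickit_demo \
  --result-configuration OutputLocation=s3://<your_athena_results_bucket>/ \
  --query 'QueryExecutionId' --output text)

aws athena get-query-execution --query-execution-id "$QUERY_EXECUTION_ID" \
  --query 'QueryExecution.Status.State'

aws athena get-query-results --query-execution-id "$QUERY_EXECUTION_ID"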

POJO-based Data Model Class

For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Plain Old Java Object (POJO). According to Wikipedia, a POJO is an ordinary Java object, not bound by any particular restriction. The POJO class is similar to a JPA Entity, representing persistent data stored in a relational database. In this case, the POJO uses Lombok's @Data annotation. According to the documentation, this annotation generates getters for all fields, a useful toString method, and hashCode and equals implementations that check all non-transient fields. It also generates setters for all non-final fields and a constructor.

package com.example.athena.tickit.model.saas;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
    private int id;
    private int venueId;
    private int catId;
    private int dateId;
    private String name;
    private LocalDateTime startTime;
}

Each POJO corresponds directly to a ‘silver’ table in the AWS Glue Data Catalog. For example, the Event POJO corresponds to the refined_tickit_public_event table in the tickit_demo Data Catalog database. The POJO defines the Spring Boot service’s data model for data read from the corresponding AWS Glue Data Catalog table.

Glue Data Catalog refined_tickit_public_event table

The Glue Data Catalog table is the interface between the Athena query and the underlying data stored in Amazon S3 object storage. The Athena query targets the table, which returns the underlying data from S3.

Tickit Category data stored as Apache Parquet files in Amazon S3
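
You can inspect the catalog table that backs this query directly from the command line. A minimal sketch, using the database and table names from the post, shows the table's S3 location and column names.

# show the Glue Data Catalog table metadata, including the S3 location and columns
aws glue get-table \
  --database-name tickit_demo \
  --name refined_tickit_public_event \
  --query 'Table.{Location:StorageDescriptor.Location,Columns:StorageDescriptor.Columns[].Name}'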

Service Class

Retrieving data from the data lake via AWS Glue, using Athena, is handled by a service class. For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Service Interface and implementation class. The service implementation class uses Spring Framework’s @Service annotation. According to the documentation, it indicates that an annotated class is a “Service,” initially defined by Domain-Driven Design (Evans, 2003) as “an operation offered as an interface that stands alone in the model, with no encapsulated state.” Most importantly for the Spring Boot service, this annotation serves as a specialization of @Component, allowing for implementation classes to be autodetected through classpath scanning.

package com.example.athena.tickit.service;

import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.saas.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

import static java.lang.Integer.parseInt;

@Service
public class EventServiceImp implements EventService {

    private static final Logger logger = LoggerFactory.getLogger(EventServiceImp.class);

    private final ConfigProperties configProperties;
    private final AthenaClientFactory athenaClientFactory;
    private final AthenaCommon athenaCommon;

    @Autowired
    public EventServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
        this.configProperties = configProperties;
        this.athenaClientFactory = athenaClientFactory;
        this.athenaCommon = athenaCommon;
    }

    public List<Event> findAll(Integer limit, Integer offset) {
        if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
            limit = configProperties.getLimit();
        }
        if (offset == null || offset < 1) {
            offset = 0;
        }
        String whereClause = "WHERE eventid IS NOT NULL";

        String query = String.format("""
                SELECT *
                FROM refined_tickit_public_event
                %s
                ORDER BY eventid
                OFFSET %s
                LIMIT %s;""", whereClause, offset, limit);

        return startQuery(query);
    }

    public Event findById(int id) {
        String query = String.format("""
                SELECT DISTINCT *
                FROM refined_tickit_public_event
                WHERE eventid=%s""", id);

        Event event;
        try {
            event = startQuery(query).get(0);
        } catch (IndexOutOfBoundsException e) {
            logger.error(e.getMessage());
            return null;
        }
        return event;
    }

    private List<Event> startQuery(String query) {
        logger.debug(String.format("Query: %s", query.replace("\n", " ")));

        AthenaClient athenaClient = athenaClientFactory.createClient();
        String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
        athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
        List<Event> events = processResultRows(athenaClient, queryExecutionId);
        athenaClient.close();

        return events;
    }

    private List<Event> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
        List<Event> events = new ArrayList<>();
        try {
            // Max Results can be set but if it's not set,
            // it will choose the maximum page size
            GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
                    .queryExecutionId(queryExecutionId).build();

            GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);

            List<Row> rows;
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");

            for (GetQueryResultsResponse result : getQueryResultsResults) {
                rows = result.resultSet().rows();
                for (Row myRow : rows.subList(1, rows.size())) { // skip first row - column names
                    List<Datum> allData = myRow.data();
                    Event event = new Event();
                    event.setId(parseInt(allData.get(0).varCharValue()));
                    event.setVenueId(parseInt(allData.get(1).varCharValue()));
                    event.setCatId(parseInt(allData.get(2).varCharValue()));
                    event.setDateId(parseInt(allData.get(3).varCharValue()));
                    event.setName(allData.get(4).varCharValue());
                    event.setStartTime(LocalDateTime.parse(allData.get(5).varCharValue(), formatter));
                    events.add(event);
                }
            }
        } catch (AthenaException e) {
            logger.error(e.getMessage());
        }
        return events;
    }
}

Using Spring’s common constructor-based Dependency Injection (DI) method (aka constructor injection), the service auto-wires an instance of the AthenaClientFactory interface. Note that we are auto-wiring the service interface, not the service implementation, allowing us to wire in a different implementation if desired, such as for testing.

The service calls the AthenaClientFactory class's createClient() method, which returns a connection to Amazon Athena using one of several available authentication methods. The authentication scheme will depend on where the service is deployed and how you want to securely connect to AWS. Some options include environment variables, a local AWS profile, an EC2 instance profile, or a token from a web identity provider.

return AthenaClient.builder()
        .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
        .build();

The service class transforms the payload returned by an instance of GetQueryResultsResponse into an ordered collection (also known as a sequence), List<E>, where E represents a POJO. For example, with the data lake's refined_tickit_public_event table, the service returns a List<Event>. This pattern repeats itself for tables, views, prepared statements, and named queries. Column data types can be transformed and formatted on the fly, new columns added, and existing columns skipped.

List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");

for (GetQueryResultsResponse result : getQueryResultsResults) {
    rows = result.resultSet().rows();
    for (Row myRow : rows.subList(1, rows.size())) { // skip first row - column names
        List<Datum> allData = myRow.data();
        Event event = new Event();
        event.setId(parseInt(allData.get(0).varCharValue()));
        event.setVenueId(parseInt(allData.get(1).varCharValue()));
        event.setCatId(parseInt(allData.get(2).varCharValue()));
        event.setDateId(parseInt(allData.get(3).varCharValue()));
        event.setName(allData.get(4).varCharValue());
        event.setStartTime(LocalDateTime.parse(allData.get(5).varCharValue(), formatter));
        events.add(event);
    }
}

For each endpoint defined in the Controller class, for example, get(), findAll(), and findById(), there is a corresponding method in the Service class. Below, we see an example of the findAll() method in the SalesByCategoryServiceImp service class. This method corresponds to the identically named method in the SalesByCategoryController controller class. Each of these service methods follows a similar pattern of constructing a dynamic Athena SQL query based on input parameters, which is passed to Athena through the AthenaClient service client using an instance of GetQueryResultsRequest.

public List<SalesByCategory> findAll(String calendarDate, Integer limit, Integer offset) {
    if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
        limit = configProperties.getLimit();
    }
    if (offset == null || offset < 1) {
        offset = 0;
    }
    String whereClause = "WHERE caldate IS NOT NULL";
    if (calendarDate != null) {
        whereClause = whereClause + " AND caldate=date('" + calendarDate + "')";
    }

    String query = String.format("""
            SELECT *
            FROM tickit_sales_by_day_and_category
            %s
            OFFSET %s
            LIMIT %s;""", whereClause, offset, limit);

    return startQuery(query);
}

Controller Class

Lastly, there is a corresponding Controller class for each API endpoint in the Spring Boot RESTful Web Service. The controller class uses Spring Framework’s @RestController annotation. According to the documentation, this annotation is a convenience annotation that is itself annotated with @Controller and @ResponseBody. Types that carry this annotation are treated as controllers where @RequestMapping methods assume @ResponseBody semantics by default.

The controller class takes a dependency on the corresponding service class application component using constructor-based Dependency Injection (DI). Like the service example above, we are auto-wiring the service interface, not the service implementation.

package com.example.athena.tickit.controller;

import com.example.athena.tickit.model.saas.Event;
import com.example.athena.tickit.service.EventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping(value = "/events")
public class EventController {

    private final EventService service;

    @Autowired
    public EventController(EventService service) {
        this.service = service;
    }

    @RequestMapping(method = RequestMethod.GET)
    public ResponseEntity<List<Event>> findAll(
            @RequestParam(required = false) Integer limit,
            @RequestParam(required = false) Integer offset
    ) {
        List<Event> events = service.findAll(limit, offset);
        if (events.size() == 0) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND).body(null);
        }
        return ResponseEntity.status(HttpStatus.OK).body(events);
    }

    @RequestMapping(value = "/{id}", method = RequestMethod.GET)
    public ResponseEntity<Event> findById(@PathVariable("id") int id) {
        Event event = service.findById(id);
        if (event == null) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND).body(null);
        }
        return ResponseEntity.status(HttpStatus.OK).body(event);
    }
}

The controller is responsible for serializing the ordered collection of POJOs into JSON and returning that JSON payload in the body of the HTTP response to the initial HTTP request.
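
Running locally, you can exercise an endpoint and inspect the JSON payload the controller returns. A minimal sketch, assuming the service is listening on Spring Boot's default port 8080 (jq is optional, used only to pretty-print the response):

# request the first five events from the /events endpoint
curl -s "http://localhost:8080/events?limit=5" | jq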

Querying Views

In addition to querying AWS Glue Data Catalog tables (aka Athena tables), we also query views. According to the documentation, a view in Amazon Athena is a logical table, not a physical table. Therefore, the query that defines a view runs each time the view is referenced in a query.

For convenience, each time the Spring Boot service starts, the main AthenaApplication class calls the View.java class’s CreateView() method to check for the existence of the view, view_tickit_sales_by_day_and_category. If the view does not exist, it is created and becomes accessible to all application end-users. The view is queried through the service’s /salesbycategory endpoint.

Java class relationships related to querying the Amazon Athena view

This confirm-or-create pattern is repeated for the prepared statement in the main AthenaApplication class (detailed in the next section).

package com.example.athena;

import com.example.athena.common.PreparedStatement;
import com.example.athena.common.View;
import com.example.athena.config.ConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@SpringBootApplication
@EnableConfigurationProperties(ConfigProperties.class)
public class AthenaApplication {

    private final PreparedStatement preparedStatement;
    private final View view;

    @Autowired
    public AthenaApplication(PreparedStatement preparedStatement, View view) {
        this.preparedStatement = preparedStatement;
        this.view = view;
    }

    public static void main(String[] args) {
        SpringApplication.run(AthenaApplication.class, args);
    }

    @Bean
    void CreatePreparedStatement() {
        preparedStatement.CreatePreparedStatement();
    }

    @Bean
    void createView() {
        view.CreateView();
    }

    @Bean
    public WebMvcConfigurer corsConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/**").allowedOrigins("*");
            }
        };
    }
}

Below, we see the View class called by the service at startup.

package com.example.athena.common;

import com.example.athena.config.ConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.GetTableMetadataRequest;
import software.amazon.awssdk.services.athena.model.GetTableMetadataResponse;
import software.amazon.awssdk.services.athena.model.MetadataException;

@Component
public class View {

    private static final Logger logger = LoggerFactory.getLogger(View.class);

    private final AthenaClientFactory athenaClientFactoryImp;
    private final ConfigProperties configProperties;
    private final AthenaCommon athenaCommon;

    @Autowired
    public View(AthenaClientFactory athenaClientFactoryImp,
                ConfigProperties configProperties,
                AthenaCommon athenaCommon) {
        this.athenaClientFactoryImp = athenaClientFactoryImp;
        this.configProperties = configProperties;
        this.athenaCommon = athenaCommon;
    }

    public void CreateView() {
        String viewName = "view_tickit_sales_by_day_and_category";

        String createViewSqlStatement = String.format("""
                CREATE VIEW %s AS
                SELECT cast(d.caldate AS DATE) AS caldate,
                       c.catgroup,
                       c.catname,
                       sum(round(cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold, 2)) AS saleamount,
                       sum(cast(s.commission AS DECIMAL(8,2))) AS commission
                FROM refined_tickit_public_sales AS s
                LEFT JOIN refined_tickit_public_event AS e ON e.eventid = s.eventid
                LEFT JOIN refined_tickit_public_date AS d ON d.dateid = s.dateid
                LEFT JOIN refined_tickit_public_category AS c ON c.catid = e.catid
                GROUP BY caldate,
                         catgroup,
                         catname
                ORDER BY caldate,
                         catgroup,
                         catname;""", viewName);

        try (AthenaClient athenaClient = athenaClientFactoryImp.createClient()) {
            try {
                GetTableMetadataResponse getPreparedStatementRequest = getGetTableMetadataResponse(viewName, athenaClient);
                logger.debug(String.format("View already exists: %s", getPreparedStatementRequest.tableMetadata().name()));
            } catch (MetadataException e) { // View does not exist
                String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, createViewSqlStatement);
                athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);

                // Confirm View was created
                GetTableMetadataResponse getPreparedStatementRequest = getGetTableMetadataResponse(viewName, athenaClient);
                logger.debug(String.format("View created successfully: %s", getPreparedStatementRequest.tableMetadata().name()));
            }
        }
    }

    private GetTableMetadataResponse getGetTableMetadataResponse(String viewName, AthenaClient athenaClient) {
        GetTableMetadataRequest getTableMetadataRequest = GetTableMetadataRequest.builder()
                .catalogName(configProperties.getCatalog())
                .databaseName(configProperties.getDatabase())
                .tableName(viewName)
                .build();

        return athenaClient.getTableMetadata(getTableMetadataRequest);
    }
}

Aside from the fact that the /salesbycategory endpoint queries a view, everything else is identical to querying a table; the endpoint uses the same model-service-controller pattern.

Executing Prepared Statements

According to the documentation, you can use the Athena parameterized query feature to prepare statements for repeated execution of the same query with different query parameters. The prepared statement used by the service, tickit_sales_by_seller, accepts a single parameter, the ID of the seller (sellerid). The prepared statement is executed using the /salesbyseller endpoint. This scenario simulates an end-user of the analytics application who wants to retrieve enriched information about their own sales.

package com.example.athena.common;
import com.example.athena.config.ConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.CreatePreparedStatementRequest;
import software.amazon.awssdk.services.athena.model.GetPreparedStatementRequest;
import software.amazon.awssdk.services.athena.model.GetPreparedStatementResponse;
import software.amazon.awssdk.services.athena.model.ResourceNotFoundException;
@Component
public class PreparedStatement {
private static final Logger logger = LoggerFactory.getLogger(PreparedStatement.class);
private final AthenaClientFactory athenaClientFactoryImp;
private final ConfigProperties configProperties;
@Autowired
public PreparedStatement(AthenaClientFactory athenaClientFactoryImp, ConfigProperties configProperties) {
this.athenaClientFactoryImp = athenaClientFactoryImp;
this.configProperties = configProperties;
}
public void CreatePreparedStatement() {
String preparedStatementName = "tickit_sales_by_seller";
String preparedStatementSql = """
SELECT cast(d.caldate AS DATE) AS caldate,
s.pricepaid,
s.qtysold,
round(cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold, 2) AS saleamount,
cast(s.commission AS DECIMAL(8,2)) AS commission,
round((cast(s.commission AS DECIMAL(8,2)) / (cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold)) * 100, 2) AS commissionprcnt,
e.eventname,
concat(u1.firstname, ' ', u1.lastname) AS seller,
concat(u2.firstname, ' ', u2.lastname) AS buyer,
c.catgroup,
c.catname
FROM refined_tickit_public_sales AS s
LEFT JOIN refined_tickit_public_listing AS l ON l.listid = s.listid
LEFT JOIN refined_tickit_public_users AS u1 ON u1.userid = s.sellerid
LEFT JOIN refined_tickit_public_users AS u2 ON u2.userid = s.buyerid
LEFT JOIN refined_tickit_public_event AS e ON e.eventid = s.eventid
LEFT JOIN refined_tickit_public_date AS d ON d.dateid = s.dateid
LEFT JOIN refined_tickit_public_category AS c ON c.catid = e.catid
WHERE s.sellerid = ?
ORDER BY caldate,
eventname;""";
try (AthenaClient athenaClient = athenaClientFactoryImp.createClient()) {
try {
GetPreparedStatementResponse getPreparedStatementResponse = getGetPreparedStatementResponse(preparedStatementName, athenaClient);
logger.debug(String.format("Prepared statement already exists: %s", getPreparedStatementResponse.preparedStatement().statementName()));
} catch (ResourceNotFoundException e) { // PreparedStatement does not exist
CreatePreparedStatementRequest createPreparedStatementRequest = CreatePreparedStatementRequest.builder()
.statementName(preparedStatementName)
.description("Returns all sales by seller based on the seller's userid")
.workGroup(configProperties.getWorkGroup())
.queryStatement(preparedStatementSql).build();
athenaClient.createPreparedStatement(createPreparedStatementRequest);
// Confirm PreparedStatement was created
GetPreparedStatementResponse getPreparedStatementResponse = getGetPreparedStatementResponse(preparedStatementName, athenaClient);
logger.debug(String.format("Prepared statement created successfully: %s", getPreparedStatementResponse.preparedStatement().statementName()));
}
}
}
private GetPreparedStatementResponse getGetPreparedStatementResponse(String preparedStatementName, AthenaClient athenaClient) {
GetPreparedStatementRequest getPreparedStatementRequest = GetPreparedStatementRequest.builder()
.statementName(preparedStatementName)
.workGroup(configProperties.getWorkGroup()).build();
return athenaClient.getPreparedStatement(getPreparedStatementRequest);
}
}

The pattern of querying data is similar to tables and views, except instead of using the common SELECT...FROM...WHERE SQL query pattern, we use the EXECUTE...USING pattern.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.tickit.model.resultsets.SaleBySeller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
@Service
public class SaleBySellerServiceImp implements SaleBySellerService {
private static final Logger logger = LoggerFactory.getLogger(SaleBySellerServiceImp.class);
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public SaleBySellerServiceImp(AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<SaleBySeller> find(int id) {
String query = String.format("""
EXECUTE tickit_sales_by_seller USING %s;""", id);
return startQuery(query);
}
private List<SaleBySeller> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<SaleBySeller> saleBySellers = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return saleBySellers;
}
private List<SaleBySeller> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<SaleBySeller> saleBySellers = new ArrayList<>();
try {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
SaleBySeller saleBySeller = new SaleBySeller();
saleBySeller.setCalDate(LocalDate.parse(allData.get(0).varCharValue()));
saleBySeller.setPricePaid(new BigDecimal(allData.get(1).varCharValue()));
saleBySeller.setQtySold(Integer.parseInt(allData.get(2).varCharValue()));
saleBySeller.setSaleAmount(new BigDecimal(allData.get(3).varCharValue()));
saleBySeller.setCommission(new BigDecimal(allData.get(4).varCharValue()));
saleBySeller.setCommissionPrcnt(Double.valueOf(allData.get(5).varCharValue()));
saleBySeller.setEventName(allData.get(6).varCharValue());
saleBySeller.setSeller(allData.get(7).varCharValue());
saleBySeller.setBuyer(allData.get(8).varCharValue());
saleBySeller.setCatGroup(allData.get(9).varCharValue());
saleBySeller.setCatName(allData.get(10).varCharValue());
saleBySellers.add(saleBySeller);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return saleBySellers;
}
}

For example, to execute the prepared statement for a seller with an ID of 3, we would use EXECUTE tickit_sales_by_seller USING 3;. We pass the seller’s ID of 3 as a path parameter similar to other endpoints exposed by the service: /v1/salesbyseller/3.

Sales by seller query results from Athena using the seller’s ID as a parameter for the prepared statement

Again, aside from the fact that the /salesbyseller endpoint executes a prepared statement and passes a parameter, everything else is identical to querying a table or a view, using the same model-service-controller pattern.

Working with Named Queries

In addition to tables, views, and prepared statements, Athena has the concept of saved queries, referred to as named queries in the Athena API and when using AWS CloudFormation. You can use the Athena console or API to save, edit, run, rename, and delete queries. The queries are persisted using a NamedQueryId, a unique identifier (UUID) of the query. You must reference the NamedQueryId when working with existing named queries.

Example of saved query (named query) used in this post

There are multiple ways to use and reuse existing named queries programmatically. For this demonstration, I created the named query, buyer_likes_by_category, in advance and then stored the resulting NamedQueryId as an application property, injected at runtime or Kubernetes deployment time through a local environment variable.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.resultsets.BuyerLikesByCategory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class BuyerLikesByCategoryServiceImp implements BuyersLikesByCategoryService {
private static final Logger logger = LoggerFactory.getLogger(BuyerLikesByCategoryServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public BuyerLikesByCategoryServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<BuyerLikesByCategory> get() {
return getNamedQueryResults(configProperties.getNamedQueryId());
}
private List<BuyerLikesByCategory> getNamedQueryResults(String queryId) {
logger.debug(String.format("NamedQueryId: %s", queryId));
AthenaClient athenaClient = athenaClientFactory.createClient();
GetNamedQueryRequest getNamedQueryRequest = GetNamedQueryRequest.builder()
.namedQueryId(queryId)
.build();
GetNamedQueryResponse getNamedQueryResponse = athenaClient.getNamedQuery(getNamedQueryRequest);
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, getNamedQueryResponse.namedQuery().queryString());
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<BuyerLikesByCategory> buyerLikesByCategories = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return buyerLikesByCategories;
}
private List<BuyerLikesByCategory> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<BuyerLikesByCategory> buyerLikesByCategories = new ArrayList<>();
try {
// Max Results can be set but if it's not set,
// it will choose the maximum page size
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
BuyerLikesByCategory buyerLikesByCategory = new BuyerLikesByCategory();
buyerLikesByCategory.setSports(parseInt(allData.get(0).varCharValue()));
buyerLikesByCategory.setTheatre(parseInt(allData.get(1).varCharValue()));
buyerLikesByCategory.setConcerts(parseInt(allData.get(2).varCharValue()));
buyerLikesByCategory.setJazz(parseInt(allData.get(3).varCharValue()));
buyerLikesByCategory.setClassical(parseInt(allData.get(4).varCharValue()));
buyerLikesByCategory.setOpera(parseInt(allData.get(5).varCharValue()));
buyerLikesByCategory.setRock(parseInt(allData.get(6).varCharValue()));
buyerLikesByCategory.setVegas(parseInt(allData.get(7).varCharValue()));
buyerLikesByCategory.setBroadway(parseInt(allData.get(8).varCharValue()));
buyerLikesByCategory.setMusicals(parseInt(allData.get(9).varCharValue()));
buyerLikesByCategories.add(buyerLikesByCategory);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return buyerLikesByCategories;
}
}

Alternatively, you might iterate through a list of named queries at startup to find one that matches the name. However, this method would undoubtedly impact service performance, startup time, and cost. Lastly, you could use a method like NamedQuery(), included in the unused NamedQuery class, at startup, similar to the view and prepared statement. That named query's unique NamedQueryId would be persisted as a system property, referenceable by the service class. The downside is that you would create a duplicate of the named query each time you start the service. Therefore, this method is also not recommended.

Configuration

Two components responsible for persisting configuration for the Spring Boot service are the application.yml properties file and the ConfigProperties class. The class uses Spring Framework's @ConfigurationProperties annotation. According to the documentation, this annotation is used for externalized configuration. Add this to a class definition or a @Bean method in a @Configuration class if you want to bind and validate some external Properties (e.g., from a .properties or .yml file). Binding is performed by calling setters on the annotated class or, if @ConstructorBinding is in use, by binding to the constructor parameters.

The @ConfigurationProperties annotation includes the prefix of athena. This value corresponds to the athena prefix in the application.yml properties file. The fields in the ConfigProperties class are bound to the properties in the application.yml file. For example, the field, namedQueryId, is bound to the property, athena.named-query-id. Further, that property is bound to an external environment variable, NAMED_QUERY_ID. These values could be supplied from an external configuration system, a Kubernetes Secret, or an external secrets management system.

spring:
  profiles:
    active: dev

server:
  port: 8080
  servlet:
    contextPath: /v1

athena:
  region: us-east-1
  workgroup: primary
  catalog: AwsDataCatalog
  database: tickit_demo
  limit: 25
  client-execution-timeout: 100000
  retry-sleep: 1000
  results-bucket: ${RESULTS_BUCKET}
  named-query-id: ${NAMED_QUERY_ID}

---
spring:
  config:
    activate:
      on-profile: dev

logging:
  level:
    root: DEBUG

management:
  endpoints:
    web:
      exposure:
        include: '*'
    jmx:
      exposure:
        include: '*'

---
spring:
  config:
    activate:
      on-profile: prod

logging:
  level:
    root: INFO

management:
  endpoints:
    web:
      exposure:
        include: health, prometheus
    jmx:
      exposure:
        include: health

AWS IAM: Authentication and Authorization

For the Spring Boot service to interact with Amazon Athena, AWS Glue, and Amazon S3, you need to establish an AWS IAM Role, which the service assumes once authenticated. The Role must be associated with an attached IAM Policy containing the requisite Athena, Glue, and S3 permissions. For development, the service uses a policy similar to the one shown below. Please note this policy is broader than recommended for Production; it does not represent the security best practice of least privilege. In particular, the use of the overly-broad * for Resources should be strictly avoided when creating policies.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:CreatePreparedStatement",
"athena:ListPreparedStatements",
"glue:CreateTable",
"athena:CreateNamedQuery",
"athena:ListNamedQueries",
"athena:GetTableMetadata",
"athena:GetPreparedStatement",
"athena:GetQueryResults",
"athena:GetQueryExecution",
"athena:GetNamedQuery"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:BatchGetPartition",
"glue:GetTable"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject",
"s3:PutBucketPublicAccessBlock"
],
"Resource": [
"arn:aws:s3:::aws-athena-query-results-*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::date-lake-demo-*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:ListAllMyBuckets"
],
"Resource": [
"*"
]
}
]
}

In addition to the authorization granted by the IAM Policy, AWS Lake Formation can be used with Amazon S3, AWS Glue, and Amazon Athena to grant fine-grained database-, table-, column-, and row-level access to datasets.

Swagger UI and the OpenAPI Specification

The easiest way to view and experiment with all the endpoints available through the controller classes is using the Swagger UI, included in the example Spring Boot service, by way of the springdoc-openapi Java library. The Swagger UI is accessed at /v1/swagger-ui/index.html.

Swagger UI showing endpoints exposed by the service’s controller classes

The OpenAPI Specification (formerly Swagger Specification) is an API description format for REST APIs. The /v1/v3/api-docs endpoint allows you to generate an OpenAPI v3 specification file. The OpenAPI file describes the entire API.

Spring Boot service’s OpenAPI v3 specification

The OpenAPI v3 specification can be saved as a file and imported into applications like Postman, the API platform for building and using APIs.

Calling the service’s /users API endpoint using Postman
Running a suite of integration tests against the Spring Boot service using Postman

Integration Tests

Included in the Spring Boot service’s source code is a limited number of example integration tests, not to be confused with unit tests. Each test class uses Spring Framework’s @SpringBootTest annotation. According to the documentation, this annotation can be specified on a test class that runs Spring Boot-based tests. It provides several features over and above the regular Spring TestContext Framework.

package com.example.athena;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import static io.restassured.RestAssured.get;
import static io.restassured.RestAssured.given;
import static io.restassured.http.ContentType.JSON;
@SpringBootTest
class CategoriesResourceTests {
private static final String ResourcePath = "/v1/categories";
private static final int resultsetLimit = 25;
@Test
void findAll() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.greaterThanOrEqualTo(1))
.body("$.size()", Matchers.lessThanOrEqualTo(resultsetLimit));
}
@Test
void findAllWithLimit() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("limit", 3)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.equalTo(3));
}
@Test
void findAllWithOffset() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("offset", 2)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.greaterThanOrEqualTo(1))
.body("$.size()", Matchers.lessThanOrEqualTo(resultsetLimit));
}
@Test
void findAllWithLimitAndOffset() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("limit", 3)
.queryParam("offset", 2)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.equalTo(3));
}
@Test
void findById() {
// Get the first 'id' available
int id = get(ResourcePath + "?limit=1")
.then()
.extract()
.path("[0].id");
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath + "/{id}", id)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("id", Matchers.equalTo(id));
}
}

The integration tests use Rest Assured's given-when-then pattern of testing, made popular as part of Behavior-Driven Development (BDD). In addition, each test uses JUnit's @Test annotation. According to the documentation, this annotation signals that the annotated method is a test method. Therefore, methods using this annotation must not be private or static and must not return a value.

@Test
void findById() {
// Get the first 'id' available
int id = get(ResourcePath + "?limit=1")
.then()
.extract()
.path("[0].id");
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath + "/{id}", id)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("id", Matchers.equalTo(id));
}

Run the integration tests using Gradle from the project’s root: ./gradlew clean build test. A detailed ‘Test Summary’ is produced in the project’s build directory as HTML for easy review.

Test Details
Test Details

Load Testing the Service

In Production, the Spring Boot service will need to handle multiple concurrent users executing queries against Amazon Athena.

Athena's Recent Queries console shows multiple concurrent queries being queued and executed

We could use various load testing tools to evaluate the service's ability to handle multiple concurrent users. One of the simplest is my favorite Go-based utility, hey, which sends a specified number of requests to a URL at a given concurrency level and prints the resulting statistics; it also supports HTTP/2 endpoints. For example, we could execute 500 HTTP requests with a concurrency level of 25 against the Spring Boot service's /users endpoint using hey. The load tests in this post were run against three Kubernetes replica pods of the service deployed to Amazon EKS.

hey -n 500 -c 25 -T "application/json;charset=UTF-8" \
-h2 https://athena.example-api.com/v1/users

From Athena's Recent Queries console, we see many simultaneous queries being queued and executed by hey through the Spring Boot service's endpoint.

Athena’s Recent Queries console shows simultaneous queries being queued and executed

Metrics

The Spring Boot service implements the micrometer-registry-prometheus extension. The Micrometer metrics library exposes runtime and application metrics. Micrometer defines a core library, providing a registration mechanism for metrics and core metric types. These metrics are exposed by the service’s /v1/actuator/prometheus endpoint.

Metrics exposed using the Prometheus endpoint

Using the Micrometer extension, metrics exposed by the /v1/actuator/prometheus endpoint can be scraped and visualized by tools such as Prometheus. Conveniently, AWS offers the fully-managed Amazon Managed Service for Prometheus (AMP), which easily integrates with Amazon EKS.

Graph of HTTP server requests scraped by Prometheus from the Spring Boot service

Using Prometheus as a datasource, we can build dashboards in Grafana to observe the service’s metrics. Like AMP, AWS also offers the fully-managed Amazon Managed Grafana (AMG).

Grafana dashboard showing metrics from Prometheus for Spring Boot service deployed to Amazon EKS
Grafana dashboard showing JVM metrics from Prometheus for Spring Boot service deployed to Amazon EKS

Conclusion

This post taught us how to create a Spring Boot RESTful Web Service that allows end-user applications to securely query data stored in a data lake on AWS. The service used the AWS SDK for Java to access data stored in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.



End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub

Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms

Introduction

According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.”

Another succinct description of a data catalog's purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.”

Working with many organizations in the analytics space, one of the more common requests I receive concerns choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

View of DataHub’s home screen showing a variety of datasources

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Another example of searching for cataloged entities in DataHub’s browser-based UI

Data Catalog Competitors

Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980s. Gartner publishes its Metadata Management (EMM) Solutions Reviews and Ratings and its Metadata Management Magic Quadrant. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:

Open Source Software

Commercial

Cloud Service Providers

Data Catalog Features

DataHub describes itself as “a modern data catalog built to enable end-to-end data discovery, data observability, and data governance.” Sorting through vendors' marketing jargon and hype, standard features of leading data catalogs include:

  • Metadata ingestion
  • Data discovery
  • Data governance
  • Data observability
  • Data lineage
  • Data dictionary
  • Data classification
  • Usage/popularity statistics
  • Sensitive data handling
  • Data fitness (aka data quality or data profiling)
  • Manage both technical and business metadata
  • Business glossary
  • Tagging
  • Natively supported datasource integrations
  • Advanced metadata search
  • Fine-grain authentication and authorization
  • UI- and API-based interaction

Datasources

When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:

  • Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
  • Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
  • NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
  • Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
  • Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
  • Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
  • ETL sources, such as Apache Spark, Apache Airflow, Apache NiFi, and dbt

DataHub on AWS

DataHub's convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternatively, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.

Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

ArgoCD UI showing DataHub and its dependencies deployed to Amazon EKS

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. Kubernetes deployment for each of the components is defined as sub-charts under the main DataHub Helm chart.

External Storage Layer Dependencies

Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.

Alternatively, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service dependency substitutions include Amazon RDS for MySQL, Amazon OpenSearch Service (formerly Amazon Elasticsearch Service), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using AWS Neptune as the Graph Index is coming soon.

DataHub CLI and Plug-ins

DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

Available DataHub CLI commands

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.

Example list of DataHub Source plugins installed
Example list of DataHub Sink and Transformer plugins installed

Secure Metadata Ingestion

Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.

In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to git and push them to a private GitHub repository. Finally, I use GitHub Actions to test DataHub files.

To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, using IAM Roles for Service Accounts (IRSA), to securely access datasources to ingest and the DataHub application.

Schedule and Monitor Pipelines

Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow, using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configuration can be kept secure and not exposed externally or in plain text.
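Below is a minimal sketch of what one such Airflow DAG task might look like, assuming Airflow 2.x with the cncf.kubernetes provider installed; the container image, namespace, and recipe path are hypothetical placeholders rather than values from this post's repository.

# Hypothetical nightly DAG that runs a DataHub ingestion recipe inside the
# custom DataHub CLI container as a Kubernetes pod on the Amazon EKS cluster.
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    dag_id="datahub_ingest_postgres_tickit",
    schedule_interval="@daily",
    start_date=datetime(2022, 3, 1),
    catchup=False,
) as dag:
    ingest_postgres = KubernetesPodOperator(
        task_id="ingest_postgres_tickit",
        name="datahub-ingest-postgres-tickit",
        namespace="datahub",  # hypothetical namespace
        # hypothetical ECR image containing the DataHub CLI and required plugins
        image="111222333444.dkr.ecr.us-east-1.amazonaws.com/datahub-cli:latest",
        cmds=["datahub"],
        arguments=["ingest", "-c", "/recipes/postgres_to_datahub.yml"],  # hypothetical recipe path
        get_logs=True,
        is_delete_operator_pod=True,
    )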

When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub, should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.

SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate data centers require alternative networking and security strategies to access metadata securely.

AWS-based DataHub high-level architecture

Markup or Code?

According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain a source, sink, and transformers configuration section. Markup-language-based job automation, written in YAML, JSON, or Domain-Specific Languages (DSLs), is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database running on AWS.

YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to store sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and re-used for all ingestion jobs. Sensitive database connection information has also been externalized as environment variables and stored separately.

# Purpose: DataHub example recipe for PostgreSQL datasource
# Author: Gary A. Stafford
# Date: March 2022

# see https://datahubproject.io/docs/metadata-ingestion/source_docs/postgres
source:
  type: postgres
  config:
    # Coordinates
    host_port: ${DB_HOST_PORT}
    database: tickit
    # Credentials
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    # Options
    profiling:
      enabled: true
    # Environment
    env: DEV

# see https://datahubproject.io/docs/metadata-ingestion/transformers/#adding-a-set-of-tags
transformers:
  - type: "simple_add_dataset_tags"
    config:
      tag_urns:
        - "urn:li:tag:AWS"
        - "urn:li:tag:${ACCOUNT_ID}"
        - "urn:li:tag:us-east-1"
  - type: "pattern_add_dataset_terms"
    config:
      term_pattern:
        rules:
          ".*users.*": ["urn:li:glossaryTerm:Classification.Sensitive"]
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:Database Administrators"
      ownership_type: "DATAOWNER"

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
  type: "datahub-rest"
  config:
    server: ${DATAHUB_REST_ENDPOINT}

# see https://datahubproject.io/docs/metadata-ingestion/source_docs/reporting_telemetry/
pipeline_name: "postgres-pipeline-tickit"
reporting:
  - type: "datahub"
    config:
      datahub_api:
        server: ${DATAHUB_REST_ENDPOINT}

Using Python

As an alternative to YAML, you can configure and run a pipeline entirely from within a custom Python script using DataHub's Python API. Below are two Python versions of an ingestion pipeline nearly identical to the YAML recipe above. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit testing, and notification. The first is a basic pipeline written in Python. The code is functional, but not very Pythonic, secure, scalable, or Production-ready.

# Purpose: Simple programmatic DataHub pipeline example
# Author: Gary A. Stafford
# Date: March 2022
# Reference: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/programatic_pipeline.py

from datahub.ingestion.run.pipeline import Pipeline

# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
    {
        "run_id": "postgres-run",
        "source": {
            "type": "postgres",
            "config": {
                "host_port": "demo-instance.abcd1234.us-east-1.rds.amazonaws.com:5432",
                "database": "tickit",
                "username": "datahub",
                "password": "My5up3r53cr3tPa55w0rd",
                "env": "DEV",
                "profiling": {
                    "enabled": "true"
                }
            }
        },
        "transformers": [
            {
                "type": "simple_add_dataset_tags",
                "config": {
                    "tag_urns": [
                        f"urn:li:tag:AWS",
                        f"urn:li:tag:111222333444",
                        f"urn:li:tag:us-east-1"
                    ]
                }
            },
            {
                "type": "pattern_add_dataset_terms",
                "config": {
                    "term_pattern": {
                        "rules": {
                            ".*users.*": [
                                "urn:li:glossaryTerm:Classification.Sensitive"
                            ]
                        }
                    }
                }
            },
            {
                "type": "simple_add_dataset_ownership",
                "config": {
                    "owner_urns": [
                        f"urn:li:corpuser:Database Administrators"
                    ],
                    "ownership_type": "DATAOWNER"
                }
            }
        ],
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": "http://192.168.111.222:33333"
            }
        }
    }
)

# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

The second version of the same pipeline is more Production ready. The code is more Pythonic in nature and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. Like recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration is pulled from the environment, such as AWS Account ID and AWS Region. The script loads these values at runtime.

# Purpose: Programmatic DataHub pipeline example
# Author: Gary A. Stafford
# Date: March 2022

import json
import logging

import boto3
from botocore.exceptions import ClientError
from datahub.ingestion.run.pipeline import Pipeline

logging.basicConfig(
    format="[%(asctime)s] %(levelname)s - %(message)s", level=logging.INFO
)


def main():
    sts_client = boto3.client("sts")

    params = get_parameters()
    params["owner"] = "Database Administrators"
    params["environment"] = "DEV"
    params["database"] = "tickit"
    params["region"] = sts_client.meta.region_name
    params["account"] = sts_client.get_caller_identity()["Account"]
    logging.info(f"Params: {json.dumps(params, indent=4, sort_keys=True)}")

    ingestion_pipeline = create_pipeline(params)
    run_pipeline(ingestion_pipeline)


def create_pipeline(params) -> Pipeline:
    """Constructs a Pipeline for a PostgreSQL Source and a DataHub Sink
    :return: instance of datahub.ingestion.run.pipeline
    """

    pipeline = Pipeline.create(
        {
            "run_id": "postgres-run",
            "source": {
                "type": "postgres",
                "config": {
                    "host_port": params.get("/datahub_demo/postgres_host_port_tickit"),
                    "database": params.get("database"),
                    "username": params.get("/datahub_demo/postgres_username_tickit"),
                    "password": params.get("/datahub_demo/postgres_password_tickit"),
                    "profiling": {
                        "enabled": "true"
                    },
                    "env": params.get("environment"),
                }
            },
            "transformers": [
                {
                    "type": "simple_add_dataset_tags",
                    "config": {
                        "tag_urns": [
                            f"urn:li:tag:{params.get('account')}",
                            f"urn:li:tag:{params.get('region')}"
                        ]
                    }
                },
                {
                    "type": "pattern_add_dataset_terms",
                    "config": {
                        "term_pattern": {
                            "rules": {
                                ".*users.*": [
                                    "urn:li:glossaryTerm:Classification.Sensitive"
                                ]
                            }
                        }
                    }
                },
                {
                    "type": "simple_add_dataset_ownership",
                    "config": {
                        "owner_urns": [
                            f"urn:li:corpuser:{params.get('owner')}"
                        ],
                        "ownership_type": "DATAOWNER"
                    }
                }
            ],
            "sink": {
                "type": "datahub-rest",
                "config": {
                    "server": params.get("/datahub_demo/datahub_rest_endpoint_public")
                }
            }
        }
    )

    return pipeline


def run_pipeline(pipeline):
    """Runs the ingestion pipeline and prints summary of the results
    :param pipeline: instance of datahub.ingestion.run.pipeline
    :return:
    """

    pipeline.run()
    pipeline.pretty_print_summary()


def get_parameters() -> dict:
    """
    Load parameter values from AWS Systems Manager (SSM) Parameter Store
    :return: dict of parameter k/v's
    """

    ssm_client = boto3.client("ssm")

    params: dict = {}

    try:
        # make a single SSM API call for all parameters
        response = ssm_client.get_parameters_by_path(
            Path="/datahub_demo"
        )

        # create a dictionary of parameter k/v's
        for param in response.get("Parameters"):
            params[param["Name"]] = param["Value"]
        logging.debug(f"Params: {params}")
    except ClientError as e:
        logging.error(e)
        exit(1)

    return params


if __name__ == '__main__':
    main()

Sinking to DataHub

When syncing metadata to DataHub, you have two choices: the GMS REST API or Kafka. According to DataHub, the advantage of the REST-based interface is that any errors can immediately be reported. On the other hand, the advantage of the Kafka-based interface is that it is asynchronous and can handle higher throughput. For this post, I am using DataHub's REST API.

DataHub ingestion pipeline results for a Microsoft SQL Server datasource
Another example of a DataHub ingestion pipeline results for a Google BigQuery datasource

Column-level Metadata

In addition to column names and data types, it is possible to extract column descriptions and key types from certain datasources. Column descriptions, tags, and glossary terms can also be input through the DataHub UI. Below, we see an example of an Amazon Redshift fact table, whose table and column descriptions were ingested as part of the metadata.

Amazon Redshift fact table showing column-level metadata, tags, owners, and documentation

Business Glossary

DataHub can assign business glossary terms to entities. The DataHub Business Glossary plugin pulls business glossary metadata from a YAML-based configuration file.

# see sample: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/bootstrap_data/business_glossary.yml
version: 1
source: DataHub
owners:
  users:
    - datahub
url: "https://github.com/datahub-project/datahub/"
nodes:
  - name: Classification
    description: A set of terms related to Data Classification
    terms:
      - name: Sensitive
        description: Sensitive Data
        custom_properties:
          is_confidential: false
      - name: Confidential
        description: Confidential Data
        custom_properties:
          is_confidential: true
      - name: HighlyConfidential
        description: Highly Confidential Data
        custom_properties:
          is_confidential: true
  - name: PersonalInformation
    description: All terms related to personal information
    owners:
      users:
        - datahub
    terms:
      - name: ID
        description: An individual's unique identifier
        inherits:
          - Classification.Sensitive
      - name: Name
        description: An individual's Name
        inherits:
          - Classification.Sensitive
      - name: SSN
        description: An individual's SSN
        inherits:
          - Classification.Confidential
      - name: DriverLicense
        description: An individual's Driver License ID
        inherits:
          - Classification.Confidential
      - name: Email
        description: An individual's email address
        inherits:
          - Classification.Confidential
      - name: Address
        description: A physical address
      - name: Gender
        description: The gender identity of the individual
        inherits:
          - Classification.Sensitive

Business glossary terms can be reviewed in the Glossary Terms tab of DataHub's UI. Below, we see the three terms associated with the Classification glossary node: Confidential, HighlyConfidential, and Sensitive.

Example of a related set of terms in DataHub’s Business Glossary

We can search for entities inventoried in DataHub using their assigned business glossary terms.

Dataset search results based on a term in DataHub’s Business Glossary

Finally, we see an example of an AWS Athena data catalog table with business glossary terms applied to columns within the table’s schema.

AWS Athena table showing column-level descriptions, glossary terms, tags, owners, and documentation

SQL-based Profiler

DataHub can also extract statistics about cataloged entities using the SQL-based Profiler. According to the DataHub documentation, the Profiler can extract the following:

  • Row and column counts for each table
  • Column null counts and proportions
  • Column distinct counts and proportions
  • Column min, max, mean, median, standard deviation, quantile values
  • Column histograms or frequencies of unique values

We can also track historical statistics for each profiled entity each time metadata is ingested.

Amazon Redshift fact table showing SQL-based profiler column-level statistics
Another example, a Google BigQuery table showing SQL-based profiler column-level statistics

Data Lineage

DataHub’s data lineage features allow us to view upstream and downstream relationships between different types of entities. DataHub can trace lineage across multiple platforms, datasets, pipelines, charts, and dashboards.

Below, we see a simple example of dataset entity-to-entity lineage in Amazon Redshift and then Apache Spark on Amazon EMR. The fact table has a downstream relationship to four database views. The views are based on SQL queries that include the upstream table as a datasource.

Visual lineage view of Amazon Redshift fact table and its four downstream view dependencies
Another visual lineage example of an Apache Spark job with Apache Hive tables as both the source and sink

DataHub Analytics

DataHub provides basic metadata quality and usage analytics in the DataHub UI: user activity, counts of datasource types, business glossary terms, environments, and actions.

Examples of DataHub’s metadata quality and usage analytics capabilities
More examples of DataHub’s metadata quality and usage analytics capabilities

Conclusion

In this post, we explored the features of a data catalog and learned about some of the leading commercial and open-source data catalogs. Next, we learned how DataHub can collect, organize, enrich, and search metadata across multiple datasources. Lastly, we discovered how easy it is to catalog metadata from datasources spread across multiple CSPs, SaaS providers, and corporate data centers, and centralize those results in DataHub.

In addition to the basic features reviewed in this post, DataHub offers a growing number of additional capabilities, including GraphQL and Timeline APIs, robust authentication and authorization, application monitoring and observability, and Great Expectations integration. All these qualities make DataHub an excellent choice for a data catalog.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Data Preparation on AWS: Comparing Available ELT Options to Cleanse and Normalize Data

Comparing the features and performance of different AWS analytics services for Extract, Load, Transform (ELT)

Introduction

According to Wikipedia, “Extract, load, transform (ELT) is an alternative to extract, transform, load (ETL) used with data lake implementations. In contrast to ETL, in ELT models the data is not transformed on entry to the data lake but stored in its original raw format. This enables faster loading times. However, ELT requires sufficient processing power within the data processing engine to carry out the transformation on demand, to return the results in a timely manner.”

As capital investments and customer demand continue to drive the growth of the cloud-based analytics market, the choice of tools seems endless, and that can be a problem. Customers face a constant barrage of commercial and open-source tools for their batch, streaming, and interactive exploratory data analytics needs. The major Cloud Service Providers (CSPs) have even grown to a point where they now offer multiple services to accomplish similar analytics tasks.

This post will examine the choice of analytics services available on AWS capable of performing ELT. Specifically, this post will compare the features and performance of AWS Glue Studio, AWS Glue DataBrew, Amazon Athena, and Amazon EMR using multiple ELT use cases and service configurations.

Data pipeline architecture showing a choice of AWS ELT services

Analytics Use Case

We will address a simple yet common analytics challenge for this comparison — preparing a nightly data feed for analysis the next day. Each night a batch of approximately 1.2 GB of raw CSV-format healthcare data will be exported from a Patient Administration System (PAS) and uploaded to Amazon S3. The data must be cleansed, deduplicated, refined, normalized, and made available to the Data Science team the following morning. The team of Data Scientists will perform complex data analytics on the data and build machine learning models designed for early disease detection and prevention.

Sample Dataset

The dataset used for this comparison is generated by Synthea, an open-source patient population simulation. The high-quality, synthetic, realistic patient data and associated health records cover every aspect of healthcare. The dataset contains the patient-related healthcare history for allergies, care plans, conditions, devices, encounters, imaging studies, immunizations, medications, observations, organizations, patients, payers, procedures, providers, and supplies.

The Synthea dataset was first introduced in my March 2021 post examining the handling of sensitive PII data using Amazon Macie: Data Lakes: Discovery, Security, and Privacy of Sensitive Data.

The Synthea synthetic patient data is available in different record volumes and various data formats, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Since this post seeks to measure the performance of different AWS ELT-capable services, we will use a larger version of the Synthea dataset containing hundreds of thousands to millions of records.

AWS Glue Data Catalog

The dataset comprises nine uncompressed CSV files uploaded to Amazon S3 and cataloged to an AWS Glue Data Catalog, a persistent metadata store, using an AWS Glue Crawler.

Raw Synthea CSV data, in S3, cataloged in AWS Glue Data Catalog
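As a rough illustration, a crawler like the one used here could be created and started with a few AWS SDK for Python (Boto3) calls; the crawler, bucket, database, and IAM role names below are hypothetical placeholders, not the exact resources from this post.

# Minimal Boto3 sketch: create and start an AWS Glue Crawler to catalog the raw CSV files in S3.
import boto3

glue_client = boto3.client("glue", region_name="us-east-1")

glue_client.create_crawler(
    Name="synthea-raw-csv-crawler",                                      # hypothetical crawler name
    Role="arn:aws:iam::111222333444:role/demo-glue-crawler-role",        # hypothetical IAM role
    DatabaseName="synthea_patient_big_data",                             # hypothetical Glue database
    Targets={"S3Targets": [{"Path": "s3://synthea-data-demo/raw/"}]},    # hypothetical S3 path
)

# Each run (re)catalogs the nine CSV files as tables in the AWS Glue Data Catalog.
glue_client.start_crawler(Name="synthea-raw-csv-crawler")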

Test Cases

We will use three data preparation test cases based on the Synthea dataset to examine the different AWS ELT-capable services.

Specifications for three different test cases

Test Case 1: Encounters for Symptom

An encounter is a health care contact between the patient and the provider responsible for diagnosing and treating the patient. In our first test case, we will process 1.26M encounter records for an ongoing study of patient symptoms by our Data Science team.

id date patient code description reasoncode reasondescription
714fd61a-f9fd-43ff-87b9-3cc45a3f1e53 2014-01-09 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
23e07532-8b96-4d05-b14e-d4c5a5288ed2 2014-08-18 33f33990-ae8b-4be8-938f-e47ad473abfe 185349003 Outpatient Encounter
45044100-aaba-4209-8ad1-15383c76842d 2015-07-12 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 36971009 Sinusitis (disorder)
ffdddbfb-35e8-4a74-a801-89e97feed2f3 2014-08-12 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
352d1693-591a-4615-9b1b-f145648f49cc 2016-05-25 36d131ee-dd5b-4acb-acbe-19961c32c099 185349003 Outpatient Encounter
4620bd2f-8010-46a9-82ab-8f25eb621c37 2016-10-07 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 195662009 Acute viral pharyngitis (disorder)
815494d8-2570-4918-a8de-fd4000d8100f 2010-08-02 660bec03-9e58-47f2-98b9-2f1c564f3838 698314001 Consultation for treatment
67ec5c2d-f41e-4538-adbe-8c06c71ddc35 2010-11-22 660bec03-9e58-47f2-98b9-2f1c564f3838 170258001 Outpatient Encounter
dbe481ce-b961-4f43-ac0a-07fa8cfa8bdd 2012-11-21 660bec03-9e58-47f2-98b9-2f1c564f3838 50849002 Emergency room admission
b5f1ab7e-5e67-4070-bcf0-52451eb20551 2013-12-04 660bec03-9e58-47f2-98b9-2f1c564f3838 185345009 Encounter for symptom 10509002 Acute bronchitis (disorder)
Sample of raw encounters data

Data preparation includes the following steps (an illustrative PySpark sketch follows the list):

  1. Load 1.26M encounter records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Select only the records where the description column contains “Encounter for symptom.”
  4. Remove any rows with an empty reasoncodes column.
  5. Extract a new year, month, and day column from the date column.
  6. Remove the date column.
  7. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  8. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  9. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.
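Below is a minimal PySpark sketch of how these nine steps might be implemented on Amazon EMR, assuming the cluster is configured to use the AWS Glue Data Catalog as its metastore; the database and table names are hypothetical, and this is an illustration rather than the exact job benchmarked in this post.

# Illustrative PySpark job for Test Case 1 (Encounters for Symptom).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("encounters-for-symptom")
         .enableHiveSupport()  # EMR configured to use the Glue Data Catalog as its metastore
         .getOrCreate())

encounters = (
    spark.table("synthea_patient_big_data.encounters")                     # 1. load the cataloged table (hypothetical names)
    .dropDuplicates()                                                       # 2. remove duplicate records
    .filter(F.col("description").contains("Encounter for symptom"))        # 3. keep only symptom encounters
    .filter(F.col("reasoncode").isNotNull() & (F.col("reasoncode") != ""))  # 4. drop rows with empty reason codes
    .withColumn("year", F.year(F.to_date("date")))                          # 5. derive year/month/day
    .withColumn("month", F.month(F.to_date("date")))
    .withColumn("day", F.dayofmonth(F.to_date("date")))
    .drop("date")                                                           # 6. remove the original date column
    .repartition("year", "month", "day")                                    # 8. one output file per day partition
)

(encounters.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "snappy")                                        # 7. Snappy-compressed Parquet
    .partitionBy("year", "month", "day")
    .saveAsTable("synthea_patient_big_data.encounter_for_symptom"))         # 9. catalog the refined table

Test Case 2 follows the same pattern, minus the description and reason-code filters.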

Test Case 2: Observations

Clinical observations ensure that treatment plans are up-to-date and correctly administered and allow healthcare staff to carry out timely and regular bedside assessments. In our second test case, we will process 5.38M observation records for our Data Science team.

date patient encounter code description value units
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8302-2 Body Height 175.76 cm
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 29463-7 Body Weight 56.51 kg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 39156-5 Body Mass Index 18.29 kg/m2
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8480-6 Systolic Blood Pressure 119.0 mmHg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8462-4 Diastolic Blood Pressure 77.0 mmHg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8302-2 Body Height 177.25 cm
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 29463-7 Body Weight 59.87 kg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 39156-5 Body Mass Index 19.05 kg/m2
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8480-6 Systolic Blood Pressure 113.0 mmHg
2012-03-26 36d131ee-dd5b-4acb-acbe-19961c32c099 296a1fd4-56de-451c-a5fe-b50f9a18472d 8302-2 Body Height 174.17 cm
Sample of raw observations data

Data preparation includes the following steps:

  1. Load 5.38M observation records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Extract a new year, month, and day column from the date column.
  4. Remove the date column.
  5. Write the resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  6. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  7. Catalog the resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 3: Sinusitis Study

A medical condition is a broad term that includes all diseases, lesions, and disorders. In our third test case, we will join the conditions records with the patient records and filter for any condition containing the term ‘sinusitis’ in preparation for our Data Science team.

start stop patient encounter code description
2012-09-05 2012-10-16 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 05a6ef43-d690-455e-ab2f-1ea19d902274 44465007 Sprain of ankle
2014-09-08 2014-09-28 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 1cdcbe46-caaf-4b3f-b58c-9ca9ccb13013 283371005 Laceration of forearm
2014-11-28 2014-12-13 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 b222e257-98da-4a1b-a46c-45d5ad01bbdc 195662009 Acute viral pharyngitis (disorder)
1980-01-09 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 40055000 Chronic sinusitis (disorder)
1989-06-25 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 201834006 Localized primary osteoarthritis of the hand
1996-01-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 196416002 Impacted molars
2016-02-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 748cda45-c267-46b2-b00d-3b405a44094e 15777000 Prediabetes
2016-04-27 2016-05-20 01858c8d-f81c-4a95-ab4f-bd79fb62b284 a64734f1-5b21-4a59-b2e8-ebfdb9058f8b 444814009 Viral sinusitis (disorder)
2014-02-06 2014-02-19 d32e9ad2-4ea1-4bb9-925d-c00fe85851ae c64d3637-8922-4531-bba5-f3051ece6354 43878008 Streptococcal sore throat (disorder)
1982-05-18 08858d24-52f2-41dd-9fe9-cbf1f77b28b2 3fff3d52-a769-475f-b01b-12622f4fee17 368581000119106 Neuropathy due to type 2 diabetes mellitus (disorder)
Sample of raw conditions data

Data preparation includes the following steps:

  1. Load 483k condition records using the existing AWS Glue Data Catalog table.
  2. Inner join the condition records with the 132k patient records based on patient ID.
  3. Remove any duplicate records.
  4. Drop approximately 15 unneeded columns.
  5. Select only the records where the description column contains the term “sinusitis.”
  6. Remove any rows with empty ethnicity, race, gender, or marital columns.
  7. Create a new column, condition_age, based on a calculation of the age in days at which the patient’s condition was diagnosed.
  8. Write the resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet-format files. No partitions are necessary.
  9. Given the small resultset, bucket the data such that only one file is written to minimize the impact of too many small files on future query performance.
  10. Catalog the resulting dataset to a new table in the existing AWS Glue Data Catalog.

AWS ELT Options

There are numerous options on AWS to handle the batch transformation use case described above; a non-exhaustive list includes:

  1. AWS Glue Studio (UI-driven with AWS Glue PySpark Extensions)
  2. AWS Glue DataBrew
  3. Amazon Athena
  4. Amazon EMR with Apache Spark
  5. AWS Glue Studio (Apache Spark script)
  6. AWS Glue Jobs (Legacy jobs)
  7. Amazon EMR with Presto
  8. Amazon EMR with Trino
  9. Amazon EMR with Hive
  10. AWS Step Functions and AWS Lambda
  11. Amazon Redshift Spectrum
  12. Partner solutions on AWS, such as Databricks, Snowflake, Upsolver, StreamSets, Stitch, and Fivetran
  13. Self-managed custom solutions using a combination of OSS, such as dbt, Airbyte, Dagster, Meltano, Apache NiFi, Apache Drill, Apache Beam, Pandas, Apache Airflow, and Kubernetes

For this comparison, we will choose the first five options listed above to develop our ELT data preparation pipelines: AWS Glue Studio (UI-driven job creation with AWS Glue PySpark Extensions), AWS Glue DataBrew, Amazon Athena, Amazon EMR with Apache Spark, and AWS Glue Studio (Apache Spark script).

Data pipeline architecture showing a choice of AWS ELT services

AWS Glue Studio

According to the documentation, “AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine. You can inspect the schema and data results in each step of the job.”

AWS Glue Studio’s visual job creation capability uses the AWS Glue PySpark Extensions, an extension of the PySpark Python dialect for scripting ETL jobs. The extensions provide easier integration with the AWS Glue Data Catalog and other AWS-managed data services. As opposed to using the graphical interface for creating jobs with the AWS Glue PySpark Extensions, you can also run your own Apache Spark scripts with AWS Glue Studio. In fact, we can use the exact same scripts that we run on Amazon EMR.

For the tests, we are using the G.2X worker type, Glue version 3.0 (Spark 3.1.1 and Python 3.7), and Python as the language choice for this comparison. We will test three worker configurations using both UI-driven job creation with AWS Glue PySpark Extensions and Apache Spark script options:

  • 10 workers with a maximum of 20 DPUs
  • 20 workers with a maximum of 40 DPUs
  • 40 workers with a maximum of 80 DPUs
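
For reference, the following is a minimal boto3 sketch of how a comparable Glue 3.0 job could be defined programmatically. The job name, IAM role, and script location are placeholders and not the actual jobs used in these tests.

# Hypothetical sketch: defining an AWS Glue 3.0 Spark job with the G.2X worker
# type and 10 workers (20 DPUs) using boto3; names, role, and paths are placeholders.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

response = glue.create_job(
    Name="encounters-for-symptom-elt",  # hypothetical job name
    Role="arn:aws:iam::111222333444:role/GlueServiceRole",  # placeholder IAM role
    Command={
        "Name": "glueetl",  # Spark-based Glue ETL job
        "ScriptLocation": "s3://your-bucket/scripts/encounters_for_symptom.py",
        "PythonVersion": "3",
    },
    GlueVersion="3.0",  # Spark 3.1.1 and Python 3.7
    WorkerType="G.2X",
    NumberOfWorkers=10,  # 10, 20, or 40 in these tests
)
print(response["Name"])
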
AWS Glue Studio visual job creation UI for Test Case 3: Sinusitis Study

AWS Glue Studio Spark job details for Test Case 2: Observations

AWS Glue Studio job runs for Test Case 2: Observations

AWS Glue DataBrew

According to the documentation, “AWS Glue DataBrew is a visual data preparation tool that enables users to clean and normalize data without writing any code. Using DataBrew helps reduce the time it takes to prepare data for analytics and machine learning (ML) by up to 80 percent, compared to custom-developed data preparation. You can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values.”

DataBrew allows you to set the maximum number of DataBrew nodes that can be allocated when a job runs. For this comparison, we will test three different node configurations:

  • 3 maximum nodes
  • 10 maximum nodes
  • 20 maximum nodes
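
For reference, a rough boto3 sketch of creating a DataBrew recipe job with a capped node count might look like the following. The dataset, recipe, IAM role, and output location are placeholders, not the actual jobs used in these tests.

# Hypothetical sketch: creating a DataBrew recipe job with a maximum of 10 nodes
# using boto3; dataset, recipe, role, and output names are placeholders.
import boto3

databrew = boto3.client("databrew", region_name="us-east-1")

response = databrew.create_recipe_job(
    Name="encounters-for-symptom-job",  # hypothetical job name
    RoleArn="arn:aws:iam::111222333444:role/DataBrewServiceRole",  # placeholder IAM role
    DatasetName="encounters",  # hypothetical DataBrew dataset
    RecipeReference={"Name": "encounters-recipe"},  # hypothetical DataBrew recipe
    MaxCapacity=10,  # 3, 10, or 20 maximum nodes in these tests
    Outputs=[{
        "Location": {"Bucket": "your-bucket", "Key": "databrew/encounters/"},
        "Format": "PARQUET",
        "CompressionFormat": "SNAPPY",
    }],
)
print(response["Name"])
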
AWS Glue DataBrew Project for Test Case 3: Sinusitis Study

AWS Glue DataBrew Recipe for Test Case 1: Encounters for Symptom

AWS Glue DataBrew recipe job runs for Test Case 1: Encounters for Symptom

Amazon Athena

According to the documentation, “Athena helps you analyze unstructured, semi-structured, and structured data stored in Amazon S3. Examples include CSV, JSON, or columnar data formats such as Apache Parquet and Apache ORC. You can use Athena to run ad-hoc queries using ANSI SQL, without the need to aggregate or load the data into Athena.”

Although Athena is classified as an ad-hoc query engine, we can use a CREATE TABLE AS SELECT (CTAS) query to create a new table in the AWS Glue Data Catalog and write to Amazon S3 from the results of a SELECT statement. That SELECT statement performs the transformation on the data using SQL.

-- Purpose: Process data for sinusitis study using Amazon Athena
-- Author: Gary A. Stafford (January 2022)
CREATE TABLE "sinusitis_athena" WITH (
format = 'Parquet',
write_compression = 'SNAPPY',
external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
bucketed_by = ARRAY['patient'],
bucket_count = 1
) AS
SELECT DISTINCT
patient,
code,
description,
date_diff(
'day',
date(substr(birthdate, 1, 10)),
date(substr(start, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions AS c,
patients AS p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 3: Sinusitis Study

Amazon Athena is a fully managed AWS service and has no performance settings to adjust or monitor.

Amazon Athena CTAS statement for Test Case 1: Encounters for Symptom

Parquet data partitioned by year in Amazon S3 for Test Case 1: Encounters for Symptom, using Athena

CTAS and Partitions

A notable limitation of Amazon Athena for this batch use case is the 100-partition limit for CTAS queries. Athena [only] supports writing to 100 unique partition and bucket combinations with CTAS. Partitioned by year, month, and day, the encounters test case requires 2,558 partitions, and the observations test case requires 10,433 partitions. There is a recommended workaround using an INSERT INTO statement. However, the workaround requires additional SQL logic, computation, and, most importantly, cost. In my opinion, it is not practical compared to other methods when a large number of partitions is needed. To avoid the partition limit with CTAS, we will only partition by year and bucket by month when using Athena. Take this limitation into account when comparing the final results.
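
To illustrate the workaround described above, below is a hypothetical boto3 sketch of a year-partitioned, month-bucketed CTAS for the observations dataset. The table, database, and S3 locations are assumptions, not the exact statements used in these tests.

# Hypothetical sketch: submitting a year-partitioned, month-bucketed CTAS query
# to Amazon Athena with boto3; table, database, and S3 locations are placeholders.
import boto3

athena = boto3.client("athena", region_name="us-east-1")

ctas = """
CREATE TABLE observations_athena WITH (
    format = 'Parquet',
    write_compression = 'SNAPPY',
    external_location = 's3://your-bucket/observations_athena/',
    partitioned_by = ARRAY['year'],
    bucketed_by = ARRAY['month'],
    bucket_count = 3  -- partitions x buckets must stay within the 100-combination limit
) AS
SELECT DISTINCT patient, encounter, code, description, value, units,
    substr("date", 6, 2) AS month,
    substr("date", 1, 4) AS year  -- partition column(s) must come last
FROM observations;
"""

response = athena.start_query_execution(
    QueryString=ctas,
    QueryExecutionContext={"Database": "synthea_patient_big_data"},  # assumed database name
    ResultConfiguration={"OutputLocation": "s3://your-bucket/athena-query-results/"},
)
print(response["QueryExecutionId"])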

Amazon EMR with Apache Spark

According to the documentation, “Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. You can quickly and easily create managed Spark clusters from the AWS Management Console, AWS CLI, or the Amazon EMR API.”

For this comparison, we are using two different Spark 3.1.2 EMR clusters:

  • (1) r5.xlarge Master node and (2) r5.2xlarge Core nodes
  • (1) r5.2xlarge Master node and (4) r5.2xlarge Core nodes

All Spark jobs are written in both Python (PySpark) and Scala. We are using the AWS Glue Data Catalog as the metastore for Spark SQL instead of Apache Hive.
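
Each Spark job is submitted to the cluster as an EMR step (see the completed steps below). For reference, a minimal boto3 sketch of submitting one of the PySpark scripts as a step might look like the following; the cluster ID and S3 script location are placeholders.

# Hypothetical sketch: submitting one of the PySpark scripts as an EMR step with
# boto3; the cluster ID and S3 script location are placeholders.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder EMR cluster ID
    Steps=[{
        "Name": "sinusitis-study-elt",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "s3://your-bucket/scripts/sinusitis_study.py",  # assumed script location
            ],
        },
    }],
)
print(response["StepIds"])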

4-node Amazon EMR cluster shown in Amazon EMR Management Console

Completed EMR Steps (Spark Jobs) on 4-node Amazon EMR cluster

# Purpose: Process data for sinusitis study using either Amazon EMR or AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "sinusitis_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
patient,
code,
description,
datediff(
date(substr(start, 1, 10)),
date(substr(birthdate, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions as c, patients as p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.bucketBy(1, "patient") \
.sortBy("patient", "code") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 3: Sinusitis Study

# Purpose: Process encounters dataset using either Amazon EMR or AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "encounter_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition",
"true") \
.config("hive.exec.dynamic.partition.mode",
"nonstrict") \
.config("hive.exec.max.dynamic.partitions",
"10000") \
.config("hive.exec.max.dynamic.partitions.pernode",
"10000") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
id,
patient,
code,
description,
reasoncode,
reasondescription,
year(date) as year,
month(date) as month,
day(date) as day
FROM encounters
WHERE description='Encounter for symptom';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.partitionBy("year", "month", "day") \
.bucketBy(1, "patient") \
.sortBy("patient") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 1: Encounters for Symptom

package main.spark.demo
// Purpose: Process observations dataset using Spark on Amazon EMR with Scala
// Author: Gary A. Stafford
// Date: 2022-03-06
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Observations {
def main(args: Array[String]): Unit = {
val (spark: SparkSession, sc: SparkContext) = createSession
performELT(spark, sc)
}
private def createSession = {
val spark: SparkSession = SparkSession.builder
.appName("Observations ELT App")
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.config("hive.exec.dynamic.partition",
"true")
.config("hive.exec.dynamic.partition.mode",
"nonstrict")
.config("hive.exec.max.dynamic.partitions",
"10000")
.config("hive.exec.max.dynamic.partitions.pernode",
"10000")
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("INFO")
(spark, sc)
}
private def performELT(spark: SparkSession, sc: SparkContext) = {
val tableName: String = sc.getConf.get("spark.executorEnv.TABLE_NAME")
val dataLakeBucket: String = sc.getConf.get("spark.executorEnv.DATA_LAKE_BUCKET")
spark.sql("USE synthea_patient_big_data;")
val sql_query_data: String =
"""
SELECT DISTINCT
patient,
encounter,
code,
description,
value,
units,
year(date) as year,
month(date) as month,
day(date) as day
FROM observations
WHERE date <> 'date';
"""
val observationsDF: DataFrame = spark
.sql(sql_query_data)
observationsDF
.coalesce(1)
.write
.partitionBy("year", "month", "day")
.bucketBy(1, "patient")
.sortBy("patient")
.mode("overwrite")
.format("parquet")
.option("path", s"s3://${dataLakeBucket}/${tableName}/")
.saveAsTable(tableName = tableName)
spark.sql(s"ALTER TABLE ${tableName} SET TBLPROPERTIES ('classification'='parquet');")
}
}
Amazon EMR Spark job written in Scala for Test Case 2: Observations (Scala jobs had nearly identical execution times to their PySpark equivalents)

Partitions in the AWS Glue Data Catalog table for Test Case 1: Encounters for Symptom

Results

Data pipelines were developed and tested for each of the three test cases using the five chosen AWS ELT services and configuration variations. Each pipeline was then run 3–5 times, for a total of approximately 150 runs. The resulting AWS Glue Data Catalog table and data in Amazon S3 were deleted between each pipeline run. Each new run created a new data catalog table and wrote new results to Amazon S3. The median execution times from these tests are shown below.

Number of raw and processed records for each test case

Overall results (see details below) — lower times are better

Although we can make some general observations about the execution times of the chosen AWS services, the results are not meant to be a definitive guide to performance. An accurate comparison would require a deeper understanding of how each of these managed services works under the hood, in order to both optimize and balance their compute profiles correctly.

Amazon Athena

The Resultset column contains the final number of records written to Amazon S3 by Athena. The results contain the data pipeline’s median execution time and any additional data points.

Results for Amazon Athena data pipelines

AWS Glue Studio (AWS Glue PySpark Extensions)

Tests were run with three different configurations for AWS Glue Studio using the graphical interface for creating jobs with AWS Glue PySpark Extensions. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue Studio with AWS Glue PySpark Extensions

AWS Glue Studio (Apache PySpark script)

As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Apache Spark scripts with AWS Glue Studio. The tests were run with the same three configurations as above. The execution times compared to the Amazon EMR tests, below, are almost identical.

Results for data pipelines using PySpark scripts on AWS Glue Studio

Amazon EMR with Apache Spark

Tests were run with three different configurations for Amazon EMR with Apache Spark using PySpark. The first set of results is for the 2-node EMR cluster. The second set of results is for the 4-node cluster. The third set of results is for the same 4-node cluster in which the data was not bucketed into a single file within each partition. Compare the execution times and the number of objects against the previous set of results. Too many small files can negatively impact query performance.

Results for data pipelines using Amazon EMR with Apache Spark — times for PySpark scripts

It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had similar execution times for all three test cases.

Results for data pipelines using Amazon EMR with Apache Spark — Python vs. Scala

AWS Glue DataBrew

Tests were run with three different configurations for AWS Glue DataBrew, using 3, 10, and 20 maximum nodes. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue DataBrew

Observations

  1. All tested AWS services can read and write to an AWS Glue Data Catalog and the underlying datastore, Amazon S3. In addition, they all work with the most common analytics data file formats.
  2. All tested AWS services have rich APIs providing access through the AWS CLI and SDKs, which support multiple programming languages.
  3. Overall, AWS Glue Studio, using the AWS Glue PySpark Extensions, appears to be the most capable ELT tool of the five services tested, with the best overall performance.
  4. Both AWS Glue DataBrew and AWS Glue Studio are no-code or low-code services, democratizing access to data for non-programmers. Conversely, Amazon Athena requires knowledge of ANSI SQL, and Amazon EMR with Apache Spark requires knowledge of Scala or Python. Be cognizant of the potential trade-offs from using no-code or low-code services on observability, configuration control, and automation.
  5. Both AWS Glue DataBrew and AWS Glue Studio can write GlueParquet, a custom Parquet writer type optimized for DynamicFrames. One potential advantage is that a pre-computed schema is not required before writing.
  6. There is a slight ‘cold-start’ with Glue Studio. Studio startup times ranged from 7 seconds to 2 minutes and 4 seconds in the tests. However, the lower execution time of AWS Glue Studio compared to Amazon EMR with Spark and AWS Glue DataBrew in the tests offsets any initial cold-start time, in my opinion.
  7. Changing the maximum number of nodes from 3 to 10 to 20 for AWS Glue DataBrew made negligible differences in job execution times. Given the nearly identical execution times, it is unclear exactly how many nodes each job actually used and, more importantly, how many DataBrew node hours we are being billed for. These are some of the trade-offs of a fully-managed service: reduced visibility and less fine-grained configuration control.
  8. Similarly, with AWS Glue Studio, using either 10 workers with a maximum of 20 DPUs, 20 workers with a maximum of 40 DPUs, or 40 workers with a maximum of 80 DPUs resulted in nearly identical execution times.
  9. Amazon Athena had the fastest execution times but is limited by the 100 partition limit for large CTAS resultsets. Athena is not practical, in my opinion, compared to other ELT methods, when a higher number of partitions are needed.
  10. It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had almost identical execution times for all three test cases.
  11. Amazon EMR on EC2 instances takes about 9 minutes to fully provision a new cluster for this comparison. Given nearly identical execution times to AWS Glue Studio with Apache Spark scripts, Glue has the clear advantage of nearly instantaneous startup times.
  12. AWS recently announced Amazon EMR Serverless. Although this service is still in Preview, this new version of EMR could potentially reduce or eliminate the lengthy startup times required for ephemeral clusters.
  13. Although not discussed, scheduling the data pipelines to run each night was a requirement for our use case. AWS Glue Studio jobs and AWS Glue DataBrew jobs are schedulable directly from those services (see the scheduling sketch following this list). For Amazon EMR and Amazon Athena, we could use Amazon Managed Workflows for Apache Airflow (MWAA), AWS Data Pipeline, or AWS Step Functions combined with Amazon CloudWatch Events Rules to schedule the data pipelines.
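
As a reference for the scheduling point above, a hypothetical boto3 sketch of a nightly, time-based AWS Glue trigger might look like the following; the trigger name, cron expression, and job name are placeholders.

# Hypothetical sketch: scheduling an existing AWS Glue job to run nightly with a
# time-based Glue trigger; the trigger name, schedule, and job name are placeholders.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

glue.create_trigger(
    Name="nightly-encounters-elt-trigger",  # hypothetical trigger name
    Type="SCHEDULED",
    Schedule="cron(0 5 * * ? *)",  # 05:00 UTC every night
    Actions=[{"JobName": "encounters-for-symptom-elt"}],  # assumed Glue job name
    StartOnCreation=True,
)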

Conclusion

Customers have many options for ELT — the cleansing, deduplication, refinement, and normalization of raw data. We examined five AWS services, each capable of handling the analytics use case presented. The best choice of tools depends on your specific ELT use case and performance requirements.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Video Demonstration: Lakehouse Automation on AWS with Apache Airflow

Programmatically load and unload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow

Introduction

In the following video demonstration, we will learn how to programmatically load and unload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow. Since we are on AWS, we will be using the fully-managed Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Using Airflow, we will COPY raw data into staging tables, then merge that staging data into a series of tables. We will then load incremental data into Redshift on a regular schedule. Next, we will join and aggregate data from several tables and UNLOAD the resulting dataset to an Amazon S3-based data lake. Lastly, we will catalog the data in S3 using AWS Glue and query with Amazon Athena.
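
To give a sense of what such a pipeline looks like in code, below is a simplified, hypothetical sketch of an Airflow DAG that COPYs staged data into Amazon Redshift and then UNLOADs an aggregated result to Amazon S3. The connection ID, table names, S3 paths, and IAM roles are placeholders and are not the project's actual DAGs.

# Simplified, hypothetical sketch of a Redshift COPY/UNLOAD DAG; connection ID,
# table names, S3 paths, and IAM roles are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="redshift_copy_unload_sketch",
    start_date=datetime(2021, 12, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    copy_to_staging = PostgresOperator(
        task_id="copy_to_staging",
        postgres_conn_id="redshift_default",  # assumed Airflow connection
        sql="""
            COPY staging.sales
            FROM 's3://your-bucket/raw/sales/'
            IAM_ROLE 'arn:aws:iam::111222333444:role/RedshiftCopyRole'
            FORMAT AS CSV IGNOREHEADER 1;
        """,
    )

    unload_to_data_lake = PostgresOperator(
        task_id="unload_to_data_lake",
        postgres_conn_id="redshift_default",
        sql="""
            UNLOAD ('SELECT date, SUM(amount) AS total FROM public.sales GROUP BY date')
            TO 's3://your-bucket/gold/sales_daily_'
            IAM_ROLE 'arn:aws:iam::111222333444:role/RedshiftUnloadRole'
            FORMAT AS PARQUET;
        """,
    )

    copy_to_staging >> unload_to_data_lake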

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGs, SQL statements, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs included in the GitHub project are:

Demonstration DAGs as seen in MWAA Airflow UI

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Video Demonstration: Building a Data Lake with Apache Airflow

Build a simple Data Lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

Using a series of Airflow DAGs (Directed Acyclic Graphs), we will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGs, SQL files, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs shown in the video demonstration have been renamed for easier project management within the Airflow UI. The DAGs included in the GitHub project are as follows:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Video Demonstration: Building a Data Lake on AWS

Build a simple Data Lake on AWS using a combination of services, including AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

We will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the SQL statements, is open-sourced and located on GitHub.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



Video Demonstration: Ahana Cloud for Presto on AWS using Apache Hive and AWS Glue

Using Ahana Cloud for Presto to perform analytics on AWS using both Apache Hive and AWS Glue as metastores

Introduction

The following series of five videos is an extended version of the demonstration featured in the October 2021 webinar, Build an Open Data Lake on AWS with Presto. An on-demand copy of the live webinar is available on Ahana.io, featuring Dipti Borkar (Ahana Co-Founder and CPO) and me.

In the demonstration, we will build a data lake on AWS using a combination of Ahana Cloud for Presto, Apache Hive, Apache Superset, Amazon S3, AWS Glue, and Amazon Athena. We then analyze the data in Apache Superset using Ahana Cloud for Presto.

Build an Open Data Lake on AWS with Presto

Demonstration

The demonstration is divided into five YouTube videos (playlist):

Ahana Cloud for Presto Demo — Part 1/5: Public GitHub Resources

Ahana Cloud for Presto Demo — Part 2/5: MoMa Datasource

Ahana Cloud for Presto Demo — Part 3/5: Ahana SaaS

Ahana Cloud for Presto Demo — Part 4/5: AWS Glue & Amazon

Ahana Cloud for Presto Demo — Part 5/5: PrestoDB & Apache Hive

Source Code

All source code for this post and the previous posts in this series is open-sourced and located on GitHub. In the webinar and the videos, the Apache Hive and AWS Glue data catalog tables contain an _athena or _presto suffix. For clarity, in the source code, I have changed those to indicate the metastore they are associated with, _hive or _glue, since either set of tables can be queried using Presto. Additionally, in the webinar and the videos, the raw data files were uploaded to Amazon S3 in uncompressed CSV format; this is unnecessary. The CTAS SQL statements both expect GZIP-compressed CSV files. To save time and cost, upload the compressed files, as they are, to Amazon S3.

The following files are used in the demonstration:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.



IoT Data Analytics at the Edge: Exploring the convergence of IoT, Data Analytics, and Edge Computing with Grafana, Mosquitto, and TimescaleDB on ARM-based devices

This post is a revised version of an earlier post, featuring major version updates of TimescaleDB (v1.7.4-pg12 to v2.0.0-pg12), Grafana (v7.1.5 to v7.5.2), and Mosquitto (v1.6.12 to v2.0.9). All source code and SQL scripts are revised. Note that TimescaleDB currently has a limitation/bug with Docker on ARM for versions later than v2.0.0.

GTM IoT Edge Analytics Stack architecture (Image by author)

The Edge

Edge computing is a fast-growing technology trend, which involves pushing compute capabilities to a network’s edge. Wikipedia describes edge computing as a distributed computing paradigm that brings computation and data storage closer to the location where it is needed to improve response times and save bandwidth. The term edge commonly refers to a compute node at the edge of a network (edge device), sitting close to the data source and between that data source and external systems, such as the Cloud. In his recent post, 3 Advantages (And 1 Disadvantage) Of Edge Computing, well-known futurist Bernard Marr identifies reduced bandwidth requirements, reduced latency, and enhanced security and privacy as three primary advantages of edge computing.

David Ricketts, Head of Marketing at Quiss Technology PLC, in his post, Cloud and Edge Computing — The Stats You Need to Know for 2018, estimates that the global edge computing market is expected to reach USD 6.72 billion by 2022 at a compound annual growth rate of a whopping 35.4 percent. Realizing the market potential, many major Cloud providers, edge device manufacturers, and integrators are rapidly expanding their edge compute capabilities. AWS, for example, currently offers more than a dozen services in their edge computing category.

Internet of Things

Edge computing is frequently associated with the Internet of Things (IoT). IoT devices, industrial equipment, and sensors generate data transmitted to other internal and external systems, often by way of edge nodes, such as an IoT Gateway. IoT devices typically generate time-series data. According to Wikipedia, a time series is a set of data points indexed in time order — a sequence taken at successive equally spaced points in time. IoT devices typically generate continuous high-volume streams of time-series data, often on a scale of millions of data points per second. IoT data characteristics require IoT platforms to minimally support temporal accuracy, high-volume ingestion and processing, efficient data compression and downsampling, and real-time querying capabilities.

Edge devices such as IoT Gateways, which aggregate and transmit IoT data from these devices to external systems, are generally lower-powered, with limited processors, memory, and storage. Accordingly, IoT platforms must satisfy all the requirements of IoT data while simultaneously supporting resource-constrained environments.

IoT Analytics at the Edge

Leading Cloud providers AWS, Azure, Google Cloud, IBM Cloud, Oracle Cloud, and Alibaba Cloud all offer IoT services. Many offer IoT services with edge computing capabilities. AWS offers AWS IoT Greengrass. Greengrass provides local compute, messaging, data management, sync, and machine learning (ML) inference capabilities to edge devices. Azure offers Azure IoT Edge. Azure IoT Edge provides the ability to run artificial intelligence (AI), Azure and third-party services, and custom business logic on edge devices using standard containers. Google Cloud offers Edge TPU. Edge TPU (Tensor Processing Unit) is Google’s purpose-built application-specific integrated circuit (ASIC), designed to run AI at the edge.

IoT Analytics

Many Cloud providers also offer IoT analytics as part of their suite of IoT services, although not at the edge. AWS offers AWS IoT Analytics, while Azure has Azure Time Series Insights. Google provides IoT analytics, indirectly, through downstream analytic systems and ad hoc analysis using Google BigQuery or advanced analytics and machine learning with Cloud Machine Learning Engine. These services generally all require data to be transmitted to the Cloud for analytics.

Cloud-centric IoT analytics platform data flow (Image by author)

The ability to analyze real-time, streaming IoT data at the edge is critical to a rapid feedback loop. IoT edge analytics can accelerate anomaly detection and remediation, improve predictive maintenance capabilities, and expedite proactive inventory replenishment.

IoT Edge Analytics Stack

In my opinion, the ideal IoT edge analytics stack is comprised of lightweight, purpose-built, easily deployable and manageable, platform- and programming language-agnostic, open-source software components. The minimal IoT edge analytics stack should include:

  1. Lightweight message broker;
  2. Purpose-built time-series database;
  3. ANSI-standard SQL ad-hoc query engine;
  4. Data visualization tool;
  5. Simple deployment and management framework;

Each component should be purpose-built for IoT.

Lightweight Message Broker

We will use Eclipse Mosquitto as our message broker. According to the project’s description, Mosquitto is an open-source message broker that implements the Message Queuing Telemetry Transport (MQTT) protocol versions 5.0, 3.1.1, and 3.1. Mosquitto is lightweight and suitable for use on all devices, from low-power single-board computers (SBCs) to full-powered servers.

MQTT Client Library

We will interact with Mosquitto using Eclipse Paho. According to the project, the Eclipse Paho project provides open-source, mainly client-side implementations of MQTT and MQTT-SN in a variety of programming languages. MQTT and MQTT for Sensor Networks (MQTT-SN) are light-weight publish/subscribe messaging transports for TCP/IP and connectionless protocols, such as UDP, respectively.

We will be using Paho’s Python Client. The Paho Python Client provides a client class with support for both MQTT v3.1 and v3.1.1 on Python 2.7 or 3.x. The client also provides helper functions to make publishing messages to an MQTT server straightforward.
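
For example, a minimal use of Paho’s publish helper looks something like the following; the broker address, topic, and payload are placeholders.

# Minimal example of Paho's publish helper; broker address, topic, and payload
# are placeholders.
import json

import paho.mqtt.publish as publish

publish.single(
    topic="sensor/output",
    payload=json.dumps({"temperature": 19.1, "humidity": 51.1}),
    hostname="192.168.1.12",  # assumed Mosquitto broker address
    port=1883,
)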

Time-Series Database

Time-series databases are optimal for storing IoT data. According to InfluxData, makers of a leading time-series database, InfluxDB, a time-series database (TSDB), is a database optimized for time-stamped or time-series data. Time series data are simply measurements or events that are tracked, monitored, downsampled, and aggregated over time. Jiao Xian of Alibaba Cloud has authored an insightful post on the time-series database ecosystem, What Are Time Series Databases? A few leading Cloud providers offer purpose-built time-series databases, though they are not available at the edge. AWS offers Amazon Timestream, and Alibaba Cloud offers Time Series Database.

InfluxDB is an excellent choice for a time-series database. It was my first choice, along with TimescaleDB, when developing this stack. However, InfluxDB Flux’s apparent incompatibilities with some ARM-based architectures ruled it out for inclusion in the stack for this particular post.

We will use TimescaleDB as our time-series database. TimescaleDB is the leading open-source relational database for time-series data. Described as ‘PostgreSQL for time-series,’ TimescaleDB is based on PostgreSQL, which provides full ANSI SQL, rock-solid reliability, and a massive ecosystem. TimescaleDB claims to achieve 10–100x faster queries than PostgreSQL, InfluxDB, and MongoDB, with native optimizations for time-series analytics.

TimescaleDB is designed for performing analytical queries, both through its native support for PostgreSQL’s full range of SQL functionality and additional functions native to TimescaleDB. These time-series optimized functions include Median/Percentile, Cumulative Sum, Moving Average, Increase, Rate, Delta, Time Bucket, Histogram, and Gap Filling.

Ad-hoc Data Query Engine

We have the option of using psql, the terminal-based front-end to PostgreSQL, to execute ad-hoc queries against TimescaleDB. The psql front-end enables you to enter queries interactively, issue them to PostgreSQL, and see the query results.

View of psql terminal-based interface for querying the TimescaleDB database

We also have the option of using pgAdmin, specifically the biarms/pgadmin4 Docker version, to execute ad-hoc queries and perform most other database tasks. pgAdmin is the most popular open-source administration and development platform for PostgreSQL. While several popular Docker versions of pgAdmin only support Linux AMD64 architectures, the biarms/pgadmin4 Docker version supports ARM-based devices.

Dashboard view of TimescaleDB database from within pgAdmin UI

Executing a query against the TimescaleDB database using pgAdmin’s Query Tool

Data Visualization

For data visualization, we will use Grafana. Grafana allows you to query, visualize, alert on, and understand metrics no matter where they are stored. With Grafana, you can create, explore, and share dashboards, fostering a data-driven culture. Grafana allows you to define thresholds visually and get notified via Slack, PagerDuty, and more. Grafana supports dozens of data sources, including MySQL, PostgreSQL, Elasticsearch, InfluxDB, TimescaleDB, Graphite, Prometheus, Google BigQuery, GraphQL, and Oracle. Grafana is extensible through a large collection of plugins.

Example of Grafana dashboard showing the post’s IoT sensor data

Edge Deployment and Management Platform

Docker introduced the current industry standard for containers in 2013. Docker containers are a standardized unit of software that allows developers to isolate apps from their environment. We will use Docker to deploy the IoT edge analytics stack, referred to herein as the GTM Stack, composed of containerized versions of Grafana, TimescaleDB, Eclipse Mosquitto, and pgAdmin, to an ARMv7-based edge node. The acronym, GTM, comes from the three primary OSS projects composing the stack. The abbreviation also suggests Greenwich Mean Time, relating to the precise time-series nature of IoT data.

GTM IoT Edge Analytics Stack architecture (Image by author)

Running Docker Engine in swarm mode, we can use Docker to deploy the complete IoT edge analytics stack to the swarm, running on the edge node. The deploy command accepts a stack description in the form of a Docker Compose file, a YAML file used to configure the application’s services. With a single command, we can create and start all the services from the configuration file.

Source Code

All source code for this post is available on GitHub. Use the following command to git clone a local copy of the project. Note that the updated version of the source code for this post is in the v2021-03 branch.

git clone --branch v2021-03 --single-branch --depth 1 \
https://github.com/garystafford/iot-analytics-at-the-edge.git

IoT Devices

For this post, I have deployed three Linux ARM-based IoT devices, each connected to a sensor array. Each sensor array contains multiple analog and digital sensors. The sensors record temperature, humidity, air quality (liquefied petroleum gas (LPG), carbon monoxide (CO), and smoke), light, and motion. For more information on the IoT device and sensor hardware involved, please see my previous post, Getting Started with IoT Analytics on AWS: Analyze environmental sensor data from IoT devices in near real-time with AWS IoT Analytics (towardsdatascience.com).

Each ARM-based IoT device runs a small Python3-based script, sensor_data_to_mosquitto.py, shown below.

import argparse
import json
import logging
import sys
import time
from datetime import datetime

import paho.mqtt.publish as publish
from getmac import get_mac_address
from pytz import timezone

from Sensors import Sensors

# Sensor to Mosquitto Script
# Author: Gary A. Stafford
# Date: 2021-03-26
# Usage: python3 sensor_data_to_mosquitto.py \
#   --host "192.168.1.12" --port 1883 \
#   --topic "sensor/output" --frequency 10

sensors = Sensors()

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def main():
    args = parse_args()
    publish_message_to_db(args)


def get_readings():
    sensors.led_state(0)

    # Retrieve sensor readings
    payload_dht = sensors.get_sensor_data_dht()
    payload_gas = sensors.get_sensor_data_gas()
    payload_light = sensors.get_sensor_data_light()
    payload_motion = sensors.get_sensor_data_motion()

    message = {
        "device_id": get_mac_address(),
        "time": datetime.now(timezone("UTC")),
        "data": {
            "temperature": payload_dht["temperature"],
            "humidity": payload_dht["humidity"],
            "lpg": payload_gas["lpg"],
            "co": payload_gas["co"],
            "smoke": payload_gas["smoke"],
            "light": payload_light["light"],
            "motion": payload_motion["motion"]
        }
    }

    return message


def date_converter(o):
    if isinstance(o, datetime):
        return o.__str__()


def publish_message_to_db(args):
    while True:
        message = get_readings()
        message_json = json.dumps(message, default=date_converter, sort_keys=True,
                                  indent=None, separators=(',', ':'))
        logger.debug(message_json)

        try:
            publish.single(args.topic, payload=message_json, hostname=args.host, port=args.port)
        except Exception as error:
            logger.error("Exception: {}".format(error))
        finally:
            time.sleep(args.frequency)


# Read in command-line parameters
def parse_args():
    parser = argparse.ArgumentParser(description='Script arguments')
    parser.add_argument('--host', help='Mosquitto host', default='localhost')
    parser.add_argument('--port', help='Mosquitto port', type=int, default=1883)
    parser.add_argument('--topic', help='Mosquitto topic', default='paho/test')
    parser.add_argument('--frequency', help='Message frequency in seconds', type=int, default=5)
    return parser.parse_args()


if __name__ == "__main__":
    main()

The IoT devices’ script implements the Eclipse Paho MQTT Python client library. An MQTT message containing simultaneous readings from each sensor is sent to a Mosquitto topic on the edge node at a configurable frequency.

message = {
    "device_id": get_mac_address(),
    "time": datetime.now(timezone("UTC")),
    "data": {
        "temperature": payload_dht["temperature"],
        "humidity": payload_dht["humidity"],
        "lpg": payload_gas["lpg"],
        "co": payload_gas["co"],
        "smoke": payload_gas["smoke"],
        "light": payload_light["light"],
        "motion": payload_motion["motion"]
    }
}

Below are the actual sensor readings sent by the IoT device as an MQTT message to the Mosquitto topic.

{
    "data": {
        "co": 0.0031827073092533685,
        "humidity": 51.099998474121094,
        "light": true,
        "lpg": 0.005553622262501496,
        "motion": false,
        "smoke": 0.01449612738171321,
        "temperature": 19.100000381469727
    },
    "device_id": "00:0f:00:70:91:0a",
    "time": "2021-04-02 17:23:44.809046+00:00"
}

IoT Edge Node

For this post, I have deployed a single Linux ARM-based edge node. The three IoT devices containing sensor arrays communicate with the edge node over Wi-Fi. IoT devices could easily use an alternative communication protocol, such as BLE, LoRaWAN, or Ethernet. For more information on BLE and LoRaWAN, please see some of my previous posts: LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, and BLE and GATT for IoT: Getting Started with Bluetooth Low Energy (BLE) and Generic Attribute Profile (GATT) Specification for IoT.

The edge node also runs a small Python3-based script, mosquitto_to_timescaledb.py, shown below.

import argparse
import json
import logging
import sys
from datetime import datetime

import paho.mqtt.client as mqtt
import psycopg2

# Mosquitto to TimescaleDB Script
# Author: Gary A. Stafford
# Date: 2021-03-31
# Usage: python3 mosquitto_to_timescaledb.py \
#   --msqt_topic "sensor/output" --msqt_host "192.168.1.12" --msqt_port 1883 \
#   --ts_host "192.168.1.12" --ts_port 5432 \
#   --ts_username postgres --ts_password postgres1234 --ts_database demo_iot

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

args = argparse.Namespace
ts_connection: str = ""


def main():
    global args
    args = parse_args()

    global ts_connection
    ts_connection = "postgres://{}:{}@{}:{}/{}".format(args.ts_username, args.ts_password, args.ts_host,
                                                       args.ts_port, args.ts_database)
    logger.debug("TimescaleDB connection: {}".format(ts_connection))

    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(args.msqt_host, args.msqt_port, 60)

    # Blocking call that processes network traffic, dispatches callbacks and
    # handles reconnecting.
    # Other loop*() functions are available that give a threaded interface and a
    # manual interface.
    client.loop_forever()


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    logger.debug("Connected with result code {}".format(str(rc)))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(args.msqt_topic)


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    logger.debug("Topic: {}, Message Payload: {}".format(msg.topic, str(msg.payload)))
    publish_message_to_db(msg)


def date_converter(o):
    if isinstance(o, datetime):
        return o.__str__()


def publish_message_to_db(message):
    message_payload = json.loads(message.payload)
    # logger.debug("message.payload: {}".format(json.dumps(message_payload, default=date_converter)))
    sql = """INSERT INTO sensor_data(time, device_id, temperature, humidity, lpg, co, smoke, light, motion)
             VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);"""
    data = (
        message_payload["time"],
        message_payload["device_id"],
        message_payload["data"]["temperature"],
        message_payload["data"]["humidity"],
        message_payload["data"]["lpg"],
        message_payload["data"]["co"],
        message_payload["data"]["smoke"],
        message_payload["data"]["light"],
        message_payload["data"]["motion"]
    )
    try:
        with psycopg2.connect(ts_connection, connect_timeout=3) as conn:
            with conn.cursor() as curs:
                try:
                    curs.execute(sql, data)
                except psycopg2.Error as error:
                    logger.error("Exception: {}".format(error.pgerror))
                except Exception as error:
                    logger.error("Exception: {}".format(error))
    except psycopg2.OperationalError as error:
        logger.error("Exception: {}".format(error.pgerror))
    finally:
        conn.close()


# Read in command-line parameters
def parse_args():
    parser = argparse.ArgumentParser(description='Script arguments')
    parser.add_argument('--msqt_topic', help='Mosquitto topic', default='paho/test')
    parser.add_argument('--msqt_host', help='Mosquitto host', default='localhost')
    parser.add_argument('--msqt_port', help='Mosquitto port', type=int, default=1883)
    parser.add_argument('--ts_host', help='TimescaleDB host', default='localhost')
    parser.add_argument('--ts_port', help='TimescaleDB port', type=int, default=5432)
    parser.add_argument('--ts_username', help='TimescaleDB username', default='postgres')
    parser.add_argument('--ts_password', help='TimescaleDB password', default='postgres1234')
    parser.add_argument('--ts_database', help='TimescaleDB database', default='demo_iot')
    return parser.parse_args()


if __name__ == "__main__":
    main()

Like the IoT devices, the edge node’s script implements the Eclipse Paho MQTT Python client library. The script pulls MQTT messages off one or more Mosquitto topics, deserializes each JSON message payload, and writes the payload’s data to the TimescaleDB database. The edge node’s script accepts several arguments, which allow you to configure the necessary Mosquitto and TimescaleDB connection settings.

Why not use Telegraf?

Telegraf is a plugin-driven agent that collects, processes, aggregates, and writes metrics. There is a Telegraf output plugin, the PostgreSQL and TimescaleDB Output Plugin for Telegraf, produced by TimescaleDB, which could eliminate the need to manage and maintain the above script. However, I chose not to use it because it is not yet an official Telegraf plugin. If the plugin were included in a Telegraf release, I would certainly encourage its use.

Script Management

Both the Linux-based IoT devices and the edge node run the systemd system and service manager. To ensure the Python scripts keep running in the case of a system restart, we define a systemd unit for each. Units are the objects that systemd knows how to manage; they are a standardized representation of system resources that can be managed by the suite of daemons and manipulated by the provided utilities. Each script has a systemd unit file. Below, we see the gtm_stack_mosquitto unit file, gtm_stack_mosquitto.service.

[Unit]
Description=GTM Stack – Sensor to Mosquitto Script
After=network.target
[Service]
ExecStart=/usr/bin/python3 -u sensor_data_to_mosquitto.py \
–host "192.168.1.12" –port 1883 –topic "sensor/output"
WorkingDirectory=/home/pi/iot-analytics-at-the-edge/scripts
StandardOutput=inherit
StandardError=inherit
Restart=always
User=pi
[Install]
WantedBy=multi-user.target

The gtm_stack_mosq_to_tmscl unit file, gtm_stack_mosq_to_tmscl.service, is nearly identical.

To install the gtm_stack_mosquitto.service systemd unit file on each IoT device, use the following commands:

SERVICE=gtm_stack_mosquitto
sudo cp systemctl/${SERVICE}.service /etc/systemd/system/
sudo systemctl start ${SERVICE}.service
sudo systemctl enable ${SERVICE}.service
# check status
systemctl status ${SERVICE}.service
ps aux | grep sensor_data_to_mosquitto.py

Installing the gtm_stack_mosq_to_tmscl.service unit file on the edge node is nearly identical.

Docker Stack

The edge node runs the GTM Docker stack, stack.yml, in a swarm. As discussed earlier, the stack contains four containers: Eclipse Mosquitto, TimescaleDB, Grafana, and pgAdmin. The Mosquitto, TimescaleDB, and Grafana containers have paths within the containers bind-mounted to directories on the edge device. With bind-mounting, the container’s configuration and data will persist if the containers are removed and re-created. The containers are running on an isolated overlay network.

version: "3.9" # optional since v1.27.0
services:
timescaledb:
image: timescale/timescaledb:2.0.0-pg12
ports:
– "5432:5432/tcp"
networks:
– demo-iot-net
environment:
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: postgres1234
POSTGRES_DB: demo_iot
deploy:
restart_policy:
condition: on-failure
volumes:
– "$HOME/data/postgres:/var/lib/postgresql/data"
grafana:
image: grafana/grafana:7.5.2
ports:
– "3000:3000/tcp"
networks:
– demo-iot-net
deploy:
restart_policy:
condition: on-failure
volumes:
– "$HOME/data/grafana:/var/lib/grafana"
user: $ID:1
mosquitto:
image: eclipse-mosquitto:2.0.9
ports:
– "1883:1883/tcp"
networks:
– demo-iot-net
deploy:
restart_policy:
condition: on-failure
volumes:
– "$HOME/data/mosquitto/config:/mosquitto/config"
– "$HOME/data/mosquitto/data:/mosquitto/data"
– "$HOME/data/mosquitto/log:/mosquitto/log"
pgadmin:
image: biarms/pgadmin4:4.21
ports:
– "5050:5050/tcp"
networks:
– demo-iot-net
deploy:
restart_policy:
condition: on-failure
networks:
demo-iot-net:

The GTM Docker stack is installed using the following commands on the edge node. We will assume Docker and git are pre-installed on the edge node for this post.

# on edge node
git clone https://github.com/garystafford/iot-analytics-at-the-edge.git
# build required directories
mkdir -p ~/data/postgres
mkdir -p ~/data/grafana
mkdir -p ~/data/mosquitto/config
mkdir -p ~/data/mosquitto/data
mkdir -p ~/data/mosquitto/log
# move mosquitto config
cd iot-analytics-at-the-edge/docker/
cp mosquitto.conf ~/data/mosquitto/config/
# deploy stack
docker swarm init
docker stack deploy -c stack.yml iot
# check status of stack
docker stack ps iot --no-trunc
docker stack services iot

First, we will create several local directories on the edge device, which will be used to bind-mount to the Docker container’s directories. Below, we see the bind-mounted local directories with the eventual container’s contents stored within them.

The bind-mounted local directories on the edge device from the stack

Next, we copy the custom Mosquitto configuration file, mosquitto.conf, included in the project to the edge device’s correct location.

Lastly, we initialize the Docker swarm and deploy the stack.

Output of the ‘docker service ls’ command, showing the running GTM Stack containers

TimescaleDB Setup

With the GTM stack running, we need to create a single Timescale hypertable, sensor_data, in the TimescaleDB demo_iot database to hold the incoming IoT sensor data. Hypertables, according to TimescaleDB, are designed to be easy to manage and to behave like standard PostgreSQL tables. Hypertables are comprised of many interlinked “chunk” tables. Commands made to the hypertable automatically propagate changes down to all of the chunks belonging to that hypertable.

CREATE TABLE IF NOT EXISTS sensor_data (
time timestamptz NOT NULL,
device_id text NOT NULL,
temperature double PRECISION NOT NULL,
humidity double PRECISION NOT NULL,
lpg double PRECISION NOT NULL,
co double PRECISION NOT NULL,
smoke double PRECISION NOT NULL,
light boolean NOT NULL,
motion boolean NOT NULL
);
SELECT create_hypertable('sensor_data', 'time');

I suggest using psql to execute the required DDL statements, which will create the hypertable, the subsequent views, and the database user permissions. All SQL statements are included in the project’s statements.sql file. One way to use psql is to install it on your local workstation, then use psql to connect to the remote edge node. I prefer to instantiate a local PostgreSQL Docker container instance running psql. I then use the local container’s psql client to connect to the edge node’s TimescaleDB database. For example, from my local machine, I run the following docker run command to connect to the TimescaleDB database on the edge node, located at 192.168.1.12.

docker run -it --rm postgres psql \
-U postgres -h 192.168.1.12 -p 5432 -d demo_iot

Although not as practical, you can also access psql from within the TimescaleDB Docker container, running on the actual edge node, using the following docker exec command.

TIMESCALEDB_CONTAINER=$(docker ps -q \
  --filter='name=iot_timescaledb.1' --format '{{.Names}}')
docker exec -it ${TIMESCALEDB_CONTAINER} psql \
-U postgres -h localhost -d demo_iot

TimescaleDB Continuous Aggregates

For this post’s demonstration, we will create four TimescaleDB materialized views, which will be queried from a Grafana Dashboard. The materialized views are TimescaleDB Continuous Aggregates. According to Timescale, aggregate queries which touch large swathes of time-series data can take a long time to compute because the system needs to scan large amounts of data on every query execution. To make these queries faster, a continuous aggregate allows materializing the computed aggregates, while also providing means to continuously, and with low overhead, keep them up-to-date as the underlying source data changes.

For example, we generate sensor data every five seconds from the three IoT devices in this post. When visualizing a 24-hour period in Grafana, using continuous aggregates with an interval of one minute, we reduce the total volume of data queried from 51,840 rows to 4,320 rows, a reduction of over 91%. The larger the time period or the greater the number of IoT devices being analyzed, the greater the savings and the more noticeable their positive impact on query performance.
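
Once the continuous aggregates below have been created and populated, you can sanity-check these numbers by comparing raw and aggregated row counts over the same window. The sketch below reuses the containerized psql client shown later in this post; the host address is an example, and the aggregated count will lag slightly because the aggregates refresh on a schedule.

# compare raw vs. aggregated row counts for the last 24 hours (example host address)
docker run -it --rm postgres psql -U postgres -h 192.168.1.12 -p 5432 -d demo_iot \
  -c "SELECT count(*) AS raw_rows FROM sensor_data WHERE time > now() - INTERVAL '24 hours';" \
  -c "SELECT count(*) AS agg_rows FROM temperature_humidity_summary_minute WHERE bucket > now() - INTERVAL '24 hours';"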

A time_bucket on the time partitioning column of the hypertable is required for all continuous aggregate views. The time_bucket function, in this case, has a bucket width (interval) of 1 minute. The interval is configurable.

-- create materialized views (continuous aggregates)

-- temperature and humidity
CREATE MATERIALIZED VIEW temperature_humidity_summary_minute(device_id, bucket, avg_temp, avg_humidity)
    WITH (timescaledb.continuous) AS
SELECT device_id,
       time_bucket(INTERVAL '1 minute', time),
       avg(temperature),
       avg(humidity)
FROM sensor_data
WHERE humidity BETWEEN 0 AND 100
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;

-- air quality (lpg, co, smoke)
CREATE MATERIALIZED VIEW air_quality_summary_minute(device_id, bucket, avg_lpg, avg_co, avg_smoke)
    WITH (timescaledb.continuous) AS
SELECT device_id,
       time_bucket(INTERVAL '1 minute', time),
       avg(lpg),
       avg(co),
       avg(smoke)
FROM sensor_data
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;

-- light
CREATE MATERIALIZED VIEW light_summary_minute(device_id, bucket, avg_light)
    WITH (timescaledb.continuous) AS
SELECT device_id,
       time_bucket(INTERVAL '1 minute', time),
       avg(case when light = 't' then 1 else 0 end)
FROM sensor_data
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;

-- motion
CREATE MATERIALIZED VIEW motion_summary_minute(device_id, bucket, avg_motion)
    WITH (timescaledb.continuous) AS
SELECT device_id,
       time_bucket(INTERVAL '1 minute', time),
       avg(case when motion = 't' then 1 else 0 end)
FROM sensor_data
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;

To automatically refresh the four materialized views, we will create four corresponding continuous aggregate policies. In this demonstration, the continuous aggregate policies create a refresh window between one week ago and one hour ago, with a refresh interval of one hour.

-- create policies that automatically refresh the continuous aggregates
SELECT add_continuous_aggregate_policy('air_quality_summary_minute',
    start_offset => INTERVAL '1 week',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

SELECT add_continuous_aggregate_policy('light_summary_minute',
    start_offset => INTERVAL '1 week',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

SELECT add_continuous_aggregate_policy('motion_summary_minute',
    start_offset => INTERVAL '1 week',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

SELECT add_continuous_aggregate_policy('temperature_humidity_summary_minute',
    start_offset => INTERVAL '1 week',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

-- view jobs
SELECT * FROM timescaledb_information.jobs;

-- view job stats
SELECT job_id, total_runs, total_failures, total_successes
FROM timescaledb_information.job_stats;
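
Note that because the views are created WITH NO DATA, they remain empty until their policy runs or a manual refresh is performed. If you want to backfill an aggregate immediately, a one-off refresh can be issued. Below is a minimal sketch using the containerized psql client from earlier; the host address and the chosen view are only examples.

# optional: manually backfill a continuous aggregate instead of waiting for its policy
docker run -it --rm postgres psql -U postgres -h 192.168.1.12 -p 5432 -d demo_iot \
  -c "CALL refresh_continuous_aggregate('temperature_humidity_summary_minute', now() - INTERVAL '1 week', now());"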

Advanced Analytic Queries

The ability to perform ad-hoc queries on time-series IoT data is an essential feature of the IoT edge analytics stack. We can use psql, pgAdmin, or even our own IDE to perform ad-hoc queries against the TimescaleDB database on the edge node. Below are examples of typical ad-hoc queries a data analyst might perform on IoT sensor data. These example queries demonstrate TimescaleDB’s advanced analytical capabilities for working with time-series data, including Moving Average, Delta, Time Bucket, and Histogram.

-- ad-hoc queries

-- find max temperature (°C) and humidity (%) for last 3 hours in 15 minute time periods
-- https://docs.timescale.com/latest/using-timescaledb/reading-data#select
SELECT time_bucket('15 minutes', time) AS fifteen_min,
       device_id,
       count(time),
       max(temperature) AS max_temp,
       max(humidity) AS max_hum
FROM sensor_data
WHERE time > now() - INTERVAL '3 hours'
  AND humidity BETWEEN 0 AND 100
GROUP BY fifteen_min, device_id
ORDER BY fifteen_min DESC, max_temp DESC;

-- find temperature (°C) anomalies (delta > ~5°F)
-- https://docs.timescale.com/latest/using-timescaledb/reading-data#delta
WITH ht AS (SELECT time,
                   temperature,
                   abs(temperature - lag(temperature) over (ORDER BY time)) AS delta
            FROM sensor_data)
SELECT ht.time, ht.temperature, ht.delta
FROM ht
WHERE ht.delta > 2.63
ORDER BY ht.time;

-- find three minute moving average of temperature (°F) for last day
-- (5 sec. interval * 36 rows = 3 min.)
-- https://docs.timescale.com/latest/using-timescaledb/reading-data#moving-average
SELECT time,
       avg((temperature * 1.8) + 32) over (ORDER BY time
           ROWS BETWEEN 35 PRECEDING AND CURRENT ROW)
           AS smooth_temp
FROM sensor_data
WHERE device_id = 'Manufacturing Plant'
  AND time > now() - INTERVAL '1 day'
ORDER BY time DESC;

-- find average humidity (%) for last 12 hours in 5-minute time periods
-- https://docs.timescale.com/latest/using-timescaledb/reading-data#time-bucket
SELECT time_bucket('5 minutes', time) AS time_period,
       avg(humidity) AS avg_humidity
FROM sensor_data
WHERE device_id = 'Main Warehouse'
  AND humidity BETWEEN 0 AND 100
  AND time > now() - INTERVAL '12 hours'
GROUP BY time_period
ORDER BY time_period DESC;

-- calculate histograms of avg. temperature (°F) between 55-85°F in 5°F buckets during last 2 days
-- https://docs.timescale.com/latest/using-timescaledb/reading-data#histogram
SELECT device_id,
       count(time),
       histogram((temperature * 1.8) + 32, 55.0, 85.0, 5)
FROM sensor_data
WHERE temperature IS NOT NULL
  AND time > now() - INTERVAL '2 days'
GROUP BY device_id;

-- find average light value for last 90 minutes in 5-minute time periods
-- https://docs.timescale.com/latest/using-timescaledb/reading-data#time-bucket
SELECT device_id,
       time_bucket('5 minutes', time) AS five_min,
       avg(case when light = 't' then 1 else 0 end) AS avg_light
FROM sensor_data
WHERE device_id = 'Manufacturing Plant'
  AND time > now() - INTERVAL '90 minutes'
GROUP BY device_id, five_min
ORDER BY five_min DESC;

Data Visualization with Grafana

Using the TimescaleDB continuous aggregates we have created, we can quickly build a richly featured dashboard in Grafana. Below we see a typical IoT Dashboard you might build to monitor the post’s IoT sensor data in near real-time. An exported version, dashboard_external_export.json, is included in the GitHub project.

Example of the Grafana IoT Demo Dashboard showing the post’s IoT sensor data

Limiting Grafana’s Access to IoT Data

Following the Grafana recommendations for database user permissions, we create a grafanareader PostgreSQL user and limit the user’s access to the sensor_data table and the four views we created. Grafana will use this user’s credentials to perform SELECT queries against the TimescaleDB demo_iot database.

CREATE USER grafanareader WITH PASSWORD 'grafana1234';
GRANT USAGE ON SCHEMA public TO grafanareader;
GRANT SELECT ON public.sensor_data TO grafanareader;
GRANT SELECT ON public.temperature_humidity_summary_minute TO grafanareader;
GRANT SELECT ON public.air_quality_summary_minute TO grafanareader;
GRANT SELECT ON public.light_summary_minute TO grafanareader;
GRANT SELECT ON public.motion_summary_minute TO grafanareader;

Using PostgreSQL in Grafana

Grafana’s documentation includes a comprehensive set of instructions for Using PostgreSQL in Grafana. To connect to the TimescaleDB database from Grafana, we use the PostgreSQL data source plugin.

Configuring the TimescaleDB database connection in Grafana
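
If you prefer to script this step, the data source can also be created through Grafana’s HTTP API rather than the UI. The sketch below is not part of the project; the Grafana URL, admin credentials, and the timescaledb hostname are placeholder assumptions, and it reuses the restricted grafanareader account created earlier.

# optional sketch: create the PostgreSQL/TimescaleDB data source via Grafana's HTTP API
# (placeholder Grafana URL, admin credentials, and database hostname)
curl -s -X POST http://admin:admin@localhost:3000/api/datasources \
  -H "Content-Type: application/json" \
  -d '{
    "name": "TimescaleDB",
    "type": "postgres",
    "access": "proxy",
    "url": "timescaledb:5432",
    "database": "demo_iot",
    "user": "grafanareader",
    "secureJsonData": { "password": "grafana1234" },
    "jsonData": { "sslmode": "disable", "timescaledb": true }
  }'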

The data displayed in each Panel in the Grafana Dashboard is based on a SQL query. For example, the Average Temperature Panel might use a query similar to the example below. This particular query also converts Celsius to Fahrenheit. Note the use of Grafana Macros (e.g., $__time(), $__timeFilter()). Macros can be used within a query to simplify syntax and allow for dynamic parts.

SELECT
$__time(bucket),
device_id AS metric,
((avg_temp * 1.8) + 32) AS avg_temp
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
ORDER BY 1,2

Below, we see another example from the Average Humidity Panel. This particular query also limits the humidity values to a valid range of 0%–100%.

SELECT
$__time(bucket),
device_id AS metric,
avg_humidity
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
AND avg_humidity >= 0.0
AND avg_humidity <= 100.0
ORDER BY 1,2

Mobile Friendly

Grafana dashboards are mobile-friendly. Below we see two views of the dashboard, using the Chrome mobile browser on an Apple iPhone.

Grafana Alerts

Grafana allows Alerts to be created based on Rules you define in each Panel. If data values match a Rule’s pre-defined conditions, such as a temperature reading above a certain threshold for a set amount of time, an alert is sent to your choice of destinations. According to the Rule shown below, if the average temperature exceeds 75°F for a period of 5 minutes, an alert is sent.

High-temperature rule configuration

As demonstrated below, when the laboratory temperature began to exceed 75°F, the alert entered a 'Pending' state. Once the temperature had exceeded 75°F for the pre-determined period of 5 minutes, the alert status changed to 'Alerting', and an alert was sent. When the temperature dropped back below 75°F for the pre-determined period of 5 minutes, the alert status changed from 'Alerting' to 'OK', and a follow-up notification was sent.

Average temperature graph showing the various alert status changes

There are currently twenty alert notifiers available out-of-the-box with Grafana, including Slack, email, PagerDuty, webhooks, VictorOps, Opsgenie, and Microsoft Teams. We can use Grafana Alerts to notify the proper resources, in near real-time, if an issue is detected based on the data. Below, we see an actual series of high-temperature alerts sent by Grafana to the Slack channel, followed by subsequent notifications as the temperature returned to normal.

Grafana alert notifications in Slack channel
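
The notification channel itself can also be created programmatically. The sketch below uses Grafana’s (legacy alerting) HTTP API; the Grafana URL, admin credentials, and Slack webhook URL are placeholders, not values from the project.

# optional sketch: create a Slack notification channel via Grafana's HTTP API
# (placeholder Grafana URL, admin credentials, and Slack incoming-webhook URL)
curl -s -X POST http://admin:admin@localhost:3000/api/alert-notifications \
  -H "Content-Type: application/json" \
  -d '{
    "name": "slack-alerts",
    "type": "slack",
    "isDefault": true,
    "settings": { "url": "https://hooks.slack.com/services/XXXXX/XXXXX/XXXXXXXX" }
  }'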

Conclusion

This post explored the development of an IoT edge analytics stack composed of lightweight, purpose-built, easily deployed and managed, platform- and programming-language-agnostic, open source software components. These components included Docker containerized versions of Grafana, TimescaleDB, Eclipse Mosquitto, and pgAdmin, referred to as the GTM Stack. Using the GTM stack, we collected, analyzed, and visualized IoT data without first shipping the data to the Cloud or other external systems.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
