Posts Tagged Amazon Athena

Utilizing In-memory Data Caching to Enhance the Performance of Data Lake-based Applications

Significantly improve the performance and reduce the cost of data lake-based analytics applications using Amazon ElastiCache for Redis

Introduction

The recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, demonstrated how to develop a Cloud-native analytics application using Spring Boot. The application queried data in an Amazon S3-based data lake via an AWS Glue Data Catalog utilizing the Amazon Athena API.

Securely exposing data in a data lake using RESTful APIs can fulfill many data-consumer needs. However, access to that data can be significantly slower than access from a database or data warehouse. For example, in the previous post, we imported the OpenAPI v3 specification from the Spring Boot service into Postman. The API specification contained approximately 17 endpoints.

Running a suite of integration tests against the Spring Boot service using Postman

From my local development laptop, the Postman API test run times for all service endpoints took an average of 32.4 seconds. The Spring Boot service was running three Kubernetes pod replicas on Amazon EKS in the AWS US East (N. Virginia) Region.

Sample of test results ran from service (no Redis cache)

Compare the data lake query result times to equivalent queries made against a minimally-sized Amazon RDS for PostgreSQL database instance containing the same data. Run times for all PostgreSQL queries from a similar Spring Boot service averaged 10.8 seconds. Although not a precise benchmark, access to the data in the Amazon S3-based data lake is clearly slower, approximately 3x, than access to the PostgreSQL database. Tuning the database would create an even greater disparity.

Sample of test results for queries ran from service against Athena vs. PostgreSQL (quicker is better)

Caching for Data Lakes

According to AWS, the speed and throughput of a database can be the most impactful factor for overall application performance. Consequently, in-memory data caching can be one of the most effective strategies to improve overall application performance and reduce database costs. This same caching strategy can be applied to analytics applications built atop data lakes, as this post will demonstrate.

High-level AWS architecture demonstrated in this post

In-memory Caching

According to Hazelcast, memory caching (aka in-memory caching), often simply referred to as caching, is a technique in which computer applications temporarily store data in a computer’s main memory (e.g., RAM) to enable fast retrievals of that data. The RAM used for the temporary storage is known as the cache.

Further, according to Hazelcast, as an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.
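
The read path Hazelcast describes is commonly called the cache-aside pattern. Below is a minimal, framework-free sketch of that pattern in Java, assuming hypothetical Cache and source-lookup abstractions; the Spring Boot service shown later gets the same behavior declaratively from Spring’s cache abstraction.

import java.util.Optional;
import java.util.function.Function;

public class CacheAsideReader<K, V> {

    // Hypothetical abstraction standing in for Redis
    public interface Cache<K, V> {
        Optional<V> get(K key);
        void put(K key, V value);
    }

    private final Cache<K, V> cache;
    private final Function<K, V> source; // slower system of record (database or data lake)

    public CacheAsideReader(Cache<K, V> cache, Function<K, V> source) {
        this.cache = cache;
        this.source = source;
    }

    public V read(K key) {
        // 1. Check the cache first
        return cache.get(key).orElseGet(() -> {
            // 2. Cache miss: read the record from the source
            V value = source.apply(key);
            // 3. Write the value to the cache for subsequent reads
            cache.put(key, value);
            return value;
        });
    }
}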

Redis In-memory Data Store

According to their website, Redis is the open-source, in-memory data store used by millions of developers as a database, cache, streaming engine, and message broker. Redis provides data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes, and streams. In addition, Redis has built-in replication, Lua scripting, LRU eviction, transactions, and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

Amazon ElastiCache for Redis

According to AWS, Amazon ElastiCache for Redis, the fully-managed version of Redis, is a blazing-fast in-memory data store that provides sub-millisecond latency to power internet-scale real-time applications. Redis applications can work seamlessly with ElastiCache for Redis without any code changes. ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability of AWS. As a result, Redis is an excellent choice for implementing a highly available in-memory cache to decrease data access latency, increase throughput, and ease the load on relational and NoSQL databases.

ElastiCache Performance Results

In this post, we will add in-memory caching to the Spring Boot service introduced in the previous post. In preliminary tests with Amazon ElastiCache for Redis, the Spring Boot service delivered a 34x improvement in average response times. For example, test runs with the best-case scenario of a Redis cache hit ratio of 100% averaged 0.95 seconds, compared to 32.4 seconds without Redis.

Sample of uncached versus cached test results (quicker is better)

Source Code

All the source code, Docker, and Kubernetes resources are open-source and available on GitHub.

git clone --depth 1 -b redis \
https://github.com/garystafford/athena-spring-app.git

In addition, a Docker image for the Redis-based Spring Boot service is available on Docker Hub. For this post, use the latest tag with the .redis suffix.

Spring Boot service images are available on Docker Hub

Code Changes

The following code changes are required in the Spring Boot service to implement Spring Boot caching with Redis.

Gradle Build

The build.gradle file now includes two additional dependencies, Spring Boot’s spring-boot-starter-cache and spring-boot-starter-data-redis (lines 45–46).

plugins {
id 'org.springframework.boot' version '2.7.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id 'io.freefair.lombok' version '6.5.0-rc1'
}
group = 'aws.example'
version = '2.0.0'
sourceCompatibility = '17'
def awsSdkVersion = '2.17.225'
def springBootVersion = '2.7.1'
def restAssuredVersion = '5.1.1'
repositories {
mavenCentral()
}
dependencies {
// aws sdk
runtimeOnly "software.amazon.awssdk:bom:${awsSdkVersion}"
implementation "software.amazon.awssdk:athena:${awsSdkVersion}"
// spring
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
developmentOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
implementation 'org.springdoc:springdoc-openapi-ui:1.6.9'
implementation 'org.springframework:spring-context:5.3.20'
// testings
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
testImplementation "io.rest-assured:rest-assured:${restAssuredVersion}"
testImplementation "io.rest-assured:json-path:${restAssuredVersion}"
testImplementation "io.rest-assured:xml-path:${restAssuredVersion}"
testImplementation "io.rest-assured:json-schema-validator:${restAssuredVersion}"
// monitoring
implementation 'io.micrometer:micrometer-registry-prometheus:1.9.1'
// spring caching with redis
implementation "org.springframework.boot:spring-boot-starter-cache:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-data-redis:${springBootVersion}"
}
tasks.named('test') {
useJUnitPlatform()
}

Application Properties

The application properties file, application.yml, has been modified for both the dev and prod Spring Profiles. The dev Spring Profile expects Redis to be running on localhost. Correspondingly, the project’s docker-compose.yml file now includes a Redis container for local development. The time-to-live (TTL) for all Redis caches is arbitrarily set to one minute for dev and five minutes for prod. To increase application performance and reduce the cost of querying the data lake using Athena, increase Redis’s TTL. Note that increasing the TTL will reduce data freshness.

spring:
  profiles:
    active: dev
server:
  port: 8080
  servlet:
    contextPath: /v1
athena:
  region: us-east-1
  workgroup: primary
  catalog: AwsDataCatalog
  database: tickit_demo
  limit: 25
  client-execution-timeout: 100000
  retry-sleep: 1000
  results-bucket: ${RESULTS_BUCKET}
  named-query-id: ${NAMED_QUERY_ID}
---
spring:
  config:
    activate:
      on-profile: dev
  redis:
    host: localhost
    port: 6379
  cache:
    redis:
      time-to-live: 60000 # 1000 ms * 60 * 1 = 1 min
      enable-statistics: true
server:
  port: 8080
logging:
  level:
    root: DEBUG
management:
  endpoints:
    web:
      exposure:
        include: '*'
    jmx:
      exposure:
        include: '*'
---
spring:
  config:
    activate:
      on-profile: prod
  redis:
    host: ${REDIS_HOST}
    port: 6379
  cache:
    redis:
      time-to-live: 300000 # 1000 ms * 60 * 5 = 5 min
      enable-statistics: true
logging:
  level:
    root: INFO
management:
  endpoints:
    web:
      exposure:
        include: health, prometheus
    jmx:
      exposure:
        include: health

Athena Application Class

The AthenaApplication class declaration is now decorated with Spring Framework’s @EnableCaching annotation (line 22). Additionally, two new beans have been added (lines 58–68). Spring Data Redis provides an implementation of the Spring cache abstraction through the org.springframework.data.redis.cache package. The RedisCacheManager cache manager creates caches by default upon the first write. The RedisCacheConfiguration cache configuration helps to customize RedisCache behavior, such as caching null values, cache key prefixes, and binary serialization.

package com.example.athena;
import com.example.athena.common.PreparedStatement;
import com.example.athena.common.View;
import com.example.athena.config.ConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.time.Duration;
@SpringBootApplication
@EnableConfigurationProperties(ConfigProperties.class)
@EnableCaching
public class AthenaApplication {
private final PreparedStatement preparedStatement;
private final View view;
@Autowired
public AthenaApplication(PreparedStatement preparedStatement, View view) {
this.preparedStatement = preparedStatement;
this.view = view;
}
public static void main(String[] args) {
SpringApplication.run(AthenaApplication.class, args);
}
@Bean
void CreatePreparedStatement() {
preparedStatement.CreatePreparedStatement();
}
@Bean
void createView() {
view.CreateView();
}
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**").allowedOrigins("*");
}
};
}
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
return RedisCacheManager.create(connectionFactory);
}
@Bean
public RedisCacheConfiguration cacheConfiguration() {
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(5))
.disableCachingNullValues();
}
}
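
Note that RedisCacheManager.create(connectionFactory) builds a manager with default cache settings. A hedged variation, sketched below, passes the TTL and null-value configuration to the manager explicitly via the builder API, removing any ambiguity about which defaults the caches inherit:

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;

@Configuration
public class ExplicitCacheConfig {
    // Build the cache manager from the same TTL/null-value configuration,
    // so every cache it creates inherits these defaults.
    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(5))
                .disableCachingNullValues();
        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(cacheConfiguration)
                .build();
    }
}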

POJO Data Model Classes

Spring Boot Redis caching uses Java serialization and deserialization. Therefore, all the POJO data model classes must implement Serializable (line 14).

package com.example.athena.tickit.model.ecomm;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Listing implements Serializable {
private int id;
private int sellerId;
private int eventId;
private int dateId;
private int numTickets;
private BigDecimal pricePerTicket;
private LocalDateTime listTime;
}
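
Because the default cache serializer is JDK serialization, cached values are stored in Redis as opaque binary blobs. If human-readable cache entries are preferred, one hedged alternative is to serialize values as JSON with Spring Data Redis’s GenericJackson2JsonRedisSerializer, in which case implementing Serializable is no longer strictly required:

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

@Configuration
public class JsonCacheConfig {
    // Sketch: store cached values as JSON instead of JDK-serialized bytes.
    // Note: date/time fields (e.g., LocalDateTime) may require the
    // jackson-datatype-jsr310 module to round-trip correctly.
    @Bean
    public RedisCacheConfiguration cacheConfiguration() {
        return RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(5))
                .disableCachingNullValues()
                .serializeValuesWith(
                        RedisSerializationContext.SerializationPair.fromSerializer(
                                new GenericJackson2JsonRedisSerializer()));
    }
}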

Service Classes

Each public method in the Service classes is now decorated with Spring Framework’s @Cacheable annotation (lines 42 and 66). For example, the findById(int id) method in the CategoryServiceImp class is annotated with @Cacheable(value = "categories", key = "#id"). The method’s key parameter uses a Spring Expression Language (SpEL) expression to compute the key dynamically. The default is null, meaning all method parameters are considered part of the key unless a custom keyGenerator has been configured. If no value is found in the Redis cache for the computed key, the target method is invoked, and the returned value is stored in the associated cache.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.ecomm.Listing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class ListingServiceImp implements ListingService {
private static final Logger logger = LoggerFactory.getLogger(ListingServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public ListingServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
@Cacheable(value = "listings")
public List<Listing> findAll(Integer limit, Integer offset) {
if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
limit = configProperties.getLimit();
}
if (offset == null || offset < 1) {
offset = 0;
}
String whereClause = "WHERE listid IS NOT NULL";
String query = String.format("""
SELECT *
FROM refined_tickit_public_listing
%s
ORDER BY listid
OFFSET %s
LIMIT %s;""", whereClause, offset, limit);
return startQuery(query);
}
@Cacheable(value = "listing", key = "#id")
public Listing findById(int id) {
String query = String.format("""
SELECT DISTINCT *
FROM refined_tickit_public_listing
WHERE listid=%s""", id);
Listing listing;
try {
listing = startQuery(query).get(0);
} catch (java.lang.IndexOutOfBoundsException e) {
logger.error(e.getMessage());
return null;
}
return listing;
}
private List<Listing> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<Listing> listings = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return listings;
}
private List<Listing> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<Listing> listings = new ArrayList<>();
try {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
Listing listing = new Listing();
listing.setId(parseInt(allData.get(0).varCharValue()));
listing.setSellerId(parseInt(allData.get(1).varCharValue()));
listing.setEventId(parseInt(allData.get(2).varCharValue()));
listing.setDateId(parseInt(allData.get(3).varCharValue()));
listing.setNumTickets(Integer.parseInt(allData.get(4).varCharValue()));
listing.setPricePerTicket(new BigDecimal(allData.get(5).varCharValue()));
listing.setListTime(LocalDateTime.parse(allData.get(6).varCharValue(), formatter));
listings.add(listing);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return listings;
}
}

Controller Classes

There are no changes required to the Controller classes.

Amazon ElastiCache for Redis

Multiple options are available when creating an Amazon ElastiCache for Redis cluster, including cluster mode, Multi-AZ, auto-failover, node type, number of replicas, number of shards, replicas per shard, Availability Zone placement, and encryption at rest and in transit. The results in this post are based on a minimally-configured Redis version 6.2.6 cluster with one shard, two cache.r6g.large nodes, and cluster mode, Multi-AZ, and auto-failover all disabled. Encryption at rest and encryption in transit were also disabled. This cluster configuration is sufficient for development and testing, but not for production.
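
The cluster for this post was created through the AWS Management Console. As a rough, hedged sketch only, a similar minimally-configured replication group could be created programmatically with the AWS SDK for Java v2 ElastiCache client (the elasticache SDK module is an additional dependency, and the identifiers below are placeholders):

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.elasticache.ElastiCacheClient;
import software.amazon.awssdk.services.elasticache.model.CreateReplicationGroupRequest;

public class CreateRedisCluster {
    public static void main(String[] args) {
        try (ElastiCacheClient elastiCache = ElastiCacheClient.builder()
                .region(Region.US_EAST_1)
                .build()) {
            CreateReplicationGroupRequest request = CreateReplicationGroupRequest.builder()
                    .replicationGroupId("athena-spring-app-cache") // placeholder name
                    .replicationGroupDescription("Dev/test Redis cache for the Athena Spring Boot service")
                    .engine("redis")
                    .engineVersion("6.2")
                    .cacheNodeType("cache.r6g.large")
                    .numNodeGroups(1)           // one shard
                    .replicasPerNodeGroup(1)    // primary + one replica = two nodes
                    .automaticFailoverEnabled(false)
                    .multiAZEnabled(false)
                    .atRestEncryptionEnabled(false)
                    .transitEncryptionEnabled(false)
                    .build();
            elastiCache.createReplicationGroup(request);
        }
    }
}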

Amazon ElastiCache for Redis cluster used for this post
Amazon ElastiCache for Redis cluster monitoring console showing caching activity

Testing the Cache

To test Amazon ElastiCache for Redis, we will use Postman again with the imported OpenAPI v3 specification. With all data evicted from existing Redis caches, the first time the Postman tests run, they cause the service’s target methods to be invoked and the returned data stored in the associated caches.

Running a suite of integration tests against the Spring Boot service using Postman

To confirm this caching behavior, use the Redis CLI’s --scan option. To access the redis-cli, I deployed a single Redis pod to Amazon EKS. The first time the --scan command runs, we should get back an empty list of keys. After the first Postman test run, the same --scan command should return a list of cached keys.
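
If you prefer not to shell into a Redis pod, a rough equivalent of redis-cli --scan can be written against the Lettuce client that spring-boot-starter-data-redis already pulls in. The sketch below assumes the Redis endpoint has been port-forwarded to localhost:6379:

import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.RedisClient;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

public class ListCachedKeys {
    public static void main(String[] args) {
        // Assumes `kubectl port-forward` (or similar) exposes Redis on localhost:6379
        RedisClient client = RedisClient.create("redis://localhost:6379");
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            RedisCommands<String, String> commands = connection.sync();
            // SCAN the keyspace in pages, printing each cached key
            KeyScanCursor<String> cursor = commands.scan(ScanArgs.Builder.limit(100));
            while (true) {
                cursor.getKeys().forEach(System.out::println);
                if (cursor.isFinished()) {
                    break;
                }
                cursor = commands.scan(cursor, ScanArgs.Builder.limit(100));
            }
        } finally {
            client.shutdown();
        }
    }
}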

List of cached keys in Redis

Use the Redis CLI’s MONITOR option to further confirm data is being cached, as indicated by the set command.

Athena query results being cached in Redis

After the initial caching of data, use the Redis CLI’s MONITOR option, again, to confirm the cache is being hit instead of calling the target methods, which would then invoke the Athena API. Rerunning the Postman tests, we should see get commands as opposed to set commands.

Monitoring cache hits in Redis

Lastly, to confirm the Spring Boot service is effectively using Redis to cache data, we can also check Amazon Athena’s Recent queries tab in the AWS Management Console. After repeated sequential test runs within the TTL window, we should only see one Athena query per endpoint.
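
The same check can be made programmatically. The hedged sketch below uses the Athena SDK to list the most recent query executions for the service’s workgroup, so you can verify that repeated Postman runs within the TTL window are not generating new Athena queries:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionRequest;
import software.amazon.awssdk.services.athena.model.ListQueryExecutionsRequest;
import software.amazon.awssdk.services.athena.model.ListQueryExecutionsResponse;

public class RecentAthenaQueries {
    public static void main(String[] args) {
        try (AthenaClient athena = AthenaClient.builder().region(Region.US_EAST_1).build()) {
            ListQueryExecutionsResponse response = athena.listQueryExecutions(
                    ListQueryExecutionsRequest.builder()
                            .workGroup("primary") // workgroup used by the service
                            .maxResults(10)
                            .build());
            // Print the state and SQL of each recent query execution
            response.queryExecutionIds().forEach(id -> {
                var execution = athena.getQueryExecution(
                                GetQueryExecutionRequest.builder().queryExecutionId(id).build())
                        .queryExecution();
                System.out.printf("%s | %s | %s%n", id, execution.status().state(), execution.query());
            });
        }
    }
}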

Amazon Athena Recent queries tab in the AWS Management Console

Conclusion

In this brief follow-up to the recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, we saw how to substantially increase data lake application performance using Amazon ElastiCache for Redis. Although this caching technique is often associated with databases, it can also be effectively applied to data lake-based applications, as demonstrated in the post.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.



Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena

Learn how to develop Cloud-native, RESTful Java services that query data in an AWS-based data lake using Amazon Athena’s API

Introduction

AWS provides a collection of fully-managed services that makes building and managing secure data lakes faster and easier, including AWS Lake Formation, AWS Glue, and Amazon S3. Additional analytics services such as Amazon EMR, AWS Glue Studio, and Amazon Redshift allow Data Scientists and Analysts to run high-performance queries on large volumes of semi-structured and structured data quickly and economically.

What is not always as obvious is how teams develop internal and external customer-facing analytics applications built on top of data lakes. For example, imagine that sellers on an eCommerce platform (the scenario used in this post) want to make better marketing decisions regarding their products by analyzing sales trends and buyer preferences. Further, suppose the data required for the analysis must be aggregated from multiple systems and data sources: an ideal use case for a data lake.

Example of a personalized sales report generated from the Spring Boot service’s salesbyseller endpoint

In this post, we will explore an example Java Spring Boot RESTful Web Service that allows end-users to query data stored in a data lake on AWS. The RESTful Web Service will access data stored as Apache Parquet in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena. The service will use Spring Boot and the AWS SDK for Java to expose a secure, RESTful Application Programming Interface (API).

High-level AWS architecture demonstrated in this post

Amazon Athena is a serverless, interactive query service based on Presto, used to query data and analyze big data in Amazon S3 using standard SQL. Using Athena functionality exposed by the AWS SDK for Java and Athena API, the Spring Boot service will demonstrate how to access tables, views, prepared statements, and saved queries (aka named queries).

Amazon Athena Query Editor

TL;DR

Do you want to explore the source code for this post’s Spring Boot service or deploy it to Kubernetes before reading the full article? All the source code, Docker, and Kubernetes resources are open-source and available on GitHub.

git clone --depth 1 -b main \
https://github.com/garystafford/athena-spring-app.git

A Docker image for the Spring Boot service is also available on Docker Hub.

Spring Boot service image available on Docker Hub

Data Lake Data Source

There are endless data sources to build a demonstration data lake on AWS. This post uses the TICKIT sample database provided by AWS and designed for Amazon Redshift, AWS’s cloud data warehousing service. The database consists of seven tables. Two previous posts and associated videos, Building a Data Lake on AWS with Apache Airflow and Building a Data Lake on AWS, detail the setup of the data lake used in this post using AWS Glue and optionally Apache Airflow with Amazon MWAA.

Those two posts use the data lake pattern of segmenting data as bronze (aka raw), silver (aka refined), and gold (aka aggregated), popularized by Databricks. The data lake simulates a typical scenario where data originating from multiple sources, including an e-commerce platform, a CRM system, and a SaaS provider, must be aggregated and analyzed.

High-level data lake architecture demonstrated in the previous post

Spring Projects with IntelliJ IDE

Although not a requirement, I used JetBrains IntelliJ IDEA 2022 (Ultimate Edition) to develop and test the post’s Spring Boot service. Bootstrapping Spring projects with IntelliJ is easy. Developers can quickly create a Spring project using the Spring Initializr plugin bundled with IntelliJ.

JetBrains IntelliJ IDEA plugin support for Spring projects

The Spring Initializr plugin’s new project creation wizard is based on start.spring.io. The plugin allows you to quickly select the Spring dependencies you want to incorporate into your project.

Adding dependencies to a new Spring project in IntelliJ

Visual Studio Code

There are also several Spring extensions for the popular Visual Studio Code IDE, including Microsoft’s Spring Initializr Java Support extension.

Spring Initializr Java Support extension for Visual Studio Code by Microsoft

Gradle

This post uses Gradle instead of Maven to develop, test, build, package, and deploy the Spring service. Based on the packages selected in the new project setup shown above, the Spring Initializr plugin’s new project creation wizard creates a build.gradle file. Additional packages, such as Lombok, Micrometer, and Rest Assured, were added separately.

plugins {
id 'org.springframework.boot' version '2.7.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id 'io.freefair.lombok' version '6.5.0-rc1'
}
group = 'aws.example'
version = '1.0.0'
sourceCompatibility = '17'
def awsSdkVersion = '2.17.225'
def springBootVersion = '2.7.1'
def restAssuredVersion = '5.1.1'
repositories {
mavenCentral()
}
dependencies {
// aws sdk
runtimeOnly "software.amazon.awssdk:bom:${awsSdkVersion}"
implementation "software.amazon.awssdk:athena:${awsSdkVersion}"
// spring
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
developmentOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
implementation 'org.springdoc:springdoc-openapi-ui:1.6.9'
implementation 'org.springframework:spring-context:5.3.20'
// testings
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
testImplementation "io.rest-assured:rest-assured:${restAssuredVersion}"
testImplementation "io.rest-assured:json-path:${restAssuredVersion}"
testImplementation "io.rest-assured:xml-path:${restAssuredVersion}"
testImplementation "io.rest-assured:json-schema-validator:${restAssuredVersion}"
// monitoring
implementation 'io.micrometer:micrometer-registry-prometheus:1.9.1'
}
tasks.named('test') {
useJUnitPlatform()
}

Amazon Corretto

The Spring Boot service is developed for and compiled with the most recent version of Amazon Corretto 17. According to AWS, Amazon Corretto is a no-cost, multiplatform, production-ready distribution of the Open Java Development Kit (OpenJDK). Corretto comes with long-term support that includes performance enhancements and security fixes. Corretto is certified as compatible with the Java SE standard and is used internally at Amazon for many production services.

Source Code

Each API endpoint in the Spring Boot RESTful Web Service has a corresponding POJO data model class, service interface and service implementation class, and controller class. In addition, there are also common classes such as configuration, a client factory, and Athena-specific request/response methods. Lastly, there are additional class dependencies for views and prepared statements.

Java class relationships related to querying the Amazon Athena refined_tickit_public_category table

The project’s source code is arranged in a logical hierarchy by package and class type.

.
└── com
└── example
└── athena
├── AthenaApplication.java
├── common
│   ├── AthenaClientFactory.java
│   ├── AthenaClientFactoryImp.java
│   ├── AthenaCommon.java
│   ├── NamedQuery.java
│   ├── PreparedStatement.java
│   └── View.java
├── config
│   └── ConfigProperties.java
└── tickit
├── controller
│   ├── BuyerLikesByCategoryController.java
│   ├── CategoryController.java
│   ├── DateDetailController.java
│   ├── EventController.java
│   ├── ListingController.java
│   ├── SaleBySellerController.java
│   ├── SaleController.java
│   ├── SalesByCategoryController.java
│   ├── UserController.java
│   └── VenueController.java
├── model
│   ├── crm
│   │   └── User.java
│   ├── ecomm
│   │   ├── DateDetail.java
│   │   ├── Listing.java
│   │   └── Sale.java
│   ├── resultsets
│   │   ├── BuyerLikesByCategory.java
│   │   ├── SaleBySeller.java
│   │   └── SalesByCategory.java
│   └── saas
│   ├── Category.java
│   ├── Event.java
│   └── Venue.java
└── service
├── BuyerLikesByCategoryServiceImp.java
├── BuyersLikesByCategoryService.java
├── CategoryService.java
├── CategoryServiceImp.java
├── DateDetailService.java
├── DateDetailServiceImp.java
├── EventService.java
├── EventServiceImp.java
├── ListingService.java
├── ListingServiceImp.java
├── SaleBySellerService.java
├── SaleBySellerServiceImp.java
├── SaleService.java
├── SaleServiceImp.java
├── SalesByCategoryService.java
├── SalesByCategoryServiceImp.java
├── UserService.java
├── UserServiceImp.java
├── VenueService.java
└── VenueServiceImp.java

Amazon Athena Access

There are three standard methods for accessing Amazon Athena with the AWS SDK for Java: 1) the AthenaClient service client, 2) the AthenaAsyncClient service client for accessing Athena asynchronously, and 3) using the JDBC driver with the AWS SDK. The AthenaClient and AthenaAsyncClient service clients are both part of the software.amazon.awssdk.services.athena package. For simplicity, this post’s Spring Boot service uses the AthenaClient service client rather than Java’s asynchronous programming model. AWS supplies basic code samples as part of its documentation as a starting point for writing Athena applications using the SDK. The code samples also use the AthenaClient service client.
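
For reference, below is a minimal sketch of constructing both service clients side by side (the region is hard-coded here only for illustration); the remainder of the post uses the synchronous AthenaClient.

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaAsyncClient;
import software.amazon.awssdk.services.athena.AthenaClient;

public class AthenaClients {
    public static void main(String[] args) {
        // Synchronous client used by this post's Spring Boot service
        try (AthenaClient syncClient = AthenaClient.builder()
                .region(Region.US_EAST_1)
                .build();
             // Asynchronous alternative offered by the SDK (not used here)
             AthenaAsyncClient asyncClient = AthenaAsyncClient.builder()
                     .region(Region.US_EAST_1)
                     .build()) {
            System.out.println(syncClient.serviceName() + " / " + asyncClient.serviceName());
        }
    }
}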

POJO-based Data Model Class

For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Plain Old Java Object (POJO). According to Wikipedia, a POJO is an ordinary Java object, not bound by any particular restriction. The POJO class is similar to a JPA Entity, representing persistent data stored in a relational database. In this case, the POJO uses Lombok’s @Data annotation. According to the documentation, this annotation generates getters for all fields, a useful toString method, and hashCode and equals implementations that check all non-transient fields. It also generates setters for all non-final fields and a constructor.

package com.example.athena.tickit.model.saas;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
private int id;
private int venueId;
private int catId;
private int dateId;
private String name;
private LocalDateTime startTime;
}

Each POJO corresponds directly to a ‘silver’ table in the AWS Glue Data Catalog. For example, the Event POJO corresponds to the refined_tickit_public_event table in the tickit_demo Data Catalog database. The POJO defines the Spring Boot service’s data model for data read from the corresponding AWS Glue Data Catalog table.

Glue Data Catalog refined_tickit_public_event table

The Glue Data Catalog table is the interface between the Athena query and the underlying data stored in Amazon S3 object storage. The Athena query targets the table, which returns the underlying data from S3.

Tickit Category data stored as Apache Parquet files in Amazon S3

Service Class

Retrieving data from the data lake via AWS Glue, using Athena, is handled by a service class. For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Service Interface and implementation class. The service implementation class uses Spring Framework’s @Service annotation. According to the documentation, it indicates that an annotated class is a “Service,” initially defined by Domain-Driven Design (Evans, 2003) as “an operation offered as an interface that stands alone in the model, with no encapsulated state.” Most importantly for the Spring Boot service, this annotation serves as a specialization of @Component, allowing for implementation classes to be autodetected through classpath scanning.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.saas.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class EventServiceImp implements EventService {
private static final Logger logger = LoggerFactory.getLogger(EventServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public EventServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<Event> findAll(Integer limit, Integer offset) {
if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
limit = configProperties.getLimit();
}
if (offset == null || offset < 1) {
offset = 0;
}
String whereClause = "WHERE eventid IS NOT NULL";
String query = String.format("""
SELECT *
FROM refined_tickit_public_event
%s
ORDER BY eventid
OFFSET %s
LIMIT %s;""", whereClause, offset, limit);
return startQuery(query);
}
public Event findById(int id) {
String query = String.format("""
SELECT DISTINCT *
FROM refined_tickit_public_event
WHERE eventid=%s""", id);
Event event;
try {
event = startQuery(query).get(0);
} catch (IndexOutOfBoundsException e) {
logger.error(e.getMessage());
return null;
}
return event;
}
private List<Event> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<Event> events = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return events;
}
private List<Event> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<Event> events = new ArrayList<>();
try {
// Max Results can be set but if it's not set,
// it will choose the maximum page size
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
Event event = new Event();
event.setId(parseInt(allData.get(0).varCharValue()));
event.setVenueId(parseInt(allData.get(1).varCharValue()));
event.setCatId(parseInt(allData.get(2).varCharValue()));
event.setDateId(parseInt(allData.get(3).varCharValue()));
event.setName(allData.get(4).varCharValue());
event.setStartTime(LocalDateTime.parse(allData.get(5).varCharValue(), formatter));
events.add(event);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return events;
}
}

Using Spring’s common constructor-based Dependency Injection (DI) method (aka constructor injection), the service auto-wires an instance of the AthenaClientFactory interface. Note that we are auto-wiring the service interface, not the service implementation, allowing us to wire in a different implementation if desired, such as for testing.

The service calls the AthenaClientFactory class’s createClient() method, which returns a connection to Amazon Athena using one of several available authentication methods. The authentication scheme will depend on where the service is deployed and how you want to securely connect to AWS. Some options include environment variables, a local AWS profile, an EC2 instance profile, or a token from a web identity provider.

return AthenaClient.builder()
.credentialsProvider(EnvironmentVariableCredentialsProvider.create())
.build();
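
The snippet above pins the factory to environment-variable credentials. A hedged alternative is the SDK’s default provider chain, which tries Java system properties, environment variables, a web identity token, the shared credentials/config profile, and finally container or instance profile credentials, allowing the same factory implementation to run locally and on EKS without changes:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;

public class DefaultChainAthenaClientFactory {
    // Sketch of an alternative factory implementation using the default credentials chain
    public AthenaClient createClient() {
        return AthenaClient.builder()
                .region(Region.US_EAST_1) // assumption: the post's resources live in us-east-1
                .credentialsProvider(DefaultCredentialsProvider.create())
                .build();
    }
}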

The service class transforms the payload returned by an instance of GetQueryResultsResponse into an ordered collection (also known as a sequence), List<E>, where E represents a POJO. For example, with the data lake’s refined_tickit_public_event table, the service returns a List<Event>. This pattern repeats itself for tables, views, prepared statements, and named queries. Column data types can be transformed and formatted on the fly, new columns added, and existing columns skipped.

List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
Event event = new Event();
event.setId(parseInt(allData.get(0).varCharValue()));
event.setVenueId(parseInt(allData.get(1).varCharValue()));
event.setCatId(parseInt(allData.get(2).varCharValue()));
event.setDateId(parseInt(allData.get(3).varCharValue()));
event.setName(allData.get(4).varCharValue());
event.setStartTime(LocalDateTime.parse(allData.get(5).varCharValue(), formatter));
events.add(event);
}
}

For each endpoint defined in the Controller class, for example, get(), findAll(), and findById(), there is a corresponding method in the Service class. Below, we see an example of the findAll() method in the SalesByCategoryServiceImp service class. This method corresponds to the identically named method in the SalesByCategoryController controller class. Each of these service methods follows a similar pattern of constructing a dynamic Athena SQL query based on input parameters, which is passed to Athena through the AthenaClient service client using an instance of GetQueryResultsRequest.

public List<SalesByCategory> findAll(String calendarDate, Integer limit, Integer offset) {
if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
limit = configProperties.getLimit();
}
if (offset == null || offset < 1) {
offset = 0;
}
String whereClause = "WHERE caldate IS NOT NULL";
if (calendarDate != null) {
whereClause = whereClause + " AND caldate=date('" + calendarDate + "')";
}
String query = String.format("""
SELECT *
FROM tickit_sales_by_day_and_category
%s
OFFSET %s
LIMIT %s;""", whereClause, offset, limit);
return startQuery(query);
}

Controller Class

Lastly, there is a corresponding Controller class for each API endpoint in the Spring Boot RESTful Web Service. The controller class uses Spring Framework’s @RestController annotation. According to the documentation, this annotation is a convenience annotation that is itself annotated with @Controller and @ResponseBody. Types that carry this annotation are treated as controllers where @RequestMapping methods assume @ResponseBody semantics by default.

The controller class takes a dependency on the corresponding service class application component using constructor-based Dependency Injection (DI). Like the service example above, we are auto-wiring the service interface, not the service implementation.

package com.example.athena.tickit.controller;
import com.example.athena.tickit.model.saas.Event;
import com.example.athena.tickit.service.EventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping(value = "/events")
public class EventController {
private final EventService service;
@Autowired
public EventController(EventService service) {
this.service = service;
}
@RequestMapping(method = RequestMethod.GET)
public ResponseEntity<List<Event>> findAll(
@RequestParam(required = false) Integer limit,
@RequestParam(required = false) Integer offset
) {
List<Event> events = service.findAll(limit, offset);
if (events.size() == 0) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(null);
}
return ResponseEntity.status(HttpStatus.OK).body(events);
}
@RequestMapping(value = "/{id}", method = RequestMethod.GET)
public ResponseEntity<Event> findById(@PathVariable("id") int id) {
Event event = service.findById(id);
if (event == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(null);
}
return ResponseEntity.status(HttpStatus.OK).body(event);
}
}

The controller is responsible for serializing the ordered collection of POJOs into JSON and returning that JSON payload in the body of the HTTP response to the initial HTTP request.

Querying Views

In addition to querying AWS Glue Data Catalog tables (aka Athena tables), we also query views. According to the documentation, a view in Amazon Athena is a logical table, not a physical table. Therefore, the query that defines a view runs each time the view is referenced in a query.

For convenience, each time the Spring Boot service starts, the main AthenaApplication class calls the View.java class’s CreateView() method to check for the existence of the view, view_tickit_sales_by_day_and_category. If the view does not exist, it is created and becomes accessible to all application end-users. The view is queried through the service’s /salesbycategory endpoint.

Java class relationships related to querying the Amazon Athena view

This confirm-or-create pattern is repeated for the prepared statement in the main AthenaApplication class (detailed in the next section).

package com.example.athena;
import com.example.athena.common.PreparedStatement;
import com.example.athena.common.View;
import com.example.athena.config.ConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@SpringBootApplication
@EnableConfigurationProperties(ConfigProperties.class)
public class AthenaApplication {
private final PreparedStatement preparedStatement;
private final View view;
@Autowired
public AthenaApplication(PreparedStatement preparedStatement, View view) {
this.preparedStatement = preparedStatement;
this.view = view;
}
public static void main(String[] args) {
SpringApplication.run(AthenaApplication.class, args);
}
@Bean
void CreatePreparedStatement() {
preparedStatement.CreatePreparedStatement();
}
@Bean
void createView() {
view.CreateView();
}
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**").allowedOrigins("*");
}
};
}
}

Below, we see the View class called by the service at startup.

package com.example.athena.common;
import com.example.athena.config.ConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.GetTableMetadataRequest;
import software.amazon.awssdk.services.athena.model.GetTableMetadataResponse;
import software.amazon.awssdk.services.athena.model.MetadataException;
@Component
public class View {
private static final Logger logger = LoggerFactory.getLogger(View.class);
private final AthenaClientFactory athenaClientFactoryImp;
private final ConfigProperties configProperties;
private final AthenaCommon athenaCommon;
@Autowired
public View(AthenaClientFactory athenaClientFactoryImp,
ConfigProperties configProperties,
AthenaCommon athenaCommon) {
this.athenaClientFactoryImp = athenaClientFactoryImp;
this.configProperties = configProperties;
this.athenaCommon = athenaCommon;
}
public void CreateView() {
String viewName = "view_tickit_sales_by_day_and_category";
String createViewSqlStatement = String.format("""
CREATE VIEW %s AS
SELECT cast(d.caldate AS DATE) AS caldate,
c.catgroup,
c.catname,
sum(round(cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold, 2)) AS saleamount,
sum(cast(s.commission AS DECIMAL(8,2))) AS commission
FROM refined_tickit_public_sales AS s
LEFT JOIN refined_tickit_public_event AS e ON e.eventid = s.eventid
LEFT JOIN refined_tickit_public_date AS d ON d.dateid = s.dateid
LEFT JOIN refined_tickit_public_category AS c ON c.catid = e.catid
GROUP BY caldate,
catgroup,
catname
ORDER BY caldate,
catgroup,
catname;""", viewName);
try (AthenaClient athenaClient = athenaClientFactoryImp.createClient()) {
try {
GetTableMetadataResponse getPreparedStatementRequest = getGetTableMetadataResponse(viewName, athenaClient);
logger.debug(String.format("View already exists: %s", getPreparedStatementRequest.tableMetadata().name()));
} catch (MetadataException e) { // View does not exist
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, createViewSqlStatement);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
// Confirm View was created
GetTableMetadataResponse getPreparedStatementRequest = getGetTableMetadataResponse(viewName, athenaClient);
logger.debug(String.format("View created successfully: %s", getPreparedStatementRequest.tableMetadata().name()));
}
}
}
private GetTableMetadataResponse getGetTableMetadataResponse(String viewName, AthenaClient athenaClient) {
GetTableMetadataRequest getTableMetadataRequest = GetTableMetadataRequest.builder()
.catalogName(configProperties.getCatalog())
.databaseName(configProperties.getDatabase())
.tableName(viewName)
.build();
return athenaClient.getTableMetadata(getTableMetadataRequest);
}
}

Aside from the fact the /salesbycategory endpoint queries a view, everything else is identical to querying a table. This endpoint uses the same model-service-controller pattern.

Executing Prepared Statements

According to the documentation, you can use the Athena parameterized query feature to prepare statements for repeated execution of the same query with different query parameters. The prepared statement used by the service, tickit_sales_by_seller, accepts a single parameter, the ID of the seller (sellerid). The prepared statement is executed using the /salesbyseller endpoint. This scenario simulates an end-user of the analytics application who wants to retrieve enriched sales information about their sales.

package com.example.athena.common;
import com.example.athena.config.ConfigProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.CreatePreparedStatementRequest;
import software.amazon.awssdk.services.athena.model.GetPreparedStatementRequest;
import software.amazon.awssdk.services.athena.model.GetPreparedStatementResponse;
import software.amazon.awssdk.services.athena.model.ResourceNotFoundException;
@Component
public class PreparedStatement {
private static final Logger logger = LoggerFactory.getLogger(PreparedStatement.class);
private final AthenaClientFactory athenaClientFactoryImp;
private final ConfigProperties configProperties;
@Autowired
public PreparedStatement(AthenaClientFactory athenaClientFactoryImp, ConfigProperties configProperties) {
this.athenaClientFactoryImp = athenaClientFactoryImp;
this.configProperties = configProperties;
}
public void CreatePreparedStatement() {
String preparedStatementName = "tickit_sales_by_seller";
String preparedStatementSql = """
SELECT cast(d.caldate AS DATE) AS caldate,
s.pricepaid,
s.qtysold,
round(cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold, 2) AS saleamount,
cast(s.commission AS DECIMAL(8,2)) AS commission,
round((cast(s.commission AS DECIMAL(8,2)) / (cast(s.pricepaid AS DECIMAL(8,2)) * s.qtysold)) * 100, 2) AS commissionprcnt,
e.eventname,
concat(u1.firstname, ' ', u1.lastname) AS seller,
concat(u2.firstname, ' ', u2.lastname) AS buyer,
c.catgroup,
c.catname
FROM refined_tickit_public_sales AS s
LEFT JOIN refined_tickit_public_listing AS l ON l.listid = s.listid
LEFT JOIN refined_tickit_public_users AS u1 ON u1.userid = s.sellerid
LEFT JOIN refined_tickit_public_users AS u2 ON u2.userid = s.buyerid
LEFT JOIN refined_tickit_public_event AS e ON e.eventid = s.eventid
LEFT JOIN refined_tickit_public_date AS d ON d.dateid = s.dateid
LEFT JOIN refined_tickit_public_category AS c ON c.catid = e.catid
WHERE s.sellerid = ?
ORDER BY caldate,
eventname;""";
try (AthenaClient athenaClient = athenaClientFactoryImp.createClient()) {
try {
GetPreparedStatementResponse getPreparedStatementResponse = getGetPreparedStatementResponse(preparedStatementName, athenaClient);
logger.debug(String.format("Prepared statement already exists: %s", getPreparedStatementResponse.preparedStatement().statementName()));
} catch (ResourceNotFoundException e) { // PreparedStatement does not exist
CreatePreparedStatementRequest createPreparedStatementRequest = CreatePreparedStatementRequest.builder()
.statementName(preparedStatementName)
.description("Returns all sales by seller based on the seller's userid")
.workGroup(configProperties.getWorkGroup())
.queryStatement(preparedStatementSql).build();
athenaClient.createPreparedStatement(createPreparedStatementRequest);
// Confirm PreparedStatement was created
GetPreparedStatementResponse getPreparedStatementResponse = getGetPreparedStatementResponse(preparedStatementName, athenaClient);
logger.debug(String.format("Prepared statement created successfully: %s", getPreparedStatementResponse.preparedStatement().statementName()));
}
}
}
private GetPreparedStatementResponse getGetPreparedStatementResponse(String preparedStatementName, AthenaClient athenaClient) {
GetPreparedStatementRequest getPreparedStatementRequest = GetPreparedStatementRequest.builder()
.statementName(preparedStatementName)
.workGroup(configProperties.getWorkGroup()).build();
return athenaClient.getPreparedStatement(getPreparedStatementRequest);
}
}

The pattern of querying data is similar to tables and views, except instead of using the common SELECT...FROM...WHERE SQL query pattern, we use the EXECUTE...USING pattern.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.tickit.model.resultsets.SaleBySeller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
@Service
public class SaleBySellerServiceImp implements SaleBySellerService {
private static final Logger logger = LoggerFactory.getLogger(SaleBySellerServiceImp.class);
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public SaleBySellerServiceImp(AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<SaleBySeller> find(int id) {
String query = String.format("""
EXECUTE tickit_sales_by_seller USING %s;""", id);
return startQuery(query);
}
private List<SaleBySeller> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<SaleBySeller> saleBySellers = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return saleBySellers;
}
private List<SaleBySeller> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<SaleBySeller> saleBySellers = new ArrayList<>();
try {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
SaleBySeller saleBySeller = new SaleBySeller();
saleBySeller.setCalDate(LocalDate.parse(allData.get(0).varCharValue()));
saleBySeller.setPricePaid(new BigDecimal(allData.get(1).varCharValue()));
saleBySeller.setQtySold(Integer.parseInt(allData.get(2).varCharValue()));
saleBySeller.setSaleAmount(new BigDecimal(allData.get(3).varCharValue()));
saleBySeller.setCommission(new BigDecimal(allData.get(4).varCharValue()));
saleBySeller.setCommissionPrcnt(Double.valueOf(allData.get(5).varCharValue()));
saleBySeller.setEventName(allData.get(6).varCharValue());
saleBySeller.setSeller(allData.get(7).varCharValue());
saleBySeller.setBuyer(allData.get(8).varCharValue());
saleBySeller.setCatGroup(allData.get(9).varCharValue());
saleBySeller.setCatName(allData.get(10).varCharValue());
saleBySellers.add(saleBySeller);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return saleBySellers;
}
}

For example, to execute the prepared statement for a seller with an ID of 3, we would use EXECUTE tickit_sales_by_seller USING 3;. We pass the seller’s ID of 3 as a path parameter similar to other endpoints exposed by the service: /v1/salesbyseller/3.

Sales by seller query results from Athena using the seller’s ID as a parameter for the prepared statement

Again, aside from the fact the /salesbyseller endpoint executes a prepared statement and passes a parameter; everything else is identical to querying a table or a view, using the same model-service-controller pattern.
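
Since REST Assured is already among the project’s test dependencies, the /salesbyseller endpoint can also be exercised with a small integration test like the hedged sketch below, which assumes the service is reachable locally on port 8080 and that seller 3 has sales in the sample data:

import io.restassured.RestAssured;
import org.junit.jupiter.api.Test;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.notNullValue;

class SaleBySellerEndpointTest {

    @Test
    void salesBySellerReturnsEnrichedRowsForSeller3() {
        RestAssured.baseURI = "http://localhost:8080"; // assumption: local service instance
        given()
                .basePath("/v1")
        .when()
                .get("/salesbyseller/{id}", 3)
        .then()
                .statusCode(200)
                .body("seller", everyItem(notNullValue()))
                .body("buyer", everyItem(notNullValue()));
    }
}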

Working with Named Queries

In addition to tables, views, and prepared statements, Athena has the concept of saved queries, referred to as named queries in the Athena API and when using AWS CloudFormation. You can use the Athena console or API to save, edit, run, rename, and delete queries. The queries are persisted using a NamedQueryId, a unique identifier (UUID) of the query. You must reference the NamedQueryId when working with existing named queries.

Example of saved query (named query) used in this post

There are multiple ways to use and reuse existing named queries programmatically. For this demonstration, I created the named query, buyer_likes_by_category, in advance and then stored the resulting NamedQueryId as an application property, injected at runtime or Kubernetes deployment time through an environment variable.
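
For completeness, a named query can also be created through the Athena API rather than the console. The hedged sketch below shows the general shape of that call; the SQL string is only a placeholder, not the actual buyer_likes_by_category query. The service class that follows then looks up and executes the saved query at request time.

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.CreateNamedQueryRequest;

public class CreateNamedQueryExample {
    public static void main(String[] args) {
        try (AthenaClient athena = AthenaClient.builder().region(Region.US_EAST_1).build()) {
            CreateNamedQueryRequest request = CreateNamedQueryRequest.builder()
                    .name("buyer_likes_by_category")
                    .description("Buyer preference counts by event category")
                    .database("tickit_demo")
                    .workGroup("primary")
                    .queryString("SELECT 1 -- placeholder for the real aggregation query")
                    .build();
            // The returned NamedQueryId is what the service reads from NAMED_QUERY_ID
            String namedQueryId = athena.createNamedQuery(request).namedQueryId();
            System.out.println(namedQueryId);
        }
    }
}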

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.resultsets.BuyerLikesByCategory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class BuyerLikesByCategoryServiceImp implements BuyersLikesByCategoryService {
private static final Logger logger = LoggerFactory.getLogger(BuyerLikesByCategoryServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public BuyerLikesByCategoryServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
public List<BuyerLikesByCategory> get() {
return getNamedQueryResults(configProperties.getNamedQueryId());
}
private List<BuyerLikesByCategory> getNamedQueryResults(String queryId) {
logger.debug(String.format("NamedQueryId: %s", queryId));
AthenaClient athenaClient = athenaClientFactory.createClient();
GetNamedQueryRequest getNamedQueryRequest = GetNamedQueryRequest.builder()
.namedQueryId(queryId)
.build();
GetNamedQueryResponse getNamedQueryResponse = athenaClient.getNamedQuery(getNamedQueryRequest);
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, getNamedQueryResponse.namedQuery().queryString());
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<BuyerLikesByCategory> buyerLikesByCategories = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return buyerLikesByCategories;
}
private List<BuyerLikesByCategory> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<BuyerLikesByCategory> buyerLikesByCategories = new ArrayList<>();
try {
// Max Results can be set but if it's not set,
// it will choose the maximum page size
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
BuyerLikesByCategory buyerLikesByCategory = new BuyerLikesByCategory();
buyerLikesByCategory.setSports(parseInt(allData.get(0).varCharValue()));
buyerLikesByCategory.setTheatre(parseInt(allData.get(1).varCharValue()));
buyerLikesByCategory.setConcerts(parseInt(allData.get(2).varCharValue()));
buyerLikesByCategory.setJazz(parseInt(allData.get(3).varCharValue()));
buyerLikesByCategory.setClassical(parseInt(allData.get(4).varCharValue()));
buyerLikesByCategory.setOpera(parseInt(allData.get(5).varCharValue()));
buyerLikesByCategory.setRock(parseInt(allData.get(6).varCharValue()));
buyerLikesByCategory.setVegas(parseInt(allData.get(7).varCharValue()));
buyerLikesByCategory.setBroadway(parseInt(allData.get(8).varCharValue()));
buyerLikesByCategory.setMusicals(parseInt(allData.get(9).varCharValue()));
buyerLikesByCategories.add(buyerLikesByCategory);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return buyerLikesByCategories;
}
}

Alternatively, you might iterate through the list of existing named queries at startup to find one that matches a known name. However, this method would add additional API calls, latency, and cost to service startup. Lastly, you could use a method like NamedQuery(), included in the unused NamedQuery class, at startup, similar to the approach taken for the view and the prepared statement. That named query’s unique NamedQueryId would be persisted as a system property, referenceable by the service class. The downside is that you would create a duplicate of the named query each time you start the service. Therefore, this method is also not recommended.
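A minimal sketch of the lookup-by-name approach might look something like the following. This code is not part of the example service; the workgroup name comes from the service’s configuration shown below, and pagination of the ListNamedQueries results is omitted for brevity.

import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.GetNamedQueryRequest;
import software.amazon.awssdk.services.athena.model.ListNamedQueriesRequest;
import software.amazon.awssdk.services.athena.model.NamedQuery;

public class NamedQueryLookup {

    // Returns the NamedQueryId of the first named query whose name matches, or null if none match
    public static String findNamedQueryId(AthenaClient athenaClient, String queryName) {
        ListNamedQueriesRequest listRequest = ListNamedQueriesRequest.builder()
                .workGroup("primary")
                .build();

        // Pagination (nextToken) is omitted for brevity; ListNamedQueries returns a page of IDs per call
        for (String namedQueryId : athenaClient.listNamedQueries(listRequest).namedQueryIds()) {
            NamedQuery namedQuery = athenaClient.getNamedQuery(
                    GetNamedQueryRequest.builder().namedQueryId(namedQueryId).build()).namedQuery();
            if (namedQuery.name().equals(queryName)) {
                return namedQuery.namedQueryId();
            }
        }
        return null;
    }
}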

Configuration

Two components are responsible for managing the Spring Boot service’s configuration: the application.yml properties file and the ConfigProperties class. The class uses Spring Framework’s @ConfigurationProperties annotation. According to the documentation, this annotation is used for externalized configuration: “Add this to a class definition or a @Bean method in a @Configuration class if you want to bind and validate some external Properties (e.g., from a .properties or .yml file). Binding is performed by calling setters on the annotated class or, if @ConstructorBinding is in use, by binding to the constructor parameters.”

The @ConfigurationProperties annotation includes the prefix of athena. This value corresponds to the athena prefix in the application.yml properties file. The fields in the ConfigProperties class are bound to the properties in the application.yml. For example, the namedQueryId field is bound to the athena.named-query-id property. Further, that property is bound to an external environment variable, NAMED_QUERY_ID. These values could be supplied from an external configuration system, a Kubernetes Secret, or an external secrets management system.
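Below is a minimal sketch of what such a ConfigProperties class might look like; only a few of the fields are shown, and the field names are inferred from the application.yml file that follows rather than copied from the project.

package com.example.athena.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// Sketch only; the actual class in the project may declare additional fields
@Configuration
@ConfigurationProperties(prefix = "athena")
public class ConfigProperties {

    private String region;       // bound to athena.region
    private String database;     // bound to athena.database
    private String namedQueryId; // bound to athena.named-query-id (e.g., ${NAMED_QUERY_ID})

    public String getRegion() { return region; }
    public void setRegion(String region) { this.region = region; }

    public String getDatabase() { return database; }
    public void setDatabase(String database) { this.database = database; }

    public String getNamedQueryId() { return namedQueryId; }
    public void setNamedQueryId(String namedQueryId) { this.namedQueryId = namedQueryId; }
}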

spring:
  profiles:
    active: dev
server:
  port: 8080
  servlet:
    contextPath: /v1
athena:
  region: us-east-1
  workgroup: primary
  catalog: AwsDataCatalog
  database: tickit_demo
  limit: 25
  client-execution-timeout: 100000
  retry-sleep: 1000
  results-bucket: ${RESULTS_BUCKET}
  named-query-id: ${NAMED_QUERY_ID}
---
spring:
  config:
    activate:
      on-profile: dev
logging:
  level:
    root: DEBUG
management:
  endpoints:
    web:
      exposure:
        include: '*'
    jmx:
      exposure:
        include: '*'
---
spring:
  config:
    activate:
      on-profile: prod
logging:
  level:
    root: INFO
management:
  endpoints:
    web:
      exposure:
        include: health, prometheus
    jmx:
      exposure:
        include: health

AWS IAM: Authentication and Authorization

For the Spring Boot service to interact with Amazon Athena, AWS Glue, and Amazon S3, you need to establish an AWS IAM Role, which the service assumes once authenticated. The Role must have an attached IAM Policy containing the requisite Athena, Glue, and S3 permissions. For development, the service uses a policy similar to the one shown below. Please note that this policy is broader than recommended for Production; it does not represent the security best practice of least privilege. In particular, the overly-broad * Resource should be strictly avoided when creating policies; instead, scope each statement to specific Athena workgroup, Glue Data Catalog, and S3 bucket ARNs.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:CreatePreparedStatement",
"athena:ListPreparedStatements",
"glue:CreateTable",
"athena:CreateNamedQuery",
"athena:ListNamedQueries",
"athena:GetTableMetadata",
"athena:GetPreparedStatement",
"athena:GetQueryResults",
"athena:GetQueryExecution",
"athena:GetNamedQuery"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:BatchGetPartition",
"glue:GetTable"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject",
"s3:PutBucketPublicAccessBlock"
],
"Resource": [
"arn:aws:s3:::aws-athena-query-results-*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::date-lake-demo-*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:ListAllMyBuckets"
],
"Resource": [
"*"
]
}
]
}

In addition to the authorization granted by the IAM Policy, AWS Lake Formation can be used with Amazon S3, AWS Glue, and Amazon Athena to grant fine-grained database-, table-, column-, and row-level access to datasets.

Swagger UI and the OpenAPI Specification

The easiest way to view and experiment with all of the endpoints exposed by the controller classes is to use the Swagger UI, included in the example Spring Boot service by way of the springdoc-openapi Java library. The Swagger UI is accessible at /v1/swagger-ui/index.html.
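The springdoc-openapi library can also be customized with a standard Spring bean. The following is a hypothetical sketch of such a customization; the title and version values are illustrative and are not taken from the project.

package com.example.athena.config;

import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Info;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OpenApiConfig {

    // Customizes the metadata displayed in the Swagger UI and included in the
    // generated OpenAPI v3 specification (illustrative values only)
    @Bean
    public OpenAPI apiInfo() {
        return new OpenAPI()
                .info(new Info()
                        .title("Athena TICKIT Data Lake Service")
                        .version("v1"));
    }
}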

Swagger UI showing endpoints exposed by the service’s controller classes

The OpenAPI Specification (formerly Swagger Specification) is an API description format for REST APIs. The /v1/v3/api-docs endpoint allows you to generate an OpenAPI v3 specification file. The OpenAPI file describes the entire API.

Spring Boot service’s OpenAPI v3 specification

The OpenAPI v3 specification can be saved as a file and imported into applications like Postman, the API platform for building and using APIs.

Calling the service’s /users API endpoint using Postman
Running a suite of integration tests against the Spring Boot service using Postman

Integration Tests

Included in the Spring Boot service’s source code are a limited number of example integration tests, not to be confused with unit tests. Each test class uses Spring Framework’s @SpringBootTest annotation. According to the documentation, the annotation can be specified on a test class that runs Spring Boot-based tests and provides several features over and above the regular Spring TestContext Framework.

package com.example.athena;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import static io.restassured.RestAssured.get;
import static io.restassured.RestAssured.given;
import static io.restassured.http.ContentType.JSON;
@SpringBootTest
class CategoriesResourceTests {
private static final String ResourcePath = "/v1/categories";
private static final int resultsetLimit = 25;
@Test
void findAll() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.greaterThanOrEqualTo(1))
.body("$.size()", Matchers.lessThanOrEqualTo(resultsetLimit));
}
@Test
void findAllWithLimit() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("limit", 3)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.equalTo(3));
}
@Test
void findAllWithOffset() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("offset", 2)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.greaterThanOrEqualTo(1))
.body("$.size()", Matchers.lessThanOrEqualTo(resultsetLimit));
}
@Test
void findAllWithLimitAndOffset() {
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.queryParam("limit", 3)
.queryParam("offset", 2)
.when()
.get(ResourcePath)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("$.size()", Matchers.equalTo(3));
}
@Test
void findById() {
// Get the first 'id' available
int id = get(ResourcePath + "?limit=1")
.then()
.extract()
.path("[0].id");
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath + "/{id}", id)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("id", Matchers.equalTo(id));
}
}

The integration tests use Rest Assured’s given-when-then pattern of testing, made popular as part of Behavior-Driven Development (BDD). In addition, each test uses JUnit’s @Test annotation. According to the documentation, this annotation signals that the annotated method is a test method. Such methods must not be private or static and must not return a value.

@Test
void findById() {
// Get the first 'id' available
int id = get(ResourcePath + "?limit=1")
.then()
.extract()
.path("[0].id");
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.when()
.get(ResourcePath + "/{id}", id)
.then()
.assertThat()
.contentType(JSON)
.statusCode(200)
.body("id", Matchers.equalTo(id));
}

Run the integration tests using Gradle from the project’s root: ./gradlew clean build test. A detailed ‘Test Summary’ is produced in the project’s build directory as HTML for easy review.

Test Details

Load Testing the Service

In Production, the Spring Boot service will need to handle multiple concurrent users executing queries against Amazon Athena.

Athena’s Recent Queries console shows multiple concurrent queries being queued and executed

We could use various load testing tools to evaluate the service’s ability to handle multiple concurrent users. One of the simplest is my favorite Go-based utility, hey, which sends load to a URL with a specified number of requests at a specified concurrency level and prints the resulting statistics; it also supports HTTP/2 endpoints. For example, we could execute 500 HTTP requests with a concurrency level of 25 against the Spring Boot service’s /users endpoint using hey. The load tests were run against three Kubernetes replica pods of the service deployed to Amazon EKS.

hey -n 500 -c 25 -T "application/json;charset=UTF-8" \
-h2 https://athena.example-api.com/v1/users

From Athena’s Recent Queries console, we see many simultaneous queries being queued and executed by hey through the Spring Boot service’s endpoint.

Athena’s Recent Queries console shows simultaneous queries being queued and executed

Metrics

The Spring Boot service implements the micrometer-registry-prometheus extension. The Micrometer metrics library exposes runtime and application metrics. Micrometer defines a core library, providing a registration mechanism for metrics and core metric types. These metrics are exposed by the service’s /v1/actuator/prometheus endpoint.
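In addition to the metrics provided out of the box, Micrometer’s MeterRegistry can be used to record custom application metrics. The sketch below is purely illustrative and not part of the example service; it registers a hypothetical timer, athena.query.duration, which would surface on the Prometheus endpoint once wired into the query path.

package com.example.athena.common;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

import java.util.function.Supplier;

// Hypothetical component; not part of the example service
@Component
public class AthenaQueryMetrics {

    private final Timer queryTimer;

    public AthenaQueryMetrics(MeterRegistry meterRegistry) {
        // Registered timers appear on /v1/actuator/prometheus, e.g., as athena_query_duration_seconds_*
        this.queryTimer = Timer.builder("athena.query.duration")
                .description("Time spent submitting and waiting for Athena queries")
                .register(meterRegistry);
    }

    // Records the elapsed time of the supplied query execution and returns its result
    public <T> T record(Supplier<T> querySupplier) {
        return queryTimer.record(querySupplier);
    }
}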

Metrics exposed using the Prometheus endpoint

Using the Micrometer extension, metrics exposed by the /v1/actuator/prometheus endpoint can be scraped and visualized by tools such as Prometheus. Conveniently, AWS offers the fully-managed Amazon Managed Service for Prometheus (AMP), which easily integrates with Amazon EKS.

Graph of HTTP server requests scraped by Prometheus from the Spring Boot service

Using Prometheus as a data source, we can build dashboards in Grafana to observe the service’s metrics. As with Prometheus, AWS offers a fully-managed Grafana service, Amazon Managed Grafana (AMG).

Grafana dashboard showing metrics from Prometheus for Spring Boot service deployed to Amazon EKS
Grafana dashboard showing JVM metrics from Prometheus for Spring Boot service deployed to Amazon EKS

Conclusion

This post taught us how to create a Spring Boot RESTful Web Service, allowing end-user applications to securely query data stored in a data lake on AWS. The service used the AWS SDK for Java to access data stored in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.


Data Preparation on AWS: Comparing Available ELT Options to Cleanse and Normalize Data

Comparing the features and performance of different AWS analytics services for Extract, Load, Transform (ELT)

Introduction

According to Wikipedia, “Extract, load, transform (ELT) is an alternative to extract, transform, load (ETL) used with data lake implementations. In contrast to ETL, in ELT models the data is not transformed on entry to the data lake but stored in its original raw format. This enables faster loading times. However, ELT requires sufficient processing power within the data processing engine to carry out the transformation on demand, to return the results in a timely manner.”

As capital investments and customer demand continue to drive the growth of the cloud-based analytics market, the choice of tools seems endless, and that can be a problem. Customers face a constant barrage of commercial and open-source tools for their batch, streaming, and interactive exploratory data analytics needs. The major Cloud Service Providers (CSPs) have even grown to a point where they now offer multiple services to accomplish similar analytics tasks.

This post will examine a selection of analytics services available on AWS that are capable of performing ELT. Specifically, this post will compare the features and performance of AWS Glue Studio, AWS Glue DataBrew, Amazon Athena, and Amazon EMR using multiple ELT use cases and service configurations.

Data pipeline architecture showing a choice of AWS ELT services

Analytics Use Case

We will address a simple yet common analytics challenge for this comparison — preparing a nightly data feed for analysis the next day. Each night a batch of approximately 1.2 GB of raw CSV-format healthcare data will be exported from a Patient Administration System (PAS) and uploaded to Amazon S3. The data must be cleansed, deduplicated, refined, normalized, and made available to the Data Science team the following morning. The team of Data Scientists will perform complex data analytics on the data and build machine learning models designed for early disease detection and prevention.

Sample Dataset

The dataset used for this comparison is generated by Synthea, an open-source patient population simulation. The high-quality, synthetic, realistic patient data and associated health records cover every aspect of healthcare. The dataset contains the patient-related healthcare history for allergies, care plans, conditions, devices, encounters, imaging studies, immunizations, medications, observations, organizations, patients, payers, procedures, providers, and supplies.

The Synthea dataset was first introduced in my March 2021 post examining the handling of sensitive PII data using Amazon Macie: Data Lakes: Discovery, Security, and Privacy of Sensitive Data.

The Synthea synthetic patient data is available in different record volumes and various data formats, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Since this post seeks to measure the performance of different AWS ELT-capable services, we will use a larger version of the Synthea dataset containing hundreds of thousands to millions of records.

AWS Glue Data Catalog

The dataset comprises nine uncompressed CSV files uploaded to Amazon S3 and cataloged to an AWS Glue Data Catalog, a persistent metadata store, using an AWS Glue Crawler.

Raw Synthea CSV data, in S3, cataloged in AWS Glue Data Catalog

Test Cases

We will use three data preparation test cases based on the Synthea dataset to examine the different AWS ELT-capable services.

Specifications for three different test cases

Test Case 1: Encounters for Symptom

An encounter is a health care contact between the patient and the provider responsible for diagnosing and treating the patient. In our first test case, we will process 1.26M encounter records for an ongoing study of patient symptoms by our Data Science team.

id date patient code description reasoncode reasondescription
714fd61a-f9fd-43ff-87b9-3cc45a3f1e53 2014-01-09 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
23e07532-8b96-4d05-b14e-d4c5a5288ed2 2014-08-18 33f33990-ae8b-4be8-938f-e47ad473abfe 185349003 Outpatient Encounter
45044100-aaba-4209-8ad1-15383c76842d 2015-07-12 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 36971009 Sinusitis (disorder)
ffdddbfb-35e8-4a74-a801-89e97feed2f3 2014-08-12 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
352d1693-591a-4615-9b1b-f145648f49cc 2016-05-25 36d131ee-dd5b-4acb-acbe-19961c32c099 185349003 Outpatient Encounter
4620bd2f-8010-46a9-82ab-8f25eb621c37 2016-10-07 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 195662009 Acute viral pharyngitis (disorder)
815494d8-2570-4918-a8de-fd4000d8100f 2010-08-02 660bec03-9e58-47f2-98b9-2f1c564f3838 698314001 Consultation for treatment
67ec5c2d-f41e-4538-adbe-8c06c71ddc35 2010-11-22 660bec03-9e58-47f2-98b9-2f1c564f3838 170258001 Outpatient Encounter
dbe481ce-b961-4f43-ac0a-07fa8cfa8bdd 2012-11-21 660bec03-9e58-47f2-98b9-2f1c564f3838 50849002 Emergency room admission
b5f1ab7e-5e67-4070-bcf0-52451eb20551 2013-12-04 660bec03-9e58-47f2-98b9-2f1c564f3838 185345009 Encounter for symptom 10509002 Acute bronchitis (disorder)
Sample of raw encounters data

Data preparation includes the following steps:

  1. Load 1.26M encounter records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Select only the records where the description column contains “Encounter for symptom.”
  4. Remove any rows with an empty reasoncode column.
  5. Extract a new year, month, and day column from the date column.
  6. Remove the date column.
  7. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  8. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  9. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 2: Observations

Clinical observations ensure that treatment plans are up-to-date and correctly administered and allow healthcare staff to carry out timely and regular bedside assessments. In our second test case, we will process 5.38M observation records for our Data Science team.

date patient encounter code description value units
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8302-2 Body Height 175.76 cm
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 29463-7 Body Weight 56.51 kg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 39156-5 Body Mass Index 18.29 kg/m2
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8480-6 Systolic Blood Pressure 119.0 mmHg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8462-4 Diastolic Blood Pressure 77.0 mmHg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8302-2 Body Height 177.25 cm
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 29463-7 Body Weight 59.87 kg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 39156-5 Body Mass Index 19.05 kg/m2
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8480-6 Systolic Blood Pressure 113.0 mmHg
2012-03-26 36d131ee-dd5b-4acb-acbe-19961c32c099 296a1fd4-56de-451c-a5fe-b50f9a18472d 8302-2 Body Height 174.17 cm
Sample of raw observations data

Data preparation includes the following steps:

  1. Load 5.38M observation records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Extract a new year, month, and day column from the date column.
  4. Remove the date column.
  5. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  6. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  7. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 3: Sinusitis Study

A medical condition is a broad term that includes all diseases, lesions, and disorders. In our third test case, we will join the conditions records with the patient records and filter for any condition containing the term ‘sinusitis’ in preparation for our Data Science team.

start stop patient encounter code description
2012-09-05 2012-10-16 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 05a6ef43-d690-455e-ab2f-1ea19d902274 44465007 Sprain of ankle
2014-09-08 2014-09-28 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 1cdcbe46-caaf-4b3f-b58c-9ca9ccb13013 283371005 Laceration of forearm
2014-11-28 2014-12-13 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 b222e257-98da-4a1b-a46c-45d5ad01bbdc 195662009 Acute viral pharyngitis (disorder)
1980-01-09 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 40055000 Chronic sinusitis (disorder)
1989-06-25 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 201834006 Localized primary osteoarthritis of the hand
1996-01-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 196416002 Impacted molars
2016-02-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 748cda45-c267-46b2-b00d-3b405a44094e 15777000 Prediabetes
2016-04-27 2016-05-20 01858c8d-f81c-4a95-ab4f-bd79fb62b284 a64734f1-5b21-4a59-b2e8-ebfdb9058f8b 444814009 Viral sinusitis (disorder)
2014-02-06 2014-02-19 d32e9ad2-4ea1-4bb9-925d-c00fe85851ae c64d3637-8922-4531-bba5-f3051ece6354 43878008 Streptococcal sore throat (disorder)
1982-05-18 08858d24-52f2-41dd-9fe9-cbf1f77b28b2 3fff3d52-a769-475f-b01b-12622f4fee17 368581000119106 Neuropathy due to type 2 diabetes mellitus (disorder)
Sample of raw conditions data

Data preparation includes the following steps:

  1. Load 483k condition records using the existing AWS Glue Data Catalog table.
  2. Inner join the condition records with the 132k patient records based on patient ID.
  3. Remove any duplicate records.
  4. Drop approximately 15 unneeded columns.
  5. Select only the records where the description column contains the term “sinusitis.”
  6. Remove any rows with empty ethnicity, race, gender, or marital columns.
  7. Create a new column, condition_age, based on a calculation of the age in days at which the patient’s condition was diagnosed.
  8. Write the resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet-format files. No partitions are necessary.
  9. Given the small resultset, bucket the data such that only one file is written to minimize the impact of too many small files on future query performance.
  10. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog.

AWS ELT Options

There are numerous options on AWS to handle the batch transformation use case described above; a non-exhaustive list includes:

  1. AWS Glue Studio (UI-driven with AWS Glue PySpark Extensions)
  2. AWS Glue DataBrew
  3. Amazon Athena
  4. Amazon EMR with Apache Spark
  5. AWS Glue Studio (Apache Spark script)
  6. AWS Glue Jobs (Legacy jobs)
  7. Amazon EMR with Presto
  8. Amazon EMR with Trino
  9. Amazon EMR with Hive
  10. AWS Step Functions and AWS Lambda
  11. Amazon Redshift Spectrum
  12. Partner solutions on AWS, such as Databricks, Snowflake, Upsolver, StreamSets, Stitch, and Fivetran
  13. Self-managed custom solutions using a combination of OSS, such as dbt, Airbyte, Dagster, Meltano, Apache NiFi, Apache Drill, Apache Beam, Pandas, Apache Airflow, and Kubernetes

For this comparison, we will choose the first five options listed above to develop our ELT data preparation pipelines: AWS Glue Studio (UI-driven job creation with AWS Glue PySpark Extensions), AWS Glue DataBrew, Amazon Athena, Amazon EMR with Apache Spark, and AWS Glue Studio (Apache Spark script).

Data pipeline architecture showing a choice of AWS ELT services

AWS Glue Studio

According to the documentation, “AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine. You can inspect the schema and data results in each step of the job.”

AWS Glue Studio’s visual job creation capability uses the AWS Glue PySpark Extensions, an extension of the PySpark Python dialect for scripting ETL jobs. The extensions provide easier integration with AWS Glue Data Catalog and other AWS-managed data services. As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Spark scripts with AWS Glue Studio. In fact, we can use the exact same scripts run on Amazon EMR.

For the tests, we are using the G.2X worker type, Glue version 3.0 (Spark 3.1.1 and Python 3.7), and Python as the language choice for this comparison. We will test three worker configurations using both UI-driven job creation with AWS Glue PySpark Extensions and Apache Spark script options:

  • 10 workers with a maximum of 20 DPUs
  • 20 workers with a maximum of 40 DPUs
  • 40 workers with a maximum of 80 DPUs
AWS Glue Studio visual job creation UI for Test Case 3: Sinusitis Study

AWS Glue Studio Spark job details for Test Case 2: Observations

AWS Glue Studio job runs for Test Case 2: Observations

AWS Glue DataBrew

According to the documentation, “AWS Glue DataBrew is a visual data preparation tool that enables users to clean and normalize data without writing any code. Using DataBrew helps reduce the time it takes to prepare data for analytics and machine learning (ML) by up to 80 percent, compared to custom-developed data preparation. You can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values.”

DataBrew allows you to set the maximum number of DataBrew nodes that can be allocated when a job runs. For this comparison, we will test three different node configurations:

  • 3 maximum nodes
  • 10 maximum nodes
  • 20 maximum nodes
AWS Glue DataBrew Project for Test Case 3: Sinusitis Study

AWS Glue DataBrew Recipe for Test Case 1: Encounters for Symptom

AWS Glue DataBrew recipe job runs for Test Case 1: Encounters for Symptom

Amazon Athena

According to the documentation, “Athena helps you analyze unstructured, semi-structured, and structured data stored in Amazon S3. Examples include CSV, JSON, or columnar data formats such as Apache Parquet and Apache ORC. You can use Athena to run ad-hoc queries using ANSI SQL, without the need to aggregate or load the data into Athena.”

Although Athena is classified as an ad-hoc query engine, using a CREATE TABLE AS SELECT (CTAS) query, we can create a new table in the AWS Glue Data Catalog and write to Amazon S3 from the results of a SELECT statement from another query. That other query statement performs a transformation on the data using SQL.

-- Purpose: Process data for sinusitis study using Amazon Athena
-- Author: Gary A. Stafford (January 2022)
CREATE TABLE "sinusitis_athena" WITH (
format = 'Parquet',
write_compression = 'SNAPPY',
external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
bucketed_by = ARRAY['patient'],
bucket_count = 1
) AS
SELECT DISTINCT
patient,
code,
description,
date_diff(
'day',
date(substr(birthdate, 1, 10)),
date(substr(start, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions AS c,
patients AS p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 3: Sinusitis Study

Amazon Athena is a fully managed AWS service and has no performance settings to adjust or monitor.

Amazon Athena CTAS statement for Test Case 1: Encounters for Symptom

Parquet data partitioned by year in Amazon S3 for Test Case 1: Encounters for Symptom, using Athena

CTAS and Partitions

A notable limitation of Amazon Athena for this batch use case is the 100-partition limit with CTAS queries: Athena [only] supports writing to 100 unique partition and bucket combinations with CTAS. Partitioned by year, month, and day, the encounters test case requires 2,558 partitions, and the observations test case requires 10,433 partitions. There is a recommended workaround using an INSERT INTO statement. However, the workaround requires additional SQL logic and computation and, most importantly, adds cost. In my opinion, it is not practical compared to the other methods when a large number of partitions is needed. To avoid the partition limit with CTAS, we will only partition by year and bucket by month when using Athena. Take this limitation into account when comparing the final results.

Amazon EMR with Apache Spark

According to the documentation, “Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. You can quickly and easily create managed Spark clusters from the AWS Management Console, AWS CLI, or the Amazon EMR API.”

For this comparison, we are using two different Spark 3.1.2 EMR clusters:

  • (1) r5.xlarge Master node and (2) r5.2xlarge Core nodes
  • (1) r5.2xlarge Master node and (4) r5.2xlarge Core nodes

All Spark jobs are written in both Python (PySpark) and Scala. We are using the AWS Glue Data Catalog as the metastore for Spark SQL instead of Apache Hive.

4-node Amazon EMR cluster shown in Amazon EMR Management Console

Completed EMR Steps (Spark Jobs) on 4-node Amazon EMR cluster

# Purpose: Process data for sinusitis study using either Amazon EMR or AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "sinusitis_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
patient,
code,
description,
datediff(
date(substr(start, 1, 10)),
date(substr(birthdate, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions as c, patients as p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.bucketBy(1, "patient") \
.sortBy("patient", "code") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 3: Sinusitis Study

# Purpose: Process encounters dataset using either Amazon EMR or AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "encounter_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition",
"true") \
.config("hive.exec.dynamic.partition.mode",
"nonstrict") \
.config("hive.exec.max.dynamic.partitions",
"10000") \
.config("hive.exec.max.dynamic.partitions.pernode",
"10000") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
id,
patient,
code,
description,
reasoncode,
reasondescription,
year(date) as year,
month(date) as month,
day(date) as day
FROM encounters
WHERE description='Encounter for symptom';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.partitionBy("year", "month", "day") \
.bucketBy(1, "patient") \
.sortBy("patient") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 1: Encounters for Symptom

package main.spark.demo
// Purpose: Process observations dataset using Spark on Amazon EMR with Scala
// Author: Gary A. Stafford
// Date: 2022-03-06
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Observations {
def main(args: Array[String]): Unit = {
val (spark: SparkSession, sc: SparkContext) = createSession
performELT(spark, sc)
}
private def createSession = {
val spark: SparkSession = SparkSession.builder
.appName("Observations ELT App")
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.config("hive.exec.dynamic.partition",
"true")
.config("hive.exec.dynamic.partition.mode",
"nonstrict")
.config("hive.exec.max.dynamic.partitions",
"10000")
.config("hive.exec.max.dynamic.partitions.pernode",
"10000")
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("INFO")
(spark, sc)
}
private def performELT(spark: SparkSession, sc: SparkContext) = {
val tableName: String = sc.getConf.get("spark.executorEnv.TABLE_NAME")
val dataLakeBucket: String = sc.getConf.get("spark.executorEnv.DATA_LAKE_BUCKET")
spark.sql("USE synthea_patient_big_data;")
val sql_query_data: String =
"""
SELECT DISTINCT
patient,
encounter,
code,
description,
value,
units,
year(date) as year,
month(date) as month,
day(date) as day
FROM observations
WHERE date <> 'date';
"""
val observationsDF: DataFrame = spark
.sql(sql_query_data)
observationsDF
.coalesce(1)
.write
.partitionBy("year", "month", "day")
.bucketBy(1, "patient")
.sortBy("patient")
.mode("overwrite")
.format("parquet")
.option("path", s"s3://${dataLakeBucket}/${tableName}/")
.saveAsTable(tableName = tableName)
spark.sql(s"ALTER TABLE ${tableName} SET TBLPROPERTIES ('classification'='parquet');")
}
}
Amazon EMR Spark job written in Scala for Test Case 2: Observations; Scala execution times were nearly identical to the PySpark version

Partitions in the AWS Glue Data Catalog table for Test Case 1: Encounters for Symptom

Results

Data pipelines were developed and tested for each of the three test cases using the five chosen AWS ELT services and configuration variations. Each pipeline was then run 3–5 times, for a total of approximately 150 runs. The resulting AWS Glue Data Catalog table and data in Amazon S3 were deleted between each pipeline run. Each new run created a new data catalog table and wrote new results to Amazon S3. The median execution times from these tests are shown below.

Number of raw and processed records for each test case

Overall results (see details below) — lower times are better

Although we can make some general observations about the execution times of the chosen AWS services, the results are not meant to be a definitive guide to performance. An accurate comparison would require a deeper understanding of how each of these managed services works under the hood, in order to both optimize and balance their compute profiles correctly.

Amazon Athena

The Resultset column contains the final number of records written to Amazon S3 by Athena. The results contain the data pipeline’s median execution time and any additional data points.

Results for Amazon Athena data pipelines

AWS Glue Studio (AWS Glue PySpark Extensions)

Tests were run with three different configurations for AWS Glue Studio using the graphical interface for creating jobs with AWS Glue PySpark Extensions. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue Studio with AWS Glue PySpark Extensions

AWS Glue Studio (Apache PySpark script)

As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Apache Spark scripts with AWS Glue Studio. The tests were run with the same three configurations as above. The execution times compared to the Amazon EMR tests, below, are almost identical.

Results for data pipelines using PySpark scripts on AWS Glue Studio

Amazon EMR with Apache Spark

Tests were run with three different configurations for Amazon EMR with Apache Spark using PySpark. The first set of results is for the 2-node EMR cluster. The second set of results is for the 4-node cluster. The third set of results is for the same 4-node cluster in which the data was not bucketed into a single file within each partition. Compare the execution times and the number of objects against the previous set of results. Too many small files can negatively impact query performance.

Results for data pipelines using Amazon EMR with Apache Spark — times for PySpark scripts

It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had similar execution times for all three test cases.

Results for data pipelines using Amazon EMR with Apache Spark — Python vs. Scala

AWS Glue DataBrew

Tests were run with three different configurations for AWS Glue DataBrew, using 3, 10, and 20 maximum nodes. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue DataBrew

Observations

  1. All tested AWS services can read and write to an AWS Glue Data Catalog and the underlying datastore, Amazon S3. In addition, they all work with the most common analytics data file formats.
  2. All tested AWS services have rich APIs providing access through the AWS CLI and SDKs, which support multiple programming languages.
  3. Overall, AWS Glue Studio, using the AWS Glue PySpark Extensions, appears to be the most capable ELT tool of the five services tested and with the best performance.
  4. Both AWS Glue DataBrew and AWS Glue Studio are no-code or low-code services, democratizing access to data for non-programmers. Conversely, Amazon Athena requires knowledge of ANSI SQL, and Amazon EMR with Apache Spark requires knowledge of Scala or Python. Be cognizant of the potential trade-offs from using no-code or low-code services on observability, configuration control, and automation.
  5. Both AWS Glue DataBrew and AWS Glue Studio can write GlueParquet, a custom Parquet writer type optimized for DynamicFrames. One potential advantage is that a pre-computed schema is not required before writing.
  6. There is a slight ‘cold-start’ with Glue Studio; startup times ranged from 7 seconds to 2 minutes and 4 seconds in the tests. However, in my opinion, the lower execution times of AWS Glue Studio compared to Amazon EMR with Spark and AWS Glue DataBrew offset any initial cold-start time.
  7. Changing the maximum number of nodes from 3 to 10 to 20 for AWS Glue DataBrew made a negligible difference in job execution times. Given the nearly identical execution times, it is unclear exactly how many nodes are used by a job and, more importantly, how many DataBrew node hours we are billed for. These are some of the trade-offs of a fully managed service: reduced visibility and limited fine-tuning of the configuration.
  8. Similarly, with AWS Glue Studio, using either 10 workers with a maximum of 20 DPUs, 20 workers with a maximum of 40 DPUs, or 40 workers with a maximum of 80 DPUs resulted in nearly identical execution times.
  9. Amazon Athena had the fastest execution times but is limited by the 100-partition limit for large CTAS resultsets. In my opinion, Athena is not practical compared to the other ELT methods when a large number of partitions is needed.
  10. It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had almost identical execution times for all three test cases.
  11. Using Amazon EMR with EC2 instances takes about 9 minutes to fully provision a new cluster for this comparison. Given nearly identical execution times to AWS Glue Studio with Apache Spark scripts, Glue has the clear advantage of nearly instantaneous startup times.
  12. AWS recently announced Amazon EMR Serverless. Although this service is still in Preview, this new version of EMR could potentially reduce or eliminate the lengthy startup time required for ephemeral clusters.
  13. Although not discussed, scheduling the data pipelines to run each night was a requirement for our use case. AWS Glue Studio jobs and AWS Glue DataBrew jobs are schedulable from those services. For Amazon EMR and Amazon Athena, we could use Amazon Managed Workflows for Apache Airflow (MWAA), AWS Data Pipeline, or AWS Step Functions combined with Amazon CloudWatch Events Rules to schedule the data pipelines.

Conclusion

Customers have many options on AWS for ELT: the cleansing, deduplication, refinement, and normalization of raw data. We examined a selection of AWS services, each capable of handling the analytics use case presented. The best choice of tools depends on your specific ELT use case and performance requirements.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Video Demonstration: Building a Data Lake with Apache Airflow

Build a simple Data Lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

Using a series of Airflow DAGs (Directed Acyclic Graphs), we will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGs, SQL files, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs shown in the video demonstration have been renamed for easier project management within the Airflow UI. The DAGs included in the GitHub project are as follows:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Video Demonstration: Building a Data Lake on AWS

Build a simple Data Lake on AWS using a combination of services, including AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

We will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the SQL statements, is open-sourced and located on GitHub.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Video Demonstration: Ahana Cloud for Presto on AWS using Apache Hive and AWS Glue

Using Ahana Cloud for Presto to perform analytics on AWS using both Apache Hive and AWS Glue as metastores

Introduction

The following series of five videos is an extended version of the demonstration featured in the October 2021 webinar, Build an Open Data Lake on AWS with Presto. An on-demand copy of the live webinar is available on Ahana.io, featuring Dipti Borkar (Ahana Co-Founder and CPO) and me.

In the demonstration, we will build a data lake on AWS using a combination of Ahana Cloud for Presto, Apache Hive, Apache Superset, Amazon S3, AWS Glue, and Amazon Athena. We then analyze the data in Apache Superset using Ahana Cloud for Presto.

Build an Open Data Lake on AWS with Presto

Demonstration

The demonstration is divided into five YouTube videos (playlist):

Ahana Cloud for Presto Demo — Part 1/5: Public GitHub Resources

Ahana Cloud for Presto Demo — Part 2/5: MoMa Datasource

Ahana Cloud for Presto Demo — Part 3/5: Ahana SaaS

Ahana Cloud for Presto Demo — Part 4/5: AWS Glue & Amazon

Ahana Cloud for Presto Demo — Part 5/5: PrestoDB & Apache Hive

Source Code

All source code for this post and the previous posts in this series is open-sourced and located on GitHub. In the webinar and the videos, the Apache Hive and AWS Glue data catalog tables contain an _athena or _presto suffix. For clarity, in the source code, I have changed those to indicate the metastore they are associated with, _hive or _glue, since either set of tables can be queried using Presto. Additionally, in the webinar and the videos, the raw data files were uploaded to Amazon S3 in uncompressed CSV format; this is unnecessary. The CTAS SQL statements both expect GZIP-compressed CSV files. To save time and cost, upload the compressed files, as they are, to Amazon S3.

The following files are used in the demonstration:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2

Introduction

In part one, we learned how to ingest, transform, and enrich raw, semi-structured data, in multiple formats, using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched data sources, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

High-level AWS architecture diagram of the demonstration.

Background

If you recall the demonstration from part one of the post, we had adopted the persona of a large, US-based electric energy provider. The energy provider had developed and sold its next-generation Smart Electrical Monitoring Hub (Smart Hub) to residential customers. Customers can analyze their electrical usage with a fine level of granularity, per device and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

Data Visualization and BI

The data analysis process in the demonstration was divided into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).

Full data analysis workflow diagram

In the final, Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for data visualization and business intelligence, which integrate with Amazon Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, many available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo.

In this demonstration, we will focus on Amazon QuickSight. Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that include ML Insights. Dashboards can be accessed from any device, and embedded into your applications, portals, and websites. QuickSight serverlessly scales automatically from tens of users to tens of thousands without any infrastructure management.


Using QuickSight

QuickSight APIs

Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. For example, to preview the three QuickSight data sets created for this part of the demo with the AWS CLI, we would use the list-data-sets command.


aws quicksight list-data-sets --aws-account-id 123456789012


{
"Status": 200,
"DataSetSummaries": [
{
"Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/9eb88a69-20de-d8be-aefd-2c7ac4e23748",
"DataSetId": "9eb88a69-20de-d8be-aefd-2c7ac4e23748",
"Name": "etl_output_parquet",
"CreatedTime": 1578028774.897,
"LastUpdatedTime": 1578955245.02,
"ImportMode": "SPICE"
},
{
"Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/78e81193-189c-6dd0-864fb-a33244c9654",
"DataSetId": "78e81193-189c-6dd0-864fb-a33244c9654",
"Name": "electricity_rates_parquet",
"CreatedTime": 1578029224.996,
"LastUpdatedTime": 1578945179.472,
"ImportMode": "SPICE"
},
{
"Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/a474214d-c838-b384-bcca-ea1fcd2dd094",
"DataSetId": "a474214d-c838-b384-bcca-ea1fcd2dd094",
"Name": "smart_hub_locations_parquet",
"CreatedTime": 1578029124.565,
"LastUpdatedTime": 1578888788.135,
"ImportMode": "SPICE"
}
],
"RequestId": "2524e80c-7c67-7fbd-c3f1-b700c521badc"
}

To examine details of a single data set, with the AWS CLI, we would use the describe-data-set command.


aws quicksight describe-data-set \
--aws-account-id 123456789012 \
--data-set-id 9eb88a69-20de-d8be-aefd-2c7ac4e23748

QuickSight Console

However, for this final part of the demonstration, we will be working from the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation templates.

Signing Up for QuickSight

To use Amazon QuickSight, you must sign up for QuickSight.


There are two Editions of Amazon QuickSight, Standard and Enterprise. For this demonstration, the Standard Edition will suffice.


QuickSight Data Sets

Amazon QuickSight uses Data Sets as the basis for all data visualizations. According to AWS, QuickSight data sets can be created from a wide variety of data sources, including Amazon RDS, Amazon Aurora, Amazon Redshift, Amazon Athena, and Amazon S3. You can also upload Excel spreadsheets or flat files (CSV, TSV, CLF, ELF, and JSON), connect to on-premises databases like SQL Server, MySQL, and PostgreSQL and import data from SaaS applications like Salesforce. Below, we see a list of the latest data sources available in the QuickSight New Data Set Console.


Demonstration Data Sets

For the demonstration, I have created three QuickSight data sets, all based on Amazon Athena. You have two options when using Amazon Athena as a data source. The first option is to select a table from an AWS Glue Data Catalog database, such as the database we created in part one of the post, ‘smart_hub_data_catalog.’ The second option is to create a custom SQL query, based on one or more tables in an AWS Glue Data Catalog database.

screen_shot_2020-01-13_at_9_05_49_pm.png

Of the three data sets created for part two of this demonstration, two data sets use tables directly from the Data Catalog, including ‘etl_output_parquet’ and ‘electricity_rates_parquet.’ The third data set uses a custom SQL query, based on the single Data Catalog table, ‘smart_hub_locations_parquet.’ All three tables used to create the data sets represent the enriched, highly efficient Parquet-format data sources in the S3-based Data Lake.

screen_shot_2020-01-13_at_9_16_54_pm.png

Data Set Features

There are a large number of features available when creating and configuring data sets. We cannot possibly cover all of them in this post. Let’s look at three features: geospatial field types, calculated fields, and custom SQL.

Geospatial Data Types

QuickSight can intelligently detect common types of geographic fields in a data source and assign a QuickSight geographic data type, including Country, County, City, Postcode, and State. QuickSight can also detect geospatial data, including Latitude and Longitude. We will take advantage of this QuickSight feature for our three data sets’ data sources, including the State, Postcode, Latitude, and Longitude field types.

screen_shot_2020-01-13_at_9_53_19_pm.png

Calculated Fields

A commonly-used QuickSight data set feature is the ‘Calculated field.’ For the ‘etl_output_parquet’ data set, I have created a new field (column), cost_dollar.

screen_shot_2020-01-13_at_4_35_20_pm

The cost field is the electrical cost of the device, over a five-minute time interval, in cents (¢). The calculated cost_dollar field is the quotient of the cost field divided by 100, representing the electrical cost of the device, over the same five-minute interval, in dollars ($). This is a straightforward example. However, a calculated field can be very complex, built from multiple arithmetic, comparison, and conditional functions, string calculations, and data set fields.
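
As a rough sketch, assuming the source column is simply named cost, the cost_dollar expression in the QuickSight calculated field editor could be as minimal as the first expression below. The second, separate expression, using QuickSight’s ifelse() function, hints at how a more complex, conditional calculated field might be built; the threshold and labels are purely illustrative.


cost / 100
ifelse(cost / 100 >= 1.0, 'high usage', 'normal usage')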

screen_shot_2020-01-13_at_9_35_14_pm

Data set calculated fields can also be created and edited from the QuickSight Analysis Console (discussed later).

screen_shot_2020-01-13_at_4_45_47_pm.png

Custom SQL

The third QuickSight data set is based on an Amazon Athena custom SQL query.


SELECT lon, lat, postcode, hash, tz, state
FROM smart_hub_data_catalog.smart_hub_locations_parquet;

Although you can write queries in the QuickSight Data Prep Console, I prefer to write custom Athena queries using the Athena Query Editor. Using the Editor, you can first write, run, debug, and optimize queries to ensure they function correctly.

screen_shot_2020-01-13_at_12_50_14_pm

The Athena query can then be pasted into the Custom SQL window. Clicking ‘Finish’ in the window is the equivalent of ‘Run query’ in the Athena Query Editor Console. The query runs and returns data.

screen_shot_2020-01-12_at_8_12_44_pm

Similar to the Athena Query Editor, queries executed in the QuickSight Data Prep Console will show up in the Athena History tab, with a /* QuickSight */ comment prefix.

screen_shot_2020-01-14_at_9_10_16_pm

SPICE

You will notice the three QuickSight data sets are labeled ‘SPICE.’ According to AWS, the acronym SPICE stands for ‘Super-fast, Parallel, In-memory, Calculation Engine.’ QuickSight’s in-memory calculation engine, SPICE, achieves blazing-fast performance at scale. SPICE automatically replicates data for high availability, allowing thousands of users to simultaneously perform fast, interactive analysis while shielding your underlying data infrastructure, saving you time and resources. With the Standard Edition of QuickSight, as the first Author, you get 1 GB of SPICE in-memory data for free.

QuickSight Analysis

The QuickSight Analysis Console is where Analyses are created. A specific QuickSight Analysis will contain a collection of data sets and data visualizations (visuals). Each visual is associated with a single data set.

Types of QuickSight Analysis visuals include: horizontal and vertical, single and stacked bar charts, line graphs, combination charts, area line charts, scatter plots, heat maps, pie and donut charts, tree maps, pivot tables, gauges, key performance indicators (KPI), geospatial diagrams, and word clouds. Individual visual titles, legends, axes, and other visual aspects can be easily modified. Visuals can contain drill-downs.

A data set’s fields can be modified from within the Analysis Console. Field types and formats, such as date, numeric, and currency, can be customized for display. The Analysis can include a Title and subtitle. There are several customizable themes available to change the overall look of the Analysis.

screen_shot_2020-01-15_at_12_45_37_am.png

Analysis Filters

Data displayed in the visuals can be further shaped using a combination of Filters, Conditional formatting, and Parameters. Below, we see an example of a typical filter based on a range of dates and times. The data set contains two full days’ worth of data. Here, we are filtering the data to a 14-hour peak electrical usage period, between 8 AM and 10 PM on the same day, 12/21/2019.

screen_shot_2020-01-13_at_10_46_57_pm.png

Drill-Down, Drill-Up, Focus, and Exclude

According to AWS, all visual types except pivot tables offer the ability to create a hierarchy of fields for a visual element. The hierarchy lets you drill down or up to see data at different levels of the hierarchy. Focus allows you to concentrate on a single element within a hierarchy of fields, while Exclude allows you to remove an element from a hierarchy of fields. Below, we see an example of all four of these features applied to the ‘Central Air Conditioner.’ Since the AC unit is, on average, the largest consumer of electricity per day, applying these features to understand its impact on overall electrical usage may be useful to an analysis. We can also drill down from hours to minutes, or up from hours to days.

screen_shot_2020-01-15_at_12_56_54_am.png

Example QuickSight Analysis

A QuickSight Analysis is shared by the Analysis Author as a QuickSight Dashboard. Below, we see an example of a QuickSight Dashboard, built and shared for this demonstration. The ‘Residential Electrical Usage Analysis’ is built from the three data sets created earlier. From those data sets, we have constructed several visuals, including a geospatial diagram, donut chart, heat map, KPI, combination chart, stacked vertical bar chart, and line graph. Each visual’s title, layout, and field display has been customized. The data displayed in the visuals has been filtered differently, including by date and time, by customer id (loc_id), and by state. Conditional formatting is used to enhance the appearance of visuals, such as the ‘Total Electrical Cost’ KPI.

screen_shot_2020-01-13_at_7_57_47_pm_v4.png

Conclusion

In part one, we learned how to ingest, transform, and enrich raw, semi-structured data, in multiple formats, using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we used the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Executing Amazon Athena Queries from JetBrains PyCharm

 

Amazon Athena

According to Amazon, Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Amazon Athena supports and works with a variety of popular data file formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet.

The underlying technology behind Amazon Athena is Presto, the popular, open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172. Athena is ideal for quick, ad-hoc querying, but it can also handle complex analysis, including large joins, window functions, and arrays. In addition to Presto, Athena also uses Apache Hive to define tables.

screen_shot_2020-01-05_at_10_32_25_am

Athena Query Editor

In the previous post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight, we used the Athena Query Editor to construct and test SQL queries against semi-structured data in an S3-based Data Lake. The Athena Query Editor has many of the basic features Data Engineers and Analysts expect, including SQL syntax highlighting, code auto-completion, and query formatting. Queries can be run directly from the Editor, saved for future reference, and query results downloaded. The Editor can convert SELECT queries to CREATE TABLE AS (CTAS) and CREATE VIEW AS statements. Access to AWS Glue data sources is also available from within the Editor.

Full-Featured IDE

Although the Athena Query Editor is fairly functional, many Engineers perform a majority of their software development work in a fuller-featured IDE. The choice of IDE may depend on one’s predominant programming language. According to the PYPL Index, the ten most popular, current IDEs are:

  1. Microsoft Visual Studio
  2. Android Studio
  3. Eclipse
  4. Visual Studio Code
  5. Apache NetBeans
  6. JetBrains PyCharm
  7. JetBrains IntelliJ
  8. Apple Xcode
  9. Sublime Text
  10. Atom

Within the domains of data science, big data analytics, and data analysis, languages such as SQL, Python, Java, Scala, and R are common. Although I work in a variety of IDEs, my go-to choices are JetBrains PyCharm for Python (including for PySpark and Jupyter Notebook development) and JetBrains IntelliJ for Java and Scala (including Apache Spark development). Both these IDEs also support many common SQL-based technologies, out-of-the-box, and are easily extendable to add new technologies.

jetbrains.png

Athena Integration with PyCharm

Utilizing the extensibility of the JetBrains suite of professional development IDEs, it is simple to add Amazon Athena to the list of available database drivers and make JDBC (Java Database Connectivity) connections to Athena instances on AWS.

Downloading the Athena JDBC Driver

To start, download the Athena JDBC Driver from Amazon. There are two versions, based on your choice of Java JDK. Considering Java 8 was released six years ago (March 2014), most users will likely want the AthenaJDBC42-2.0.9.jar, which is compatible with JDBC 4.2 and JDK 8.0 or later.

screen_shot_2020-01-06_at_9_28_14_pm

Installation Guide

AWS also supplies a JDBC Driver Installation and Configuration Guide. The guide, as well as the Athena JDBC and ODBC Drivers, are produced by Simba Technologies (acquired by Magnitude Software). Instructions for creating an Athena Driver start on page 23.

screen_shot_2020-01-06_at_9_28_27_pm

Creating a New Athena Driver

From PyCharm’s Database Tool Window, open the Drivers dialog box and select the downloaded Athena JDBC Driver JAR. Select com.simba.athena.jdbc.Driver in the Class dropdown. Name the Driver ‘Amazon Athena.’

screen_shot_2020-01-06_at_10_06_58_pm

You can configure the Athena Driver further, using the Options and Advanced tabs.

screen_shot_2020-01-11_at_8.25.22_pm

Creating a New Athena Data Source

From PyCharm’s Database Tool Window, open the Data Source dialog box to create a new connection to your Athena instance. Choose ‘Amazon Athena’ from the list of available Database Drivers.

screen_shot_2020-01-08_at_3_47_48_pm

You will need four items to create an Athena Data Source:

  1. Your IAM User Access Key ID
  2. Your IAM User Secret Access Key
  3. The AWS Region of your Athena instance (e.g., us-east-1)
  4. An existing S3 bucket location to store query results

The Athena connection URL is a combination of the AWS Region and the S3 bucket, items 3 and 4, above. The format of the Athena connection URL is as follows.

jdbc:awsathena://AwsRegion=your-region;S3OutputLocation=s3://your-bucket-name/query-results-path
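
For example, assuming the hypothetical Region us-east-1, an S3 bucket named athena-query-results-demo, and a query-results path of pycharm, the connection URL might look like the following.

jdbc:awsathena://AwsRegion=us-east-1;S3OutputLocation=s3://athena-query-results-demo/pycharm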

Give the new Athena Data Source a logical Name, input the User (Access Key ID), Password (Secret Access Key), and the Athena URL. To test the Athena Data Source, use the ‘Test Connection’ button.

screen_shot_2020-01-06_at_10_10_03_pm

You can create multiple Athena Data Sources using the Athena Driver. For example, you may have separate Development, Test, and Production instances of Athena, each in a different AWS Account.

Data Access

Once a successful connection has been made, switching to the Schemas tab, you should see a list of available AWS Glue Data Catalog databases. Below, we see the Glue Data Catalog database we created in the prior post. This Glue Data Catalog database contains ten metadata tables, each corresponding to a semi-structured, file-based data source in an S3-based data lake.

In the example below, I have chosen to limit the new Athena Data Source to a single Data Catalog database, to which the Data Source’s IAM User has access. Applying the core AWS security principle of granting least privilege, IAM Users should only have the permissions required to perform a specific set of approved tasks. This principle applies to the Glue Data Catalog databases, metadata tables, and the underlying S3 data sources.

screen_shot_2020-01-06_at_10_11_03_pm.png

Querying Athena from PyCharm

From within the PyCharm’s Database Tool Window, you should now see a list of the metadata tables defined in your AWS Glue Data Catalog database(s), as well as the individual columns within each table.

screen_shot_2020-01-06_at_10_12_18_pm

As with the Athena Query Editor, you can write SQL queries against the database tables in PyCharm. PyCharm offers standard features such as SQL syntax highlighting, code auto-completion, and query formatting. Right-click on the Athena Data Source and choose New, then Console, to start.

screen_shot_2020-01-08_at_3_46_01_pm

Be mindful, when writing queries and searching the Internet for SQL references, that the Athena query engine is based on Presto 0.172. The current version of Presto, 0.234, is more than 50 releases ahead of the current Athena version. Both Athena and Presto functionality continue to change and diverge. There are also additional considerations and limitations for SQL queries in Athena to be aware of.

Whereas the Athena Query Editor is limited to only one query per query tab, in PyCharm, we can write and run multiple SQL queries in the same console window and have multiple console sessions opened to Athena at the same time.
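
As a rough sketch of such a console session, the queries below run against the Glue Data Catalog database created in the previous post. The database, table, and column names are taken from that post’s custom SQL example; treat any other specifics as illustrative assumptions.


-- Preview the Smart Hub location data
SELECT lon, lat, postcode, state
FROM smart_hub_data_catalog.smart_hub_locations_parquet
LIMIT 10;

-- Count Smart Hub locations per state
SELECT state, count(*) AS location_count
FROM smart_hub_data_catalog.smart_hub_locations_parquet
GROUP BY state
ORDER BY location_count DESC;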

screen_shot_2020-01-06_at_10_41_05_pm

By default, PyCharm’s query results are limited to the first ten rows of data. The number of rows displayed, as well as many other preferences, can be changed in PyCharm’s Database Preferences dialog box.

screen_shot_2020-01-06_at_10_15_34_pm

Saving Queries and Exporting Results

In PyCharm, Athena queries can be saved as part of your PyCharm projects, as .sql files. Whereas the Athena Query Editor is limited to CSV, in PyCharm, query results can be exported in a variety of standard data file formats.

screen_shot_2020-01-08_at_3_43_39_pm

Athena Query History

All Athena queries run from PyCharm are recorded in the History tab of the Athena Console. Although PyCharm shows query run times, the Athena History tab also displays the amount of data scanned. Knowing both the query run time and the volume of data scanned is useful when performance-tuning queries.

screen_shot_2020-01-07_at_11_12_46_pm

Other IDEs

The technique shown for JetBrains PyCharm can also be applied to other JetBrains products, including GoLand, DataGrip, PhpStorm, and IntelliJ (shown below).

screen-shot-2020-01-08-at-5_35_57-pm.png

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1

Introduction

According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusion, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.

Data Analysis—discovering useful information, informing conclusion, and supporting decision-making. –Wikipedia

In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.

Demonstration

In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.

Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

screen_shot_2020-01-13_at_7_57_47_pm_v4.png
Preview of post’s data in Amazon QuickSight.

The original concept for the Smart Hub was developed as part of a multi-day training and hackathon I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.

SA_Team_Photo
From left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.

This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.

athena-glue-architecture-v2
High-level AWS architecture diagram of the demonstration.

Featured Technologies

The following AWS services and open-source technologies are featured prominently in this post.

Athena-Glue-v2.png

Amazon S3-based Data Lake

Screen Shot 2020-01-02 at 5.09.05 PM

An Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.

AWS Glue

Screen Shot 2020-01-02 at 5.11.37 PM

AWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

AWS Glue Data Catalog

Screen Shot 2020-01-02 at 5.13.01 PM.png

The AWS Glue Data Catalog is an Apache Hive Metastore-compatible, central repository to store structural and operational metadata for data assets. For a given data set, you can store its table definition and physical location, add business-relevant attributes, and track how the data has changed over time.

AWS Glue Crawler

Screen Shot 2020-01-02 at 5.14.57 PM

An AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.

AWS Glue ETL Job

Screen Shot 2020-01-02 at 5.11.37 PM

An AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.

Amazon Athena

Screen Shot 2020-01-02 at 5.17.49 PM

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.

Amazon QuickSight

Screen Shot 2020-01-02 at 5.18.40 PM

Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.

AWS Lambda

Screen Shot 2020-01-02 at 5.25.57 PM

AWS Lambda automatically runs code without requiring you to provision or manage servers. AWS Lambda automatically scales applications by running code in response to triggers, and Lambda code runs in parallel. With AWS Lambda, you are charged for every 100 ms your code executes and for the number of times your code is triggered. You pay only for the compute time you consume.

Smart Hub Data

Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data: Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data as possible without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.

To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.
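
As a rough illustration of the idea, the hypothetical query below filters on a date-based partition column (the dt partition scheme is described in the next section), so Athena only scans objects under the matching S3 prefix. The table name is illustrative; the columns match the usage data shown later.


-- Hypothetical example: only objects under the dt=2019-12-21 partition are scanned
SELECT loc_id, ts, data
FROM smart_hub_data_catalog.smart_hub_usage_parquet
WHERE dt = '2019-12-21';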

Smart Hub Electrical Usage Data

The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kWh) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days’ worth of usage data for the demonstration, for a total of 5,760 data points. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).


{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,