Archive for category SQL

Lakehouse Data Modeling using dbt, Amazon Redshift, Redshift Spectrum, and AWS Glue

Learn how dbt makes it easy to transform data and materialize models in a modern cloud data lakehouse built on AWS

Introduction

Data lakes have grabbed much of the analytics community’s attention in recent years, thanks to an overabundance of VC-backed analytics startups and marketing dollars. Nonetheless, data warehouses, specifically modern cloud data warehouses, continue to gain market share, led by SnowflakeAmazon RedshiftGoogle Cloud BigQuery, and Microsoft’s Azure Synapse Analytics.

Several factors have fostered the renewed interest and appeal of data warehouses, including the data lakehouse architecture. According to Databricks, “a lakehouse is a new, open architecture that combines the best elements of data lakes and data warehouses. Lakehouses are enabled by a new system design: implementing similar data structures and data management features to those in a data warehouse directly on top of low-cost cloud storage in open formats.” Similarly, Snowflake describes a lakehouse as “a data solution concept that combines elements of the data warehouse with those of the data lake. Data lakehouses implement data warehouses’ data structures and management features for data lakes, which are typically more cost-effective for data storage.

dbt

In the following post, we will explore the use of dbt (data build tool), developed by dbt Labs, to transform data in an AWS-based data lakehouse, built with Amazon Redshift, Redshift Spectrum, AWS Glue, and Amazon S3. According to dbt Labs, “dbt enables analytics engineers to transform data in their warehouses by simply writing select statements. dbt handles turning these select statements into tables and views.” Further, “dbt does the T in ELT (Extract, Load, Transform) processes — it doesn’t extract or load data, but it’s extremely good at transforming data that’s already loaded into your warehouse.

This post’s project, displayed in dbt Cloud

Amazon Redshift

According to AWS, “Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes using AWS-designed hardware and machine learning to deliver the best price-performance at any scale.” AWS claims Amazon Redshift is the most widely used cloud data warehouse.

Amazon Redshift Spectrum

According to AWS, “Redshift Spectrum allows you to efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables.” Redshift Spectrum tables define the data structure for the files in Amazon S3. The external tables exist in an external data catalog, which can be AWS Glue, the data catalog that comes with Amazon Athena, or an Apache Hive metastore.

dbt can interact with Amazon Redshift Spectrum to create external tables, refresh external table partitions, and access raw data in an Amazon S3-based data lake from the data warehouse. We will use dbt along with the dbt package, dbt_external_tables, to create the external tables in an AWS Glue data catalog.

Prerequisites

Prerequisites to follow along with this post’s demonstration include:

  • Amazon S3 bucket to store raw data;
  • Amazon Redshift or Amazon Redshift Serverless cluster;
  • AWS IAM Role with permissions to Amazon Redshift, Amazon S3, and AWS Glue;
  • dbt Cloud account;
  • dbt CLI (dbt Core) and dbt Amazon Redshift adapter installed locally;
  • Microsoft Visual Studio Code (VS Code) with dbt extensions installed;

The post’s demonstration uses dbt Cloud, VS Code, and the dbt CLI interchangeably with the project’s GitHub repository as a source. Follow along with the demonstration using any or all of these three dbt options.

Example of this project in VS Code with dbt extensions installed

Cost Warning!

Be careful when creating a new, provisioned Amazon Redshift cluster for this demonstration. The suggested default Production cluster with two ra3.4xlarge on-demand compute nodes and AQUA (Redshift’s Advanced Query Accelerator) enabled is estimated at $4,694/month ($3.26/node/hour). For this demonstration, choose the minimum size provisioned Redshift cluster configuration of one dc2.large on-demand compute node, estimated to cost $180/month ($0.25/node/hour). Be sure to delete the cluster when the demonstration is complete.

Creating a new Amazon Redshift cluster

Amazon Redshift Serverless Option

AWS recently announced the general availability (GA) of Amazon Redshift Serverless on July 12, 2022. Amazon Redshift Serverless allows data analysts, developers, and data scientists to run and scale analytics without having to provision and manage data warehouse clusters. dbt is fully compatible with Amazon Redshift Serverless and is an alternative to provisioned Redshift for this demonstration. According to AWS, Amazon Redshift Serverless measures data warehouse capacity in Redshift Processing Units (RPUs). You pay for the workloads you run in RPU-hours on a per-second basis (with a 60-second minimum charge), including queries that access data in open file formats in Amazon S3.

Source Code

All the source code demonstrated in this post is open source and available on GitHub.

Sample Data

This demonstration uses the TICKIT sample database provided by AWS and designed for use with Amazon Redshift. This sample database application tracks sales activity for the fictional online TICKIT website, where users buy and sell tickets for sporting events, shows, and concerts. The database consists of seven tables in a star schema: five dimension tables and two fact tables. A clean copy of the raw TICKIT data, formatted as pipe-delimited text files, is included in this GitHub project. Use the following shell commands to copy the raw data to Amazon S3:

unzip tickit_data.zip
s3_bucket="open-data-lake-demo-us-east-1"
declare -a TableArray=("category" "date" "event" "listing" "sale" "user" "venue")
for table in "${TableArray[@]}"
do
aws s3 cp ./raw_tickit_data/$table.txt s3://$s3_bucket/$table/
done

Prepare Amazon Redshift for dbt

Create New Database

Create a new Redshift database to use for the demonstration, demo.

create database demo
with owner awsuser;

Create Database Schemas

Within the new Redshift database,demo, create the external schema, tickit_external, and the corresponding external AWS Glue Data Catalogtickit_dbt, using the CREATE EXTERNAL SCHEMA Redshift SQL command. Make sure to update the command to reflect your IAM Role’s ARN. Next, create the schema that will hold our dbt models, tickit_dbt. Lastly, as a security best practice, drop the default public schema.

create external schema tickit_external
from data catalog
database 'tickit_dbt'
iam_role 'arn:aws:iam::<your_account_id>:role/<your_cluster_permissions_role>'
create external database if not exists;
create schema tickit_dbt;
drop schema public;

From the AWS Glue console, we should observe a new tickit_dbt AWS Glue Data Catalog. The description shown below was manually added after the catalog was created.

Newly created AWS Glue Data Catalog

Create dbt Database User and Group

As a security best practice, create a separate database dbt user and dbt group. We are assigning a completely arbitrary connection limit of ten. Then, apply the grants to allow the dbt group access to the new database and schemas. Lastly, change the two schema’s owners to the dbt.

create user dbt with password '<your_password>'
nocreatedb nocreateuser syslog access restricted
connection limit 10;
create group dbt with user dbt;
external tables
grant usage on schema tickit_external to group dbt;
grant create on schema tickit_external to group dbt;
grant all on all tables in schema tickit_external to group dbt;
dbt models: tables and view
grant usage on schema tickit_dbt to group dbt;
grant create on schema tickit_dbt to group dbt;
grant all on all tables in schema tickit_dbt to group dbt;
reassign schema ownership to dbt
alter schema tickit_dbt owner to dbt;
alter schema tickit_external owner to dbt;

Alternately, we could use an IAM Role with a SAML 2.0-compliant IdP.

Initialize and Configure dbt for Redshift

Next, configure your dbt Cloud account and dbt locally with your Amazon Redshift connection information using the dbt init command. On a Mac, this configuration is stored in the /Users/<your_usernama>/.dbt/profiles.yml file. You will need your Redshift cluster host URL, port, database, username, and password. With your local install of dbt, we can use the dbt debug command to confirm the new configuration.

Confirming configuration using dbt debug command

Project Structure

The GitHub project structure follows many of the best practices outlined in dbt Labs’ Best Practice Guide. Data models in the models directory is organized into the recommended stagingintermediate, and marts subdirectories (aka layers).

Project structure for data models

From a data lineage perspective, in this project, the staging layer’s data models depend on the external tables (AWS Glue/Amazon Redshift Spectrum). The intermediate layer’s data models depend on the staging models. The marts layer’s data models depend on staging and intermediate models.

dbt Cloud’s Lineage Graph showing an example of data model relationships

Install dbt Packages

The GitHub project’s packages.yml contains a few commonly recommended packages. The only one required for this post is the dbt-labs/dbt_external_tables package. Make sure your project is referring to the latest version of the package.

packages:
package: dbt-labs/dbt_external_tables
version: 0.8.0
package: dbt-labs/codegen
version: 0.7.0
package: dbt-labs/dbt_utils
version: 0.8.6
package: dbt-labs/redshift
version: 0.6.1
view raw packages.yml hosted with ❤ by GitHub

Use the dbt deps command to install the packages locally.

Installing dbt packages

External Tables

The _tickit__sources.yml file in the models/staging/tickit/external_tables/ model’s subdirectory defines the schema and S3 location for each of the seven external TICKIT database tables: category, date, event, listing, sale, user, and venue. You will need to update this file to reflect the name of your Amazon S3 bucket, in seven places.

version: 2
sources:
name: tickit_external
description: Sales activity for the fictional TICKIT web site, where users buy and sell tickets online for sporting events, shows, and concerts.
database: demo
schema: tickit_external
loader: s3
tables:
name: category
description: dimension table – TICKIT categories
external:
location: "s3://<your_s3_bucket_name>/raw_tickit_data/category/"
row_format: >
serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
'separatorChar'='|'
)
table_properties: "('skip.header.line.count'='1')"
columns:
name: catid
data_type: int
description: primary key
tests:
unique
not_null
name: catgroup
data_type: varchar(20)
name: catname
data_type: varchar(20)
name: catdesc
data_type: varchar(50)

Execute the command, dbt run-operation stage_external_sources, to create the seven external tables in the AWS Glue Data Catalog. This command is part of the dbt_external_tables package we installed earlier. It iterates through all source nodes, creates the tables if missing, and refreshes metadata.

Creating external tables in AWS Glue Data Catalog

If we failed to run the previous SQL statements to set schema ownership to the dbt user, the following error will likely occur.

Typical error resulting from incorrect ownership of the external_table schema

Once the command completes, we should observe seven new tables in the AWS Glue Data Catalog.

Seven new tables in the AWS Glue Data Catalog

Examining one of the AWS Glue data catalog tables, we can observe how the configuration in the _tickit__sources.yml file was used to define the table’s properties and schema. Note the Location field indicates where the underlying data is located in our Amazon S3 bucket.

The ‘category’ table in the AWS Glue Data Catalog

Staging Layer

In their best practices guide, dbt describes the staging layer in the following manner: “you can think of the staging layer as condensing and refining this material into the individual atoms we’ll later build more intricate and useful structures with.” The staging data models are the base tables and views we will use to build more complex aggregations and analytics queries in Redshift. The schema.yml file, also in the models/staging/tickit/ model’s subdirectory, defines seven late-binding views, modeled by dbt, to be created in Amazon Redshift.

version: 2
models:
name: stg_tickit__categories
description: Late binding view of external dimension table – TICKIT category
name: stg_tickit__dates
description: Late binding view of external dimension table – TICKIT date
name: stg_tickit__events
description: Late binding view of external dimension table – TICKIT event
name: stg_tickit__listings
description: Late binding view of external fact table – TICKIT listing
name: stg_tickit__sales
description: Late binding view of external fact table – TICKIT sale
name: stg_tickit__users
description: Late binding view of external dimension table – TICKIT user
name: stg_tickit__venues
description: Late binding view of external dimension table – TICKIT venue
view raw schema.yml hosted with ❤ by GitHub

The staging model’s SQL statements also follow many of dbt’s best practices. Below, we see an example of the stg_tickit__sales model (stg_tickit__sales.sql). This model performs a SELECT from the external sale table in the external_table schema. The model performs column renaming and basic calculations.

{{ config(materialized='view', bind=False) }}
with source as (
select * from {{ source('tickit_external', 'sale') }}
),
renamed as (
select
saleid as sale_id,
listid as list_id,
sellerid as seller_id,
buyerid as buyer_id,
eventid as event_id,
dateid as date_id,
qtysold as qty_sold,
round (pricepaid / qtysold, 2) as ticket_price,
pricepaid as price_paid,
round((commission / pricepaid) * 100, 2) as commission_prcnt,
commission,
(pricepaid commission) as earnings,
saletime as sale_time
from
source
where
sale_id IS NOT NULL
order by
sale_id
)
select * from renamed

The the dbt run command, according to dbt, “executes compiled SQL model files against the current target database. dbt connects to the target database and runs the relevant SQL, required to materialize all data models using the specified materialization strategies.” Instead of using the dbt run command to create all the project’s tables and views at once, for now, we are limiting the command to just the models in the ./models/staging/tickit/ directory using the --select optional argument. Execute the dbt run --select staging command to materialize the seven corresponding staging tables in Amazon Redshift.

The Staging layer’s data models successfully materialized in Redshift

Once the command completes, we should observe seven new views in Amazon Redshift demo database’s tickit_dbt schema with the stg_ prefix.

Amazon Redshift Query Editor v2 Console

Selecting from any of the views should return data.

Amazon Redshift Query Editor v2 Console

Late Binding Views

This demonstration uses late binding views for staging and intermediate layer models. According to dbt, “using late-binding views in a production deployment of dbt can vastly improve the availability of data in the warehouse, especially for models that are materialized as late-binding views and are queried by end-users, since they won’t be dropped when upstream models are updated. Additionally, late binding views can be used with external tables via Redshift Spectrum.

Alternatively, we could define the seven staging models as tables instead of late binding views. Once created as tables, the dependent intermediate and marts views will not require a late-binding reference, as in this project.

Intermediate Layer

In their best practices guide, dbt describes the intermediate layer as “purpose-built transformation steps.” Further, “the best guiding principle is to think about verbs (e.g. pivotedaggregated_to_userjoinedfanned_out_by_quanityfunnel_created, etc.) in the intermediate layer.

The project’s intermediate layer consists of two models related to users. The sample TICKIT database lumps all users into a single table. However, for analytics purposes, different user personas might interest marketing teams, such as buyers, sellers, sellers who also buy, and non-buyers (users who have never purchased tickets). The two models in the project’s intermediate layer filter for buyers and for sellers, resulting in two separate views of user personas.

{{ config(materialized='view', bind=False) }}
with sales as (
select * from {{ ref('stg_tickit__sales') }}
),
users as (
select * from {{ ref('stg_tickit__users') }}
),
first_purchase as (
select min(date(sale_time)) as first_purchase_date, buyer_id
from sales
group by buyer_id
),
final as (
select distinct
u.user_id,
u.username,
cast((u.last_name||', '||u.first_name) as varchar(100)) as full_name,
f.first_purchase_date,
u.city,
u.state,
u.email,
u.phone,
u.like_broadway,
u.like_classical,
u.like_concerts,
u.like_jazz,
u.like_musicals,
u.like_opera,
u.like_rock,
u.like_sports,
u.like_theatre,
u.like_vegas
from
sales as s
join users as u on u.user_id = s.buyer_id
join first_purchase as f on f.buyer_id = s.buyer_id
order by
user_id
)
select * from final

To materialize the intermediate layer’s two data models into views, execute the command, dbt run --select intermediate.

The Intermediate layer’s data models successfully materialized in Redshift

Once the command completes, we should observe a total of nine views in Amazon Redshift demo database’s tickit_dbt schema — seven staging and two intermediate, identified with the int_ prefix.

Amazon Redshift Query Editor v2 Console

Marts Layer

In their best practices guide, dbt describes the marts layer as “business defined entities.” Further, “this is the layer where everything comes together and we start to arrange all of our atoms (staging models) and molecules (intermediate models) into full-fledged cells that have identity and purpose. We sometimes like to call this the entity layer or concept layer, to emphasize that all our marts are meant to represent a specific entity or concept at its unique grain.

The project’s marts layer consists of four data models across marketing and sales. The models are materialized as two dimension tables and two fact tables. Although it is common practice to describe and label these as traditional star schema dimension (dim_) or fact (fct_) tables, in reality, the fact tables in this demonstration are actually flat, de-normalized, wide tables. Wide tables generally have better analytics performance in a modern data warehouse, according to Fivetran and others.

{{ config(materialized='table', sort='sale_id', dist='sale_id') }}
with categories as (
select * from {{ ref('stg_tickit__categories') }}
),
dates as (
select * from {{ ref('stg_tickit__dates') }}
),
events as (
select * from {{ ref('stg_tickit__events') }}
),
listings as (
select * from {{ ref('stg_tickit__listings') }}
),
sales as (
select * from {{ ref('stg_tickit__sales') }}
),
sellers as (
select * from {{ ref('int_sellers_extracted_from_users') }}
),
buyers as (
select * from {{ ref('int_buyers_extracted_from_users') }}
),
event_categories as (
select
e.event_id,
e.event_name,
c.cat_group,
c.cat_name
from events as e
join categories as c on c.cat_id = e.cat_id
),
final as (
select
s.sale_id,
s.sale_time,
d.qtr,
ec.cat_group,
ec.cat_name,
ec.event_name,
b.username as buyer_username,
b.full_name as buyer_name,
b.state as buyer_state,
b.first_purchase_date as buyer_first_purchase_date,
se.username as seller_username,
se.full_name as seller_name,
se.state as seller_state,
se.first_sale_date as seller_first_sale_date,
s.ticket_price,
s.qty_sold,
s.price_paid,
s.commission_prcnt,
s.commission,
s.earnings
from
sales as s
join listings as l on l.list_id = s.list_id
join buyers as b on b.user_id = s.buyer_id
join sellers as se on se.user_id = s.seller_id
join event_categories as ec on ec.event_id = s.event_id
join dates as d on d.date_id = s.date_id
order by
sale_id
)
select * from final
view raw fct_sales.sql hosted with ❤ by GitHub

The marts layer’s models take various dependencies through joins on staging and intermediate models. The data model above, fct_sales, has dependencies on multiple staging and intermediate models.

Lineage graph (dependency graph) of a data model, displayed in dbt Cloud

To materialize the marts layer’s four data models into tables, execute the command, dbt run --select marts.

The Marts layer’s data models successfully materialized in Redshift as tables

Once the command completes, we should observe four tables and nine views in the Redshift demo database’s tickit_dbt schema. Note how the dbt model for fct_sales (shown above), with its Jinja templating and multiple CTEs have been compiled into the resulting table in Redshift, this is the real magic of dbt!

Amazon Redshift Query Editor v2 Console showing fct_sales table

At this point, all of the project’s models have been compiled and created in the Redshift demo database by dbt.

Analyses

The demonstration’s project also contains example analyses. dbt allows us to version control more analytical-oriented SQL files within our dbt project using the analyses functionality of dbt. These analyses do not fit the fairly opinionated dbt model definition. We can compile the analyses SQL file using the dbt compile command, then copy and paste the resulting SQL statements from the target/compiled/ subdirectory into our data warehouse’s query tool of choice.

Analysis shown in dbt Cloud interface
Compiled analysis SQL executed in Amazon Redshift

Project Documentation

Using the dbt docs generate command will automatically generate the project’s documentation website from the SQL and YAML files. Documentations can be generated and displayed from your dbt Cloud account or hosted locally.

Project’s documentation website

Testing

According to dbt, “Tests are assertions you make about your models and other resources in your dbt project (e.g. sources, seeds, and snapshots). When you run dbt test, dbt will tell you if each test in your project passes or fails.” The project contains over 50 tests, split between the _tickit__sources.yml file and individual tests in the test/ directory. Typical dbt tests check for non-null and unique values, values within an expected numeric range, and values from a known list of strings. Any SELECT statement written in SQL can be tested.

name: user
description: dimension table – TICKIT users
columns:
name: userid
data_type: int
description: primary key
tests:
unique
not_null
name: username
data_type: char(8)
tests:
unique
not_null
name: firstname
data_type: varchar(30)
tests:
not_null
Snippet of tests in the _tickit__sources.sql file
all prices paid for tickets should be a positive value of $1 or greater
there are no credits (negative values)
select price_paid
from {{ ref('stg_tickit__sales') }}
group by price_paid
having not (price_paid >= 1)

Execute the project’s tests using the dbt test command. We can execute individual tests using the --select optional argument, for example, dbt test --select assert_all_sale_amounts_are_positive. We can also use the --threads optional argument with most dbt commands, including dbt test, increasing parallelism and reducing execution time. The example below uses 10 threads, the arbitrary maximum configured for the Amazon Redshift dbt user.

Running dbt tests
Successful test run

Jobs

According to dbt, Jobs are a set of dbt commands that you want to run on a schedule. For example, dbt run and dbt test. Jobs can load packages, run tests, materialize models, check source freshness (dbt source freshness), and regenerate documentation. Below, we have created a daily job to test, refresh, and document our project as the data is updated in the data lake.

dbt Cloud’s Job Run Overview

Notifications

According to dbt, Setting up notifications in dbt Cloud will allow you to receive alerts via Email or a chosen Slack channel when a job run succeeds, fails, or is canceled.

dbt Cloud’s Notifications interface for configuring email and Slack

The Slack notifications include run status, timings, and a link to open the job in dbt Cloud. Below, we see a notification regarding our project’s daily job run.

Notification of successful job run in Slack from dbt Cloud

Exposures

Exposures are a recent addition to dbt. Exposures make it possible to define and describe a downstream use of our dbt project, such as in a dashboard, application, or data science pipeline. Below we see an example of an exposure describing a sales dashboard created in Amazon QuickSight.

version: 2
exposures:
name: tickit_sales_summary
type: dashboard
maturity: medium
url: https://us-east-1.quicksight.aws.amazon.com
description: >
TICKIT sales summary dashboard, authored in Amazon QuickSight
depends_on:
ref('fct_sales')
ref('fct_listings')
owner:
name: Gary A. Stafford
email: gary.a.stafford@gmail.com
view raw dashboard.yml hosted with ❤ by GitHub

The exposure YAML file shown above describes the Amazon QuickSight dashboard shown below.

Amazon QuickSight dashboard

Exposures work with dbt’s auto-documentation feature. dbt populates a dedicated page in the auto-generated documentation site with context relevant to data consumers.

Project’s documentation website, showing dashboard exposure
Project’s documentation website, showing dashboard exposure’s lineage graph

Conclusion

In this post, we covered some of the basic functionality of dbt. We learned how dbt enables analysts to work more like software engineers. We also learned how dbt makes it easy to codify data models in SQL, to version control and manage data models as code with git, and collaborate on data models with other data team members.

Topics not explored in this post but critical to most large-scale dbt-managed production environments include advanced Jinja templating and macrosmodel freshnessorchestrationjob schedulingContinuous Integration and GitOpsnotificationsenvironment variables, and incremental models. We will explore these additional dbt capabilities in future posts.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are the property of the author unless otherwise noted.

, , , , , ,

Leave a comment

Utilizing In-memory Data Caching to Enhance the Performance of Data Lake-based Applications

Significantly improve the performance and reduce the cost of data lake-based analytics applications using Amazon ElastiCache for Redis

Introduction

The recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, demonstrated how to develop a Cloud-native analytics application using Spring Boot. The application queried data in an Amazon S3-based data lake via an AWS Glue Data Catalog utilizing the Amazon Athena API.

Securely exposing data in a data lake using RESTful APIs can fulfill many data-consumer needs. However, access to that data can be significantly slower than access from a database or data warehouse. For example, in the previous post, we imported the OpenAPI v3 specification from the Spring Boot service into Postman. The API specification contained approximately 17 endpoints.

Running a suite of integration tests against the Spring Boot service using Postman

From my local development laptop, the Postman API test run times for all service endpoints took an average of 32.4 seconds. The Spring Boot service was running three Kubernetes pod replicas on Amazon EKS in the AWS US East (N. Virginia) Region.

Sample of test results ran from service (no Redis cache)

Compare the data lake query result times to equivalent queries made against a minimally-sized Amazon RDS for PostgreSQL database instance containing the same data. The average run times for all PostgreSQL queries averaged 10.8 seconds from a similar Spring Boot service. Although not a precise benchmark, we can clearly see that access to the data in the Amazon S3-based data lake is substantially slower, approximately 3x slower, than that of the PostgreSQL database. Tuning the database would easily create an even greater disparity.

Sample of test results for queries ran from service against Athena vs. PostgreSQL (quicker is better)

Caching for Data Lakes

According to AWS, the speed and throughput of a database can be the most impactful factor for overall application performance. Consequently, in-memory data caching can be one of the most effective strategies to improve overall application performance and reduce database costs. This same caching strategy can be applied to analytics applications built atop data lakes, as this post will demonstrate.

High-level AWS architecture demonstrated in this post

In-memory Caching

According to Hazelcast, memory caching (aka in-memory caching), often simply referred to as caching, is a technique in which computer applications temporarily store data in a computer’s main memory (e.g., RAM) to enable fast retrievals of that data. The RAM used for the temporary storage is known as the cache. As an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.

Further, according to Hazelcast, as an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.

Redis In-memory Data Store

According to their website, Redis is the open-source, in-memory data store used by millions of developers as a database, cache, streaming engine, and message broker. Redis provides data structures such as stringshasheslistssetssorted sets with range queries, bitmapshyperloglogsgeospatial indexes, and streams. In addition, Redis has built-in replicationLua scriptingLRU evictiontransactions, and different levels of on-disk persistence and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

Amazon ElastiCache for Redis

According to AWS, Amazon ElastiCache for Redis, the fully-managed version of Redis, according to AWS, is a blazing fast in-memory data store that provides sub-millisecond latency to power internet-scale real-time applications. Redis applications can work seamlessly with ElastiCache for Redis without any code changes. ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with manageability, security, and scalability from AWS. As a result, Redis is an excellent choice for implementing a highly available in-memory cache to decrease data access latency, increase throughput, and ease the load on relational and NoSQL databases.

ElastiCache Performance Results

In the following post, we will add in-memory caching to the Spring Boot service introduced in the previous post. In preliminary tests with Amazon ElastiCache for Redis, the Spring Boot service delivered a 34x improvement in average response times. For example, test runs with the best-case scenario of a Redis cache hit ratio of 100% averaged 0.95 seconds compared to 32.4 seconds without Redis.

Sample of uncached versus cached test results (quicker is better)

Source Code

All the source code and Docker and Kubernetes resources are open-source and available on GitHub.

git clone --depth 1 -b redis \
https://github.com/garystafford/athena-spring-app.git

In addition, a Docker image for the Redis-base Spring Boot service is available on Docker Hub. For this post, use the latest tag with the .redis suffix.

Spring Boot service images are available on Docker Hub

Code Changes

The following code changes are required to the Spring Boot service to implement Spring Boot Cache with Redis.

Gradle Build

The gradle.build file now implements two additional dependencies, Spring Boot’s spring-boot-starter-cache and spring-boot-starter-data-redis (lines 45–46).

plugins {
id 'org.springframework.boot' version '2.7.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id 'io.freefair.lombok' version '6.5.0-rc1'
}
group = 'aws.example'
version = '2.0.0'
sourceCompatibility = '17'
def awsSdkVersion = '2.17.225'
def springBootVersion = '2.7.1'
def restAssuredVersion = '5.1.1'
repositories {
mavenCentral()
}
dependencies {
// aws sdk
runtimeOnly "software.amazon.awssdk:bom:${awsSdkVersion}"
implementation "software.amazon.awssdk:athena:${awsSdkVersion}"
// spring
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"
developmentOnly "org.springframework.boot:spring-boot-devtools:${springBootVersion}"
implementation 'org.springdoc:springdoc-openapi-ui:1.6.9'
implementation 'org.springframework:spring-context:5.3.20'
// testings
testImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
testImplementation "io.rest-assured:rest-assured:${restAssuredVersion}"
testImplementation "io.rest-assured:json-path:${restAssuredVersion}"
testImplementation "io.rest-assured:xml-path:${restAssuredVersion}"
testImplementation "io.rest-assured:json-schema-validator:${restAssuredVersion}"
// monitoring
implementation 'io.micrometer:micrometer-registry-prometheus:1.9.1'
// spring caching with redis
implementation "org.springframework.boot:spring-boot-starter-cache:${springBootVersion}"
implementation "org.springframework.boot:spring-boot-starter-data-redis:${springBootVersion}"
}
tasks.named('test') {
useJUnitPlatform()
}
view raw build.gradle hosted with ❤ by GitHub

Application Properties

The application properties file, application.yml, has been modified for both the dev and prod Spring Profiles. The dev Spring Profile expects Redis to be running on localhost. Correspondingly, the project’s docker-compose.yml file now includes a Redis container for local development. The time-to-live (TTL) for all Redis caches is arbitrarily set to one minute for dev and five minutes for prod. To increase application performance and reduce the cost of querying the data lake using Athena, increase Redis’s TTL. Note that increasing the TTL will reduce data freshness.

spring:
profiles:
active: dev
server:
port: 8080
servlet:
contextPath: /v1
athena:
region: us-east-1
workgroup: primary
catalog: AwsDataCatalog
database: tickit_demo
limit: 25
client-execution-timeout: 100000
retry-sleep: 1000
results-bucket: ${RESULTS_BUCKET}
named-query-id: ${NAMED_QUERY_ID}
spring:
config:
activate:
on-profile: dev
redis:
host: localhost
port: 6379
cache:
redis:
time-to-live: 60000 # 1000 ms * 60 * 1 = 1 min
enable-statistics: true
server:
port: 8080
logging:
level:
root: DEBUG
management:
endpoints:
web:
exposure:
include: '*'
jmx:
exposure:
include: '*'
spring:
config:
activate:
on-profile: prod
redis:
host: ${REDIS_HOST}
port: 6379
cache:
redis:
time-to-live: 300000 # 1000 ms * 60 * 5 = 5 min
enable-statistics: true
logging:
level:
root: INFO
management:
endpoints:
web:
exposure:
include: health, prometheus
jmx:
exposure:
include: health
view raw application.yml hosted with ❤ by GitHub

Athena Application Class

The AthenaApplication class declaration is now decorated with Spring Framework’s EnableCaching annotation (line 22). Additionally, two new Beans have been added (lines 58–68). Spring Redis provides an implementation for the Spring cache abstraction through the org.springframework.data.redis.cache package. The RedisCacheManager cache manager creates caches by default upon the first write. The RedisCacheConfiguration cache configuration helps to customize RedisCache behavior such as caching null values, cache key prefixes, and binary serialization.

package com.example.athena;
import com.example.athena.common.PreparedStatement;
import com.example.athena.common.View;
import com.example.athena.config.ConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.time.Duration;
@SpringBootApplication
@EnableConfigurationProperties(ConfigProperties.class)
@EnableCaching
public class AthenaApplication {
private final PreparedStatement preparedStatement;
private final View view;
@Autowired
public AthenaApplication(PreparedStatement preparedStatement, View view) {
this.preparedStatement = preparedStatement;
this.view = view;
}
public static void main(String[] args) {
SpringApplication.run(AthenaApplication.class, args);
}
@Bean
void CreatePreparedStatement() {
preparedStatement.CreatePreparedStatement();
}
@Bean
void createView() {
view.CreateView();
}
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**").allowedOrigins("*");
}
};
}
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
return RedisCacheManager.create(connectionFactory);
}
@Bean
public RedisCacheConfiguration cacheConfiguration() {
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(5))
.disableCachingNullValues();
}
}

POJO Data Model Classes

Spring Boot Redis caching uses Java serialization and deserialization. Therefore, all the POJO data model classes must implement Serializable (line 14).

package com.example.athena.tickit.model.ecomm;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Listing implements Serializable {
private int id;
private int sellerId;
private int eventId;
private int dateId;
private int numTickets;
private BigDecimal pricePerTicket;
private LocalDateTime listTime;
}
view raw Listing.java hosted with ❤ by GitHub

Service Classes

Each public method in the Service classes is now decorated with Spring Framework’s Cachable annotation (lines 42 and 66). For example, the findById(int id) method in the CategoryServiceImp class is annotated with @Cacheable(value = "categories", key = "#id"). The method’s key parameter uses Spring Expression Language (SpEL) expression for computing the key dynamically. Default is null, meaning all method parameters are considered as a key unless a custom keyGenerator has been configured. If no value is found in the Redis cache for the computed key, the target method will be invoked, and the returned value will be stored in the associated cache.

package com.example.athena.tickit.service;
import com.example.athena.common.AthenaClientFactory;
import com.example.athena.common.AthenaCommon;
import com.example.athena.config.ConfigProperties;
import com.example.athena.tickit.model.ecomm.Listing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.*;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Integer.parseInt;
@Service
public class ListingServiceImp implements ListingService {
private static final Logger logger = LoggerFactory.getLogger(ListingServiceImp.class);
private final ConfigProperties configProperties;
private final AthenaClientFactory athenaClientFactory;
private final AthenaCommon athenaCommon;
@Autowired
public ListingServiceImp(ConfigProperties configProperties, AthenaClientFactory athenaClientFactory, AthenaCommon athenaCommon) {
this.configProperties = configProperties;
this.athenaClientFactory = athenaClientFactory;
this.athenaCommon = athenaCommon;
}
@Cacheable(value = "listings")
public List<Listing> findAll(Integer limit, Integer offset) {
if (limit == null || limit < 1 || limit > configProperties.getLimit()) {
limit = configProperties.getLimit();
}
if (offset == null || offset < 1) {
offset = 0;
}
String whereClause = "WHERE listid IS NOT NULL";
String query = String.format("""
SELECT *
FROM refined_tickit_public_listing
%s
ORDER BY listid
OFFSET %s
LIMIT %s;""", whereClause, offset, limit);
return startQuery(query);
}
@Cacheable(value = "listing", key = "#id")
public Listing findById(int id) {
String query = String.format("""
SELECT DISTINCT *
FROM refined_tickit_public_listing
WHERE listid=%s""", id);
Listing listing;
try {
listing = startQuery(query).get(0);
} catch (java.lang.IndexOutOfBoundsException e) {
logger.error(e.getMessage());
return null;
}
return listing;
}
private List<Listing> startQuery(String query) {
logger.debug(String.format("Query: %s", query.replace("\n", " ")));
AthenaClient athenaClient = athenaClientFactory.createClient();
String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
List<Listing> listings = processResultRows(athenaClient, queryExecutionId);
athenaClient.close();
return listings;
}
private List<Listing> processResultRows(AthenaClient athenaClient, String queryExecutionId) {
List<Listing> listings = new ArrayList<>();
try {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
List<Row> rows;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
for (GetQueryResultsResponse result : getQueryResultsResults) {
rows = result.resultSet().rows();
for (Row myRow : rows.subList(1, rows.size())) { // skip first row – column names
List<Datum> allData = myRow.data();
Listing listing = new Listing();
listing.setId(parseInt(allData.get(0).varCharValue()));
listing.setSellerId(parseInt(allData.get(1).varCharValue()));
listing.setEventId(parseInt(allData.get(2).varCharValue()));
listing.setDateId(parseInt(allData.get(3).varCharValue()));
listing.setNumTickets(Integer.parseInt(allData.get(4).varCharValue()));
listing.setPricePerTicket(new BigDecimal(allData.get(5).varCharValue()));
listing.setListTime(LocalDateTime.parse(allData.get(6).varCharValue(), formatter));
listings.add(listing);
}
}
} catch (AthenaException e) {
logger.error(e.getMessage());
}
return listings;
}
}

Controller Classes

There are no changes required to the Controller classes.

Amazon ElastiCache for Redis

Multiple options are available for creating an Amazon ElastiCache for Redis cluster, including cluster mode, multi-AZ option, auto-failover option, node type, number of replicas, number of shards, replicas per shard, Availability Zone placements, and encryption at rest and encryption in transit options. The results in this post are based on a minimally-configured Redis version 6.2.6 cluster, with one shard, two cache.r6g.large nodes, and cluster mode, multi-AZ option, and auto-failover all disabled. In addition, encryption at rest and encryption in transit were also disabled. This cluster configuration is sufficient for development and testing, but not Production.

Amazon ElastiCache for Redis cluster used for this post
Amazon ElastiCache for Redis cluster monitoring console showing caching activity

Testing the Cache

To test Amazon ElastiCache for Redis, we will use Postman again with the imported OpenAPI v3 specification. With all data evicted from existing Redis caches, the first time the Postman tests run, they cause the service’s target methods to be invoked and the returned data stored in the associated caches.

Running a suite of integration tests against the Spring Boot service using Postman

To confirm this caching behavior, use the Redis CLI’s --scan option. To access the redis-cli, I deployed a single Redis pod to Amazon EKS. The first time the --scan command runs, we should get back an empty list of keys. After the first Postman test run, the same --scan command should return a list of cached keys.

List of cached keys in Redis

Use the Redis CLI’s MONITOR option to further confirm data is being cached, as indicated by the set command.

Athena query results being cached in Redis

After the initial caching of data, use the Redis CLI’s MONITOR option, again, to confirm the cache is being hit instead of calling the target methods, which would then invoke the Athena API. Rerunning the Postman tests, we should see get commands as opposed to set commands.

Monitoring cache hits in Redis

Lastly, to confirm the Spring Boot service is effectively using Redis to cache data, we can also check Amazon Athena’s Recent queries tab in the AWS Management Console. After repeated sequential test runs within the TTL window, we should only see one Athena query per endpoint.

Amazon Athena Recent queries tab in the AWS Management Console

Conclusion

In this brief follow-up to the recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, we saw how to substantially increase data lake application performance using Amazon ElastiCache for Redis. Although this caching technique is often associated with databases, it can also be effectively applied to data lake-based applications, as demonstrated in the post.


This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.

, , , , ,

Leave a comment

Monolith to Microservices: Refactoring Relational Databases

Exploring common patterns for refactoring relational database models as part of a microservices architecture

Introduction

There is no shortage of books, articles, tutorials, and presentations on migrating existing monolithic applications to microservices, nor designing new applications using a microservices architecture. It has been one of the most popular IT topics for the last several years. Unfortunately, monolithic architectures often have equally monolithic database models. As organizations evolve from monolithic to microservices architectures, refactoring the application’s database model is often overlooked or deprioritized. Similarly, as organizations develop new microservices-based applications, they frequently neglect to apply a similar strategy to their databases.

The following post will examine several basic patterns for refactoring relational databases for microservices-based applications.

Terminology

Monolithic Architecture

A monolithic architecture is “the traditional unified model for the design of a software program. Monolithic, in this context, means composed all in one piece.” (TechTarget). A monolithic application “has all or most of its functionality within a single process or container, and it’s componentized in internal layers or libraries” (Microsoft). A monolith is usually built, deployed, and upgraded as a single unit of code.

Microservices Architecture

A microservices architecture (aka microservices) refers to “an architectural style for developing applications. Microservices allow a large application to be separated into smaller independent parts, with each part having its own realm of responsibility” (Google Cloud).

According to microservices.io, the advantages of microservices include:

  • Highly maintainable and testable
  • Loosely coupled
  • Independently deployable
  • Organized around business capabilities
  • Owned by a small team
  • Enables rapid, frequent, and reliable delivery
  • Allows an organization to [more easily] evolve its technology stack

Database

A database is “an organized collection of structured information, or data, typically stored electronically in a computer system” (Oracle). There are many types of databases. The most common database engines include relational, NoSQL, key-value, document, in-memory, graph, time series, wide column, and ledger.

PostgreSQL

In this post, we will use PostgreSQL (aka Postgres), a popular open-source object-relational database. A relational database is “a collection of data items with pre-defined relationships between them. These items are organized as a set of tables with columns and rows. Tables are used to hold information about the objects to be represented in the database” (AWS).

Amazon RDS for PostgreSQL

We will use the fully managed Amazon RDS for PostgreSQL in this post. Amazon RDS makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. With Amazon RDS, you can deploy scalable PostgreSQL deployments in minutes with cost-efficient and resizable hardware capacity. In addition, Amazon RDS offers multiple versions of PostgreSQL, including the latest version used for this post, 14.2.

The patterns discussed here are not specific to Amazon RDS for PostgreSQL. There are many options for using PostgreSQL on the public cloud or within your private data center. Alternately, you could choose Amazon Aurora PostgreSQL-Compatible Edition, Google Cloud’s Cloud SQL for PostgreSQL, Microsoft’s Azure Database for PostgreSQLElephantSQL, or your own self-manage PostgreSQL deployed to bare metal servers, virtual machine (VM), or container.

Database Refactoring Patterns

There are many ways in which a relational database, such as PostgreSQL, can be refactored to optimize efficiency in microservices-based application architectures. As stated earlier, a database is an organized collection of structured data. Therefore, most refactoring patterns reorganize the data to optimize for an organization’s functional requirements, such as database access efficiency, performance, resilience, security, compliance, and manageability.

The basic building block of Amazon RDS is the DB instance, where you create your databases. You choose the engine-specific characteristics of the DB instance when you create it, such as storage capacity, CPU, memory, and EC2 instance type on which the database server runs. A single Amazon RDS database instance can contain multiple databases. Those databases contain numerous object types, including tables, views, functions, procedures, and types. Tables and other object types are organized into schemas. These hierarchal constructs — instances, databases, schemas, and tables — can be arranged in different ways depending on the requirements of the database data producers and consumers.

Basic relational database refactoring patterns

Sample Database

To demonstrate different patterns, we need data. Specifically, we need a database with data. Conveniently, due to the popularity of PostgreSQL, there are many available sample databases, including the Pagila database. I have used it in many previous articles and demonstrations. The Pagila database is available for download from several sources.

Database diagram showing the relations between Pagila’s tables

The Pagila database represents a DVD rental business. The database is well-built, small, and adheres to a third normal form (3NF) database schema design. The Pagila database has many objects, including 1 schema, 15 tables, 1 trigger, 7 views, 8 functions, 1 domain, 1 type, 1 aggregate, and 13 sequences. Pagila’s tables contain between 2 and 16K rows.

Pattern 1: Single Schema

Pattern 1: Single Schema is one of the most basic database patterns. There is one database instance containing a single database. That database has a single schema containing all tables and other database objects.

Pattern 1: Single Schema

As organizations begin to move from monolithic to microservices architectures, they often retain their monolithic database architecture for some time.

Beginning to decompose the monolith application

Frequently, the monolithic database’s data model is equally monolithic, lacking proper separation of concerns using simple database constructs such as schemas. The Pagila database is an example of this first pattern. The Pagila database has a single schema containing all database object types, including tables, functions, views, procedures, sequences, and triggers.

To create a copy of the Pagila database, we can use pg_restore to restore any of several publically available custom-format database archive files. If you already have the Pagila database running, simply create a copy with pg_dump.

# set postgres environment variables
# ** CHANGE ME **
export PGHOST="postgres1.abcxyzdef.us-east-1.rds.amazonaws.com"
export PGPORT=5432
export PGDATABASE="postgres"
export PGUSER="admin"
export PGPASSWORD="change_me!"
# create new v1 of pagila database
export PGDATABASE="postgres"
psql -c "CREATE DATABASE pagila_v1;"
# restore original version of pagila database
pg_restore -d pagila_v1 pagila.dump
# confirm pagila tables in public schema
export PGDATABASE="pagila_v1"
psql -c "\dt"
Create a new version of the Pagila database for Pattern 1

Below we see the table layout of the Pagila database, which contains the single, default public schema.

-----------+----------+--------+------------
Instance | Database | Schema | Table
-----------+----------+--------+------------
postgres1 | pagila | public | actor
postgres1 | pagila | public | address
postgres1 | pagila | public | category
postgres1 | pagila | public | city
postgres1 | pagila | public | country
postgres1 | pagila | public | customer
postgres1 | pagila | public | film
postgres1 | pagila | public | film_actor
postgres1 | pagila | public | film_category
postgres1 | pagila | public | inventory
postgres1 | pagila | public | language
postgres1 | pagila | public | payment
postgres1 | pagila | public | rental
postgres1 | pagila | public | staff
postgres1 | pagila | public | store

Using a single schema to house all tables, especially the public schema is generally considered poor database design. As a database grows in complexity, creating, organizing, managing, and securing dozens, hundreds, or thousands of database objects, including tables, within a single schema becomes impossible. For example, given a single schema, the only way to organize large numbers of database objects is by using lengthy and cryptic naming conventions.

Public Schema

According to the PostgreSQL docs, if tables or other object types are created without specifying a schema name, they are automatically assigned to the default public schema. Every new database contains a public schema. By default, users cannot access any objects in schemas they do not own. To allow that, the schema owner must grant the USAGE privilege on the schema. by default, everyone has CREATE and USAGE privileges on the schema public. These default privileges enable all users to connect to a given database to create objects in its public schema. Some usage patterns call for revoking that privilege, which is a compelling reason not to use the public schema as part of your database design.

Pattern 2: Multiple Schemas

Separating tables and other database objects into multiple schemas is an excellent first step to refactoring a database to support microservices. As application complexity and databases naturally grow over time, schemas to separate functionality by business subdomain or teams will benefit significantly.

According to the PostgreSQL docs, there are several reasons why one might want to use schemas:

  • To allow many users to use one database without interfering with each other.
  • To organize database objects into logical groups to make them more manageable.
  • Third-party applications can be put into separate schemas, so they do not collide with the names of other objects.

Schemas are analogous to directories at the operating system level, except schemas cannot be nested.

Pattern 2: Multiple Schemas

With Pattern 2, as an organization continues to decompose its monolithic application architecture to a microservices-based application, it could transition to a schema-per-microservice or similar level or organizational granularity.

Continuing to decompose the monolith into microservices

Applying Domain-driven Design Principles

Domain-driven design (DDD) is “a software design approach focusing on modeling software to match a domain according to input from that domain’s experts” (Wikipedia). Architects often apply DDD principles to decompose a monolithic application into microservices. For example, a microservice or set of related microservices might represent a Bounded Context. In DDD, a Bounded Context is “a description of a boundary, typically a subsystem or the work of a particular team, within which a particular model is defined and applicable.” (hackernoon.com). Examples of Bounded Context might include Sales, Shipping, and Support.

One technique to apply schemas when refactoring a database is to mirror the Bounded Contexts, which reflect the microservices. For each microservice or set of closely related microservices, there is a schema. Unfortunately, there is no absolute way to define the Bounded Contexts of a Domain, and henceforth, schemas to a database. It depends on many factors, including your application architecture, features, security requirements, and often an organization’s functional team structure.

Reviewing the purpose of each table in the Pagila database and their relationships to each other, we could infer Bounded Contexts, such as Films, Stores, Customers, and Sales. We can represent these Bounded Contexts as schemas within the database as a way to organize the data. The individual tables in a schema mirror DDD concepts, such as aggregates, entities, or value objects.

# dump v1 of pagila database
pg_dump -Fc -d pagila_v1 -f pagila_v1.dump
# create new v2 of pagila database
psql -c "CREATE DATABASE pagila_v2;"
# restore v1 of pagila database
pg_restore -d pagila_v2 pagila_v1.dump
# connect to new pagila database
export PGDATABASE="pagila_v2"
psql
Create a new version of the Pagila database for Pattern 2
wrap in transaction
BEGIN;
optional, should be set to public by default
SET search_path TO public;
create new schemas
CREATE SCHEMA common;
CREATE SCHEMA customers;
CREATE SCHEMA films;
CREATE SCHEMA sales;
CREATE SCHEMA staff;
CREATE SCHEMA stores;
common
ALTER TABLE address SET SCHEMA common;
ALTER TABLE city SET SCHEMA common;
ALTER TABLE country SET SCHEMA common;
customers
ALTER TABLE customer SET SCHEMA customers;
films
ALTER TABLE actor SET SCHEMA films;
ALTER TABLE category SET SCHEMA films;
ALTER TABLE film SET SCHEMA films;
ALTER TABLE language SET SCHEMA films;
ALTER TABLE film_actor SET SCHEMA films;
ALTER TABLE film_category SET SCHEMA films;
sales
ALTER TABLE payment SET SCHEMA sales;
ALTER TABLE rental SET SCHEMA sales;
staff
ALTER TABLE staff SET SCHEMA staff;
stores
ALTER TABLE store SET SCHEMA stores;
ALTER TABLE inventory SET SCHEMA stores;
COMMIT;
confirm all tables are removed from public schema
\dt
view raw pagila_v2.sql hosted with ❤ by GitHub
Add the new schemas and move tables and objects accordingly

As shown below, the tables of the Pagila database have been relocated into six new schemas: commoncustomersfilmssalesstaff, and stores. The common schema contains tables with address data references tables in several other schemas. There are now no tables left in the public schema. We will assume other database objects (e.g., functions, views, and triggers) have also been moved and modified if necessary to reflect new table locations.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | address
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store

By applying schemas, we align tables and other database objects to individual microservices or functional teams that own the microservices and the associated data. Schemas allow us to apply fine-grain access control over objects and data within the database more effectively.

Refactoring other Database Objects

Typically with psql, when moving tables across schemas using an ALTER TABLE...SET SCHEMA... SQL statement, objects such as database views will be updated to the table’s new location. For example, take Pagila’s sales_by_store view. Note the schemas have been automatically updated for multiple tables from their original location in the public schema. The view was also moved to the sales schema.

CREATE OR REPLACE VIEW sales.sales_by_store AS
SELECT (c.city || ','::text) || cy.country AS store,
(m.first_name || ' '::text) || m.last_name AS manager,
sum(p.amount) AS total_sales
FROM sales.payment p
JOIN sales.rental r ON p.rental_id = r.rental_id
JOIN stores.inventory i ON r.inventory_id = i.inventory_id
JOIN stores.store s ON i.store_id = s.store_id
JOIN common.address a ON s.address_id = a.address_id
JOIN common.city c ON a.city_id = c.city_id
JOIN common.country cy ON c.country_id = cy.country_id
JOIN staff.staff m ON s.manager_staff_id = m.staff_id
GROUP BY cy.country, c.city, s.store_id,
m.first_name, m.last_name
ORDER BY cy.country, c.city;
Pagila’s sales_by_store database view with new schema pattern

Splitting Table Data Across Multiple Schemas

When refactoring a database, you may have to split data by replicating table definitions across multiple schemas. Take, for example, Pagila’s address table, which contains the addresses of customers, staff, and stores. The customers.customerstores.staff, and stores.store all have foreign key relationships with the common.address table. The address table has a foreign key relationship with both the city and country tables. Thus for convenience, the addresscity, and country tables were all placed into the common schema in the example above.

Although, at first, storing all the addresses in a single table might appear to be sound database normalization, consider the risks of having the address table’s data exposed. The store addresses are not considered sensitive data. However, the home addresses of customers and staff are likely considered sensitive personally identifiable information (PII). Also, consider as an application evolves, you may have fields unique to one type of address that does not apply to other categories of addresses. The table definitions for a store’s address may be defined differently than the address of a customer. For example, we might choose to add a county column to the customers.address table for e-commerce tax purposes, or an on_site_parking boolean column to the stores.address table.

In the example below, a new staff schema was added. The address table definition was replicated in the customersstaff, and stores schemas. The assumption is that the mixed address data in the original table was distributed to the appropriate address tables. Note the way schemas help us avoid table name collisions.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store

To create the new customers.address table, we could use the following SQL statements. The statements to create the other two address tables are nearly identical.

wrap in transaction
BEGIN;
create new customers.address table
CREATE SEQUENCE IF NOT EXISTS customers.address_address_id_seq
INCREMENT 1
START 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 1;
ALTER SEQUENCE customers.address_address_id_seq
OWNER TO pagila_admin;
CREATE TABLE IF NOT EXISTS customers.address (
address_id integer DEFAULT nextval('address_address_id_seq'::regclass) NOT NULL PRIMARY KEY,
address text NOT NULL,
address2 text,
district text NOT NULL,
city_id smallint NOT NULL REFERENCES common.city ON UPDATE CASCADE ON DELETE RESTRICT,
postal_code text,
phone text NOT NULL,
last_update timestamp with time zone DEFAULT now() NOT NULL
);
ALTER TABLE customers.address
OWNER TO pagila_admin;
CREATE INDEX IF NOT EXISTS idx_fk_city_id ON customers.address(city_id);
CREATE TRIGGER last_updated
BEFORE UPDATE ON customers.address FOR EACH ROW
EXECUTE PROCEDURE last_updated();
COMMIT;
Creating new customers.address table and associated objects

Although we now have two additional tables with identical table definitions, we do not duplicate any data. We could use the following SQL statements to migrate unique address data into the appropriate tables and confirm the results.

wrap in transaction
BEGIN;
copy only customer addresses to new customers.address table
INSERT INTO customers.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
SELECT DISTINCT address_id
FROM customers.customer
);
copy only staff addresses to new staff.address table
INSERT INTO staff.address
SELECT COUNT(*)
FROM common.address
WHERE common.address.address_id IN (
SELECT DISTINCT address_id
FROM staff.staff
);
copy only store addresses to new stores.address table
INSERT INTO stores.address
SELECT *
FROM common.address
WHERE common.address.address_id IN (
SELECT DISTINCT address_id
FROM stores.store
);
check for extraneous data in common.address before deleting
SELECT *
FROM common.address
WHERE common.address.address_id NOT IN
(SELECT DISTINCT address_id FROM customers.customer)
AND common.address.address_id NOT IN
(SELECT DISTINCT address_id FROM staff.staff)
AND common.address.address_id NOT IN
(SELECT DISTINCT address_id FROM stores.store);
COMMIT;
Migrating unique address data into the appropriate tables

Lastly, alter the existing foreign key constraints to point to the new address tables. The SQL statements for the other two address tables are nearly identical.

wrap in transaction
BEGIN;
customers.customer
ALTER TABLE IF EXISTS customers.customer
DROP CONSTRAINT IF EXISTS customer_address_id_fkey;
ALTER TABLE IF EXISTS customers.customer
ADD CONSTRAINT customer_address_id_fkey FOREIGN KEY (address_id)
REFERENCES customers.address (address_id) MATCH SIMPLE
ON UPDATE CASCADE
ON DELETE RESTRICT;
COMMIT;
Updating the existing foreign key constraints

There is now a reduced risk of exposing sensitive customer or staff data when querying store addresses, and the three address entities can evolve independently. Individual functional teams separately responsible customersstaff, and stores, can own and manage just the data within their domain.

Before dropping the common.address tables, you would still need to modify the remaining database objects that have dependencies on this table, such as views and functions. For example, take Pagila’s sales_by_store view we saw previously. Note line 9, below, the schema of the address table has been updated from common.address to stores.address. The stores.address table only contains addresses of stores, not customers or staff.

CREATE OR REPLACE VIEW sales.sales_by_store AS
SELECT (c.city || ','::text) || cy.country AS store,
(m.first_name || ' '::text) || m.last_name AS manager,
sum(p.amount) AS total_sales
FROM sales.payment p
JOIN sales.rental r ON p.rental_id = r.rental_id
JOIN stores.inventory i ON r.inventory_id = i.inventory_id
JOIN stores.store s ON i.store_id = s.store_id
JOIN stores.address a ON s.address_id = a.address_id
JOIN common.city c ON a.city_id = c.city_id
JOIN common.country cy ON c.country_id = cy.country_id
JOIN staff.staff m ON s.manager_staff_id = m.staff_id
GROUP BY cy.country, c.city, s.store_id,
m.first_name, m.last_name
ORDER BY cy.country, c.city;
Pagila’s sales_by_store database view with the new schema pattern

Below, we see the final table structure for the Pagila database after refactoring. Tables have been loosely grouped together schema in the diagram.

Database diagram showing new table relationships

Pattern 3: Multiple Databases

Similar to how individual schemas allow us to organize tables and other database objects and provide better separation of concerns, we can use databases the same way. For example, we could choose to spread the Pagila data across more than one database within a single RDS database instance. Again, using DDD concepts, while schemas might represent Bounded Contexts, databases most closely align to Domains, which are “spheres of knowledge and activity where the application logic revolves” (hackernoon.com).

Pattern 3: Multiple Databases

With Pattern 3, as an organization continues to refine its microservices-based application architecture, it might find that multiple databases within the same database instance are advantageous to further separate and organize application data.

Moving from a single- to multi-database architecture

Let’s assume that the data in the films schema is owned and managed by a completely separate team who should never have access to sensitive data stored in the customersstores, and sales schemas. According to the PostgreSQL docs, database access permissions are managed using the concept of roles. Depending on how the role is set up, a role can be thought of as either a database user or a group of users.

To provide greater separation of concerns than just schemas, we can create a second, completely separate database within the same RDS database instance for data related to films. With two separate databases, it is easier to create and manage distinct roles and ensure access to customersstores, or sales data is only accessible to teams that need access.

# dump v2 of pagila database
pg_dump -Fc -d pagila_v2 -f pagila_v2.dump
# create 2 new v3 databases
export PGDATABASE="postgres"
psql << EOF
\x
CREATE DATABASE pagila_v3;
CREATE DATABASE products_v3;
EOF
# restore v2 of pagila database
pg_restore -d pagila_v3 pagila_v2.dump
pg_restore -d products_v3 -n films pagila_v2.dump
# connect to new pagila database
export PGDATABASE="pagila_v3"
psql
Create a new version of the Pagila and Products database for Pattern 3

Below, we see the new layout of tables now spread across two databases within the same RDS database instance. Two new tables, highlighted in bold, are explained below.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | film
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+---------------
postgres1 | products | films | actor
postgres1 | products | films | category
postgres1 | products | films | film
postgres1 | products | films | film_actor
postgres1 | products | films | film_category
postgres1 | products | films | language
postgres1 | products | films | outbox

Change Data Capture and the Outbox Pattern

Inserts, updates, and deletes of film data can be replicated between the two databases using several methods, including Change Data Capture (CDC) with the Outbox Pattern. CDC is “a pattern that enables database changes to be monitored and propagated to downstream systems” (RedHat). The Outbox Pattern uses the PostgreSQL database’s ability to perform an commit to two tables atomically using a transaction. Transactions bundles multiple steps into a single, all-or-nothing operation.

In this example, data is written to existing tables in the products.films schema (updated aggregate’s state) as well as a new products.films.outbox table (new domain events), wrapped in a transaction. Using CDC, the domain events from the products.films.outbox table are replicated to the pagila.films.film table. The replication of data between the two databases using CDC is also referred to as eventual consistency.

Change Data Capture (CDC) with the Outbox Pattern

In this example, films in the pagila.films.film and products.films.outbox tables are represented in a denormalized, aggregated view of a film instead of the original, normalized relational multi-table structure. The table definition of the new pagila.films.film table is very different than that of the original Pagila products.films.films table. A concept such as a film, represented as an aggregate or entity, can be common to multiple Bounded Contexts, yet have a different definition.

CREATE TABLE IF NOT EXISTS films.outbox
(
film_id integer NOT NULL,
title character varying(50) NOT NULL,
release_year smallint NOT NULL,
film_language character varying(20) NOT NULL,
rating character varying(5) COLLATE NOT NULL,
categories character varying(100) NOT NULL,
actors character varying NOT NULL,
rental_duration smallint NOT NULL,
length_minutes smallint NOT NULL,
replacement_cost numeric(5,2) NOT NULL,
rental_rate numeric(4,2) NOT NULL,
last_update timestamp with time zone NOT NULL DEFAULT now(),
CONSTRAINT outbox_pkey PRIMARY KEY (film_id)
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS films.outbox
OWNER to products_admin;
Example products.films.outbox table definition (similar for pagila.films.film)

Note the Confluent JDBC Source Connector  (io.confluent.connect.jdbc.JdbcSourceConnector) used here will not work with PostgreSQL arrays, which would be ideal for one-to-many categories and actors columns. Arrays can be converted to text using ::text or by building value-delimited strings using string_agg aggregate function.

PROCEDURE: films.insert_into_outbox(integer)
DROP PROCEDURE IF EXISTS films.insert_into_outbox(integer);
EXAMPLE: "CALL films.insert_into_outbox(100);"
CREATE OR REPLACE PROCEDURE films.insert_into_outbox(IN filmid integer)
LANGUAGE 'sql'
BEGIN ATOMIC
delete existing record
DELETE
FROM films.outbox
WHERE (outbox.film_id = insert_into_outbox.filmid);
insert new record
INSERT INTO films.outbox (film_id, title, release_year,
film_language, rating, categories,
actors, rental_duration, length_minutes,
replacement_cost, rental_rate)
SELECT f.film_id,
initcap(f.title) AS title,
f.release_year,
trim(BOTH FROM l.name) AS film_language,
f.rating,
(SELECT array
(SELECT c.name
FROM films.film_category AS fc
JOIN films.category AS c ON fc.category_id = c.category_id
WHERE film_id = f.film_id)::text AS categories),
(SELECT array
(SELECT initcap(concat(a.first_name, ' ', a.last_name)) AS actors
FROM films.film_actor AS fa
JOIN films.actor AS a ON fa.actor_id = a.actor_id
WHERE film_id = f.film_id)::text AS actor_array),
f.rental_duration,
f.length AS length_minutes,
f.replacement_cost,
f.rental_rate
FROM films.film f
JOIN films.language l ON f.language_id = l.language_id
WHERE (f.film_id = insert_into_outbox.filmid)
GROUP BY f.film_id, (trim(BOTH FROM l.name));
END;
ALTER PROCEDURE films.insert_into_outbox (integer)
OWNER TO products_admin;
An example query to insert data into the products.films.outbox table

Given this table definition, the resulting data would look as follows.

film_id title release_year film_language rating categories actor_array rental_duration length_minutes replacement_cost rental_rate
389 Gunfighter Mussolini 2006 English PG-13 {Sports} {"Audrey Olivier","Judy Dean","Scarlett Damon","Russell Close"} 3 127 9.99 2.99
581 Minority Kiss 2006 English G {Music} {"Vivien Basinger"} 4 59 16.99 0.99
598 Mosquito Armageddon 2006 English G {Sports} {"Goldie Brody","Kirk Jovovich","Nick Stallone","Reese West"} 6 57 22.99 0.99
943 Villain Desperate 2006 English PG-13 {Documentary} {"Dustin Tautou","Cary Mcconaughey"} 4 76 27.99 4.99
490 Jumanji Blade 2006 English G {New} {"Jennifer Davis","Bob Fawcett","Nick Stallone","Gary Phoenix","Mena Temple","Jim Mostel"} 4 121 13.99 2.99
243 Doors President 2006 English NC-17 {Animation} {"Karl Berry","Lucille Tracy","Natalie Hopkins","Christian Akroyd","Sylvester Dern","Gene Hopkins","Ed Mansfield","Kim Allen","Reese West"} 3 49 22.99 4.99
40 Army Flintstones 2006 English R {Documentary} {"Ed Chase","Cary Mcconaughey","Mae Hoffman","Gene Willis","Penelope Cronyn","Matthew Carrey","Russell Close"} 4 148 22.99 0.99
317 Fireball Philadelphia 2006 English PG {Comedy} {"Val Bolger","Jude Cruise","Adam Grant","James Pitt","Frances Tomei"} 4 148 25.99 0.99
17 Alone Trip 2006 English R {Music} {"Ed Chase","Karl Berry","Uma Wood","Woody Jolie","Spencer Depp","Chris Depp","Laurence Bullock","Renee Ball"} 3 82 14.99 0.99
195 Crowds Telemark 2006 English R {Sci-Fi} {"Matthew Johansson","Anne Cronyn","Jeff Silverstone","Matthew Carrey"} 3 112 16.99 4.99
Example of data in the pagila.films.film and products.films.outbox tables

The existing pagila.stores.inventory table has a foreign key constraint on the the pagila.films.film table. However, the films schema and associated tables have been migrated to the products database’s films schema. To overcome this challenge, we can:

  1. Create a new pagila.films.film table
  2. Continuously replicate data from the products database to the pagila.films.film table data using CDC (see below)
  3. Modify the pagila.stores.inventory table to take a dependency on the new film table
  4. Drop the duplicate tables and other objects from the pagila.films schema

Debezium and Confluent for CDC

There are several technology choices for performing CDC. For this post, I have used RedHat’s Debezium connector for PostgreSQL and Debezium Outbox Event Router, and Confluent’s JDBC Sink Connector. Below, we see a typical example of a Kafka Connect Source Connector using the Debezium connector for PostgreSQL and a Sink Connector using the Confluent JDBC Sink Connector. The Source Connector streams changes from the products logs, using PostgreSQL’s Write-Ahead Logging (WAL) feature, to an Apache Kafka topic. A corresponding Sink Connector streams the changes from the Kafka topic to the pagila database.

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres1.abcxyzdef.us-east-1.rds.amazonaws.com",
"database.port": "5432",
"database.user": "cdc_source_user",
"database.password": "change_me!",
"database.dbname": "products",
"database.server.name": "products",
"table.include.list": "films.outbox",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"slot.name": "debezium_source_connector"
}
Debezium connector for PostgreSQL example
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "products.films.outbox",
"connection.url": "jdbc:postgresql://postgres1.abcxyzdef.us-east-1.rds.amazonaws.com:5432/pagila?stringtype=unspecified",
"connection.user": "cdc_sink_user",
"connection.password": "change_me!",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "films.film",
"auto-evolve": "true",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "film_id",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
Confluent JDBC Sink Connector example

Pattern 4: Multiple Database Instances

At some point in the evolution of a microservices-based application, it might become advantageous to separate the data into multiple database instances using the same database engine. Although managing numerous database instances may require more resources, there are also advantages. Each database instance will have independent connection configurations, roles, and administrators. Each database instance could run different versions of the database engine, and each could be upgraded and maintained independently.

Pattern 4: Multiple Database Instances

With Pattern 4, as an organization continues to refine its application architecture, it might find that multiple database instances are beneficial to further separate and organize application data.

Moving from multiple databases to multiple DB instances

Below is one possible refactoring of the Pagila database, splitting the data between two database engines. The first database instance, postgres1, contains two databases, pagila and products. The second database instance, postgres2, contains a single database, products.

-----------+----------+-----------+---------------
Instance | Database | Schema | Table
-----------+----------+-----------+---------------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+---------------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+---------------
postgres1 | pagila | films | actor
postgres1 | pagila | films | category
postgres1 | pagila | films | film
postgres1 | pagila | films | film_actor
postgres1 | pagila | films | film_category
postgres1 | pagila | films | language
-----------+----------+-----------+---------------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+---------------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+---------------
postgres1 | pagila | sales | payment
postgres1 | pagila | sales | rental
-----------+----------+-----------+---------------
postgres2 | products | films | actor
postgres2 | products | films | category
postgres2 | products | films | film
postgres2 | products | films | film_actor
postgres2 | products | films | film_category
postgres2 | products | films | language

Data Replication with CDC

Note the films schema is duplicated between the two databases, shown above. Again, using the CDC allows us to keep the six postgres1.pagila.films tables in sync with the six  postgres2.products.films tables using CDC. In this example, we are not using the OutBox Pattern, as used previously in Pattern 3. Instead, we are replicating any changes to any of the tables in postgres2.products.films schema to the corresponding tables in the postgres1.pagila.films schema.

Multi-table data replication between database instances using Change Data Capture (CDC)

To ensure the tables stay in sync, the tables and other objects in the postgres1.pagila.films schema should be limited to read-only access (SELECT) for all users. The postgres2.products.films tables represent the authoritative source of data, the System of Record (SoR). Any inserts, updates, or deletes, must be made to these tables and replicated using CDC.

CREATE USER read_only_user WITH ENCRYPTED PASSWORD 'change_me!';
GRANT CONNECT ON DATABASE pagila TO read_only_user;
GRANT USAGE ON SCHEMA films TO read_only_user;
GRANT SELECT ON ALL TABLES IN SCHEMA films TO read_only_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA films
GRANT SELECT ON TABLES TO read_only_user;
Example of a user with read-only rights (SELECT) to films schema

Pattern 5: Multiple Database Engines

AWS commonly uses the term ‘purpose-built databases.’ AWS offers over fifteen purpose-built database engines to support diverse data models, including relational, key-value, document, in-memory, graph, time series, wide column, and ledger. There may be instances where using multiple, purpose-built databases makes sense. Using different database engines allows architects to take advantage of the unique characteristics of each engine type to support diverse application requirements.

With Pattern 5, as an organization continues to refine its application architecture, it might choose to leverage multiple, different database engines.

Moving from multiple databases to multiple database engines

Take for example an application that uses a combination of relational, NoSQL, and in-memory databases to persist data. In addition to PostgreSQL, the application benefits from moving a certain subset of its relational data to a non-relational, high-performance key-value store, such as Amazon DynamoDB. Furthermore, the application implements a database cache using an ultra-fast in-memory database, such as Amazon ElastiCache for Redis.

Pattern 5: Multiple Database Engines

Below is one possible refactoring of the Pagila database, splitting the data between two different database engines, PostgreSQL and Amazon DynamoDB.

-----------+----------+-----------+-----------
Instance | Database | Schema | Table
-----------+----------+-----------+-----------
postgres1 | pagila | common | city
postgres1 | pagila | common | country
-----------+----------+-----------+-----------
postgres1 | pagila | customers | address
postgres1 | pagila | customers | customer
-----------+----------+-----------+-----------
postgres1 | pagila | films | film
-----------+----------+-----------+-----------
postgres1 | sales | sales | payment
postgres1 | sales | sales | rental
-----------+----------+-----------+-----------
postgres1 | pagila | staff | address
postgres1 | pagila | staff | staff
-----------+----------+-----------+-----------
postgres1 | pagila | stores | address
postgres1 | pagila | stores | film
postgres1 | pagila | stores | inventory
postgres1 | pagila | stores | store
-----------+----------+-----------+-----------
DynamoDB | - | - | Films

The assumption is that based on the application’s access patterns for film data, the application could benefit from the addition of a non-relational, high-performance key-value store. Further, the film-related data entities, such as a film , category, and actor, could be modeled using DynamoDB’s single-table data model architecture. In this model, multiple entity types can be stored in the same table. If necessary, to replicate data back to the PostgreSQL instance from the DynamoBD instance, we can perform CDC with DynamoDB Streams.

Creating a new Films data model for DynamoDB using NoSQL Workbench
Aggregate view of the DynamoDB single-table Films data model

CQRS

Command Query Responsibility Segregation (CQRS), a popular software architectural pattern, is another use case for multiple database engines. The CQRS pattern is, as the name implies, “a software design pattern that separates command activities from query activities. In CQRS parlance, a command writes data to a data source. A query reads data from a data source. CQRS addresses the problem of data access performance degradation when applications running at web-scale have too much burden placed on the physical database and the network on which it resides” (RedHat). CQRS commonly uses one database engine optimized for writes and a separate database optimized for reads.

CQRS architectural pattern using two different database engines

Conclusion

Embracing a microservices-based application architecture may have many business advantages for an organization. However, ignoring the application’s existing databases can negate many of the benefits of microservices. This post examined several common patterns for refactoring relational databases to match a modern microservices-based application architecture.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author.

, , , ,

Leave a comment

End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub

Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms

Introduction

According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.

Another succinct description of a data catalog’s purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.

Working with many organizations in the area of Analytics, one of the more common requests I receive regards choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

View of DataHub’s home screen showing a variety of datasources

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Another example of searching for cataloged entities in DataHub’s browser-based UI

Data Catalog Competitors

Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980’s. Gartner publishes their Metadata Management (EMM) Solutions Reviews and Ratings and Metadata Management Magic Quadrant. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:

Open Source Software

Commercial

Cloud Service Providers

Data Catalog Features

DataHub describes itself as “a modern data catalog built to enable end-to-end data discovery, data observability, and data governance.” Sorting through vendor’s marketing jargon and hype, standard features of leading data catalogs include:

  • Metadata ingestion
  • Data discovery
  • Data governance
  • Data observability
  • Data lineage
  • Data dictionary
  • Data classification
  • Usage/popularity statistics
  • Sensitive data handling
  • Data fitness (aka data quality or data profiling)
  • Manage both technical and business metadata
  • Business glossary
  • Tagging
  • Natively supported datasource integrations
  • Advanced metadata search
  • Fine-grain authentication and authorization
  • UI- and API-based interaction

Datasources

When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:

  • Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
  • Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
  • NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
  • Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
  • Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
  • Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
  • ETL sources, such as Apache Spark, Apache Airflow, Apache NiFi, and dbt

DataHub on AWS

DataHub’s convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternately, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.

Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

ArgoCD UI showing DataHub and its dependencies deployed to Amazon EKS

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. Kubernetes deployment for each of the components is defined as sub-charts under the main DataHub Helm chart.

External Storage Layer Dependencies

Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.

Alternately, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service dependency substitutions include Amazon RDS for MySQL, Amazon OpenSearch (fka Amazon Elasticsearch), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using AWS Neptune as the Graph Index is coming soon.

DataHub CLI and Plug-ins

DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

Available DataHub CLI commands

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.

Example list of DataHub Source plugins installed
Example list of DataHub Sink and Transformer plugins installed

Secure Metadata Ingestion

Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.

In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to git and push them to a private GitHub repository. Finally, I use GitHub Actions to test DataHub files.

To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, using IAM Roles for Service Accounts (IRSA), to securely access datasources to ingest and the DataHub application.

Schedule and Monitor Pipelines

Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configurations can be kept secure and not be exposed externally or in plain text.

When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub, should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.

SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate datasources require alternate networking and security strategies to access metadata securely.

AWS-based DataHub high-level architecture

Markup or Code?

According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain a source, sink, and transformers configuration section. Mark-up language-based job automation written in YAML, JSON, or Domain Specific Languages (DSLs) is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database, running on AWS.

YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to secure sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and re-used for all ingestion jobs. Sensitive database connection information has also been variablized and stored separately.

# Purpose: DataHub example recipe for PostgreSQL datasource
# Author: Gary A. Stafford
# Date: March 2022
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/postgres
source:
type: postgres
config:
# Coordinates
host_port: ${DB_HOST_PORT}
database: tickit
# Credentials
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
# Options
profiling:
enabled: true
# Environment
env: DEV
# see https://datahubproject.io/docs/metadata-ingestion/transformers/#adding-a-set-of-tags
transformers:
type: "simple_add_dataset_tags"
config:
tag_urns:
"urn:li:tag:AWS"
"urn:li:tag:${ACCOUNT_ID}"
"urn:li:tag:us-east-1"
type: "pattern_add_dataset_terms"
config:
term_pattern:
rules:
".*users.*": ["urn:li:glossaryTerm:Classification.Sensitive"]
type: "simple_add_dataset_ownership"
config:
owner_urns:
"urn:li:corpuser:Database Administrators"
ownership_type: "DATAOWNER"
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: "datahub-rest"
config:
server: ${DATAHUB_REST_ENDPOINT}
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/reporting_telemetry/
pipeline_name: "postgres-pipeline-tickit"
reporting:
type: "datahub"
config:
datahub_api:
server: ${DATAHUB_REST_ENDPOINT}

Using Python

You can configure and run a pipeline entirely from within a custom Python script using DataHub’s Python API as an alternative to YAML. Below, we see two nearly identical ingestion recipes to the YAML above, written in Python. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit-testing, and notification. Below is a basic pipeline written in Python. The code is functional, but not very Pythonic, secure, scalable, or Production ready.

# Purpose: Simple programmatic DataHub pipline example
# Author: Gary A. Stafford
# Date: March 2022
# Reference: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/programatic_pipeline.py
from datahub.ingestion.run.pipeline import Pipeline
# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
{
"run_id": "postgres-run",
"source": {
"type": "postgres",
"config": {
"host_port": "demo-instance.abcd1234.us-east-1.rds.amazonaws.com:5432",
"database": "tickit",
"username": "datahub",
"password": "My5up3r53cr3tPa55w0rd",
"env": "DEV",
"profiling": {
"enabled": "true"
}
}
},
"transformers": [
{
"type": "simple_add_dataset_tags",
"config": {
"tag_urns": [
f"urn:li:tag:AWS",
f"urn:li:tag:111222333444",
f"urn:li:tag:us-east-1"
]
}
},
{
"type": "pattern_add_dataset_terms",
"config": {
"term_pattern": {
"rules": {
".*users.*": [
"urn:li:glossaryTerm:Classification.Sensitive"
]
}
}
}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
f"urn:li:corpuser:Database Administrators"
],
"ownership_type": "DATAOWNER"
}
}
],
"sink": {
"type": "datahub-rest",
"config": {
"server": "http://192.168.111.222:33333&quot;
}
}
}
)
# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

The second version of the same pipeline is more Production ready. The code is more Pythonic in nature and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. Like recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration is pulled from the environment, such as AWS Account ID and AWS Region. The script loads these values at runtime.

# Purpose: Programmatic DataHub pipline example
# Author: Gary A. Stafford
# Date: March 2022
import json
import logging
import boto3
from botocore.exceptions import ClientError
from datahub.ingestion.run.pipeline import Pipeline
logging.basicConfig(
format="[%(asctime)s] %(levelname)s – %(message)s", level=logging.INFO
)
def main():
sts_client = boto3.client("sts")
params = get_parameters()
params["owner"] = "Database Administrators"
params["environment"] = "DEV"
params["database"] = "tickit"
params["region"] = sts_client.meta.region_name
params["account"] = sts_client.get_caller_identity()["Account"]
logging.info(f"Params: {json.dumps(params, indent=4, sort_keys=True)}")
ingestion_pipeline = create_pipeline(params)
run_pipeline(ingestion_pipeline)
def create_pipeline(params) -> Pipeline:
"""Constructs a Pipeline for a PostgreSQL Source and a DataHub Sink
:return: instance of datahub.ingestion.run.pipeline
"""
pipeline = Pipeline.create(
{
"run_id": "postgres-run",
"source": {
"type": "postgres",
"config": {
"host_port": params.get("/datahub_demo/postgres_host_port_tickit"),
"database": params.get("database"),
"username": params.get("/datahub_demo/postgres_username_tickit"),
"password": params.get("/datahub_demo/postgres_password_tickit"),
"profiling": {
"enabled": "true"
},
"env": params.get("environment"),
}
},
"transformers": [
{
"type": "simple_add_dataset_tags",
"config": {
"tag_urns": [
f"urn:li:tag:{params.get('account')}",
f"urn:li:tag:{params.get('region')}"
]
}
},
{
"type": "pattern_add_dataset_terms",
"config": {
"term_pattern": {
"rules": {
".*users.*": [
"urn:li:glossaryTerm:Classification.Sensitive"
]
}
}
}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
f"urn:li:corpuser:{params.get('owner')}"
],
"ownership_type": "DATAOWNER"
}
}
],
"sink": {
"type": "datahub-rest",
"config": {
"server": params.get("/datahub_demo/datahub_rest_endpoint_public")
}
}
}
)
return pipeline
def run_pipeline(pipeline):
"""Runs the ingestion pipeline and prints summary of the results
:param pipeline: instance of datahub.ingestion.run.pipeline
:return:
"""
pipeline.run()
pipeline.pretty_print_summary()
def get_parameters() -> dict:
"""
Load parameter values from AWS Systems Manager (SSM) Parameter Store
:return: dict of parameter k/v's
"""
ssm_client = boto3.client("ssm")
params: dict = {}
try:
# make a single SSM API call for all parameters
response = ssm_client.get_parameters_by_path(
Path="/datahub_demo"
)
# create a dictionary of parameter k/v's
for param in response.get("Parameters"):
params[param["Name"]] = param["Value"]
logging.debug(f"Params: {params}")
except ClientError as e:
logging.error(e)
exit(1)
return params
if __name__ == '__main__':
main()

Sinking to DataHub

When syncing metadata to DataHub, you have two choices, the GMS REST API or Kafka. According to DataHub, the advantage of the REST-based interface is that any errors can immediately be reported. On the other hand, the advantage of the Kafka-based interface is that it is asynchronous and can handle higher throughput. For this post, I am DataHub’s REST API.

DataHub ingestion pipeline results for a Microsoft SQL Server datasource
Another example of a DataHub ingestion pipeline results for a Google BigQuery datasource

Column-level Metadata

In addition to column names and data types, it is possible to extract column descriptions and key types from certain datasources. Column descriptions, tags, and glossary terms can also be input through the DataHub UI. Below, we see an example of an Amazon Redshift fact table, whose table and column descriptions were ingested as part of the metadata.

Amazon Redshift fact table showing column-level metadata, tags, owners, and documentation

Business Glossary

DataHub can assign business glossary terms to entities. The DataHub Business Glossary plugin pulls business glossary metadata from a YAML-based configuration file.

# see sample: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/bootstrap_data/business_glossary.yml
version: 1
source: DataHub
owners:
users:
datahub
url: "https://github.com/datahub-project/datahub/"
nodes:
name: Classification
description: A set of terms related to Data Classification
terms:
name: Sensitive
description: Sensitive Data
custom_properties:
is_confidential: false
name: Confidential
description: Confidential Data
custom_properties:
is_confidential: true
name: HighlyConfidential
description: Highly Confidential Data
custom_properties:
is_confidential: true
name: PersonalInformation
description: All terms related to personal information
owners:
users:
datahub
terms:
name: ID
description: An individual's unqiue identifier
inherits:
Classification.Sensitive
name: Name
description: An individual's Name
inherits:
Classification.Sensitive
name: SSN
description: An individual's SSN
inherits:
Classification.Confidential
name: DriverLicense
description: An individual's Driver License ID
inherits:
Classification.Confidential
name: Email
description: An individual's email address
inherits:
Classification.Confidential
name: Address
description: A physical address
name: Gender
description: The gender identity of the individual
inherits:
Classification.Sensitive

Business glossary terms can be reviewed in the Glossary Terms tab of the DataHub’s UI. Below, we see the three terms associated with the Classification glossary node: Confidential, HighlyConfidential, and Sensitive.

Example of a related set of terms in DataHub’s Business Glossary

We can search for entities inventoried in DataHub using their assigned business glossary terms.

Dataset search results based on a term in DataHub’s Business Glossary

Finally, we see an example of an AWS Athena data catalog table with business glossary terms applied to columns within the table’s schema.

AWS Athena table showing column-level descriptions, glossary terms, tags, owners, and documentation

SQL-based Profiler

DataHub also can extract statistics about entities in DataHub using the SQL-based Profiler. According to the DataHub documentation, the Profiler can extract the following:

  • Row and column counts for each table
  • Column null counts and proportions
  • Column distinct counts and proportions
  • Column min, max, mean, median, standard deviation, quantile values
  • Column histograms or frequencies of unique values

In addition, we can also track the historical stats for each profiled entity each time metadata is ingested.

Amazon Redshift fact table showing SQL-based profiler column-level statistics
Another example, a Google BigQuery table showing SQL-based profiler column-level statistics

Data Lineage

DataHub’s data lineage features allow us to view upstream and downstream relationships between different types of entities. DataHub can trace lineage across multiple platforms, datasets, pipelines, charts, and dashboards.

Below, we see a simple example of dataset entity-to-entity lineage in Amazon Redshift and then Apache Spark on Amazon EMR. The fact table has a downstream relationship to four database views. The views are based on SQL queries that include the upstream table as a datasource.

Visual lineage view of Amazon Redshift fact table and its four downstream view dependencies
Another visual lineage example of an Apache Spark job with Apache Hive tables as both the source and sink

DataHub Analytics

DataHub provides basic metadata quality and usage analytics in the DataHub UI: user activity, counts of datasource types, business glossary terms, environments, and actions.

Examples of DataHub’s metadata quality and usage analytics capabilities
More examples of DataHub’s metadata quality and usage analytics capabilities

Conclusion

In this post, we explored the features of a data catalog and learned about some of the leading commercial and open-source data catalogs. Next, we learned how DataHub could collect, organize, enrich, and search metadata across multiple datasources. Lastly, we discovered how easy it is to catalog metadata from datasources spread across multiple CSP, SaaS providers, and corporate data centers, and centralize those results in DataHub.

In addition to the basic features reviewed in this post, DataHub offers a growing number of additional capabilities, including GraphQL and Timeline APIs, robust authentication and authorization, application monitoring observability, and Great Expectations integration. All these qualities make DataHub an excellent choice for a data catalog.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Data Preparation on AWS: Comparing Available ELT Options to Cleanse and Normalize Data

Comparing the features and performance of different AWS analytics services for Extract, Load, Transform (ELT)

Introduction

According to Wikipedia, “Extract, load, transform (ELT) is an alternative to extract, transform, load (ETL) used with data lake implementations. In contrast to ETL, in ELT models the data is not transformed on entry to the data lake but stored in its original raw format. This enables faster loading times. However, ELT requires sufficient processing power within the data processing engine to carry out the transformation on demand, to return the results in a timely manner.

As capital investments and customer demand continue to drive the growth of the cloud-based analytics market, the choice of tools seems endless, and that can be a problem. Customers face a constant barrage of commercial and open-source tools for their batch, streaming, and interactive exploratory data analytics needs. The major Cloud Service Providers (CSPs) have even grown to a point where they now offer multiple services to accomplish similar analytics tasks.

This post will examine the choice of analytics services available on AWS capable of performing ELT. Specifically, this post will compare the features and performance of AWS Glue Studio, Amazon Glue DataBrew, Amazon Athena, and Amazon EMR using multiple ELT use cases and service configurations.

Data pipeline architecture showing a choice of AWS ELT services

Analytics Use Case

We will address a simple yet common analytics challenge for this comparison — preparing a nightly data feed for analysis the next day. Each night a batch of approximately 1.2 GB of raw CSV-format healthcare data will be exported from a Patient Administration System (PAS) and uploaded to Amazon S3. The data must be cleansed, deduplicated, refined, normalized, and made available to the Data Science team the following morning. The team of Data Scientists will perform complex data analytics on the data and build machine learning models designed for early disease detection and prevention.

Sample Dataset

The dataset used for this comparison is generated by Synthea, an open-source patient population simulation. The high-quality, synthetic, realistic patient data and associated health records cover every aspect of healthcare. The dataset contains the patient-related healthcare history for allergies, care plans, conditions, devices, encounters, imaging studies, immunizations, medications, observations, organizations, patients, payers, procedures, providers, and supplies.

The Synthea dataset was first introduced in my March 2021 post examining the handling of sensitive PII data using Amazon Macie: Data Lakes: Discovery, Security, and Privacy of Sensitive Data.

The Synthea synthetic patient data is available in different record volumes and various data formats, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Since this post seeks to measure the performance of different AWS ELT-capable services, we will use a larger version of the Synthea dataset containing hundreds of thousands to millions of records.

AWS Glue Data Catalog

The dataset comprises nine uncompressed CSV files uploaded to Amazon S3 and cataloged to an AWS Glue Data Catalog, a persistent metadata store, using an AWS Glue Crawler.

Raw Synthea CSV data, in S3, cataloged in AWS Glue Data Catalog

Test Cases

We will use three data preparation test cases based on the Synthea dataset to examine the different AWS ELT-capable services.

Specifications for three different test cases

Test Case 1: Encounters for Symptom

An encounter is a health care contact between the patient and the provider responsible for diagnosing and treating the patient. In our first test case, we will process 1.26M encounters records for an ongoing study of patient symptoms by our Data Science team.

id date patient code description reasoncode reasondescription
714fd61a-f9fd-43ff-87b9-3cc45a3f1e53 2014-01-09 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
23e07532-8b96-4d05-b14e-d4c5a5288ed2 2014-08-18 33f33990-ae8b-4be8-938f-e47ad473abfe 185349003 Outpatient Encounter
45044100-aaba-4209-8ad1-15383c76842d 2015-07-12 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 36971009 Sinusitis (disorder)
ffdddbfb-35e8-4a74-a801-89e97feed2f3 2014-08-12 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
352d1693-591a-4615-9b1b-f145648f49cc 2016-05-25 36d131ee-dd5b-4acb-acbe-19961c32c099 185349003 Outpatient Encounter
4620bd2f-8010-46a9-82ab-8f25eb621c37 2016-10-07 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 195662009 Acute viral pharyngitis (disorder)
815494d8-2570-4918-a8de-fd4000d8100f 2010-08-02 660bec03-9e58-47f2-98b9-2f1c564f3838 698314001 Consultation for treatment
67ec5c2d-f41e-4538-adbe-8c06c71ddc35 2010-11-22 660bec03-9e58-47f2-98b9-2f1c564f3838 170258001 Outpatient Encounter
dbe481ce-b961-4f43-ac0a-07fa8cfa8bdd 2012-11-21 660bec03-9e58-47f2-98b9-2f1c564f3838 50849002 Emergency room admission
b5f1ab7e-5e67-4070-bcf0-52451eb20551 2013-12-04 660bec03-9e58-47f2-98b9-2f1c564f3838 185345009 Encounter for symptom 10509002 Acute bronchitis (disorder)
view raw encounters.csv hosted with ❤ by GitHub
Sample of raw encounters data

Data preparation includes the following steps:

  1. Load 1.26M encounter records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Select only the records where the description column contains “Encounter for symptom.”
  4. Remove any rows with an empty reasoncodes column.
  5. Extract a new year, month, and day column from the date column.
  6. Remove the date column.
  7. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  8. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  9. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 2: Observations

Clinical observations ensure that treatment plans are up-to-date and correctly administered and allow healthcare staff to carry out timely and regular bedside assessments. We will process 5.38M encounters records for our Data Science team in our second test case.

date patient encounter code description value units
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8302-2 Body Height 175.76 cm
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 29463-7 Body Weight 56.51 kg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 39156-5 Body Mass Index 18.29 kg/m2
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8480-6 Systolic Blood Pressure 119.0 mmHg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8462-4 Diastolic Blood Pressure 77.0 mmHg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8302-2 Body Height 177.25 cm
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 29463-7 Body Weight 59.87 kg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 39156-5 Body Mass Index 19.05 kg/m2
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8480-6 Systolic Blood Pressure 113.0 mmHg
2012-03-26 36d131ee-dd5b-4acb-acbe-19961c32c099 296a1fd4-56de-451c-a5fe-b50f9a18472d 8302-2 Body Height 174.17 cm
Sample of raw observations data

Data preparation includes the following steps:

  1. Load 5.38M observation records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Extract a new year, month, and day column from the date column.
  4. Remove the date column.
  5. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  6. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  7. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 3: Sinusitis Study

A medical condition is a broad term that includes all diseases, lesions, and disorders. In our second test case, we will join the conditions records with the patient records and filter for any condition containing the term ‘sinusitis’ in preparation for our Data Science team.

start stop patient encounter code description
2012-09-05 2012-10-16 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 05a6ef43-d690-455e-ab2f-1ea19d902274 44465007 Sprain of ankle
2014-09-08 2014-09-28 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 1cdcbe46-caaf-4b3f-b58c-9ca9ccb13013 283371005 Laceration of forearm
2014-11-28 2014-12-13 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 b222e257-98da-4a1b-a46c-45d5ad01bbdc 195662009 Acute viral pharyngitis (disorder)
1980-01-09 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 40055000 Chronic sinusitis (disorder)
1989-06-25 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 201834006 Localized primary osteoarthritis of the hand
1996-01-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 196416002 Impacted molars
2016-02-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 748cda45-c267-46b2-b00d-3b405a44094e 15777000 Prediabetes
2016-04-27 2016-05-20 01858c8d-f81c-4a95-ab4f-bd79fb62b284 a64734f1-5b21-4a59-b2e8-ebfdb9058f8b 444814009 Viral sinusitis (disorder)
2014-02-06 2014-02-19 d32e9ad2-4ea1-4bb9-925d-c00fe85851ae c64d3637-8922-4531-bba5-f3051ece6354 43878008 Streptococcal sore throat (disorder)
1982-05-18 08858d24-52f2-41dd-9fe9-cbf1f77b28b2 3fff3d52-a769-475f-b01b-12622f4fee17 368581000119106 Neuropathy due to type 2 diabetes mellitus (disorder)
view raw conditions.csv hosted with ❤ by GitHub
Sample of raw conditions data

Data preparation includes the following steps:

  1. Load 483k condition records using the existing AWS Glue Data Catalog table.
  2. Inner join the condition records with the 132k patient records based on patient ID.
  3. Remove any duplicate records.
  4. Drop approximately 15 unneeded columns.
  5. Select only the records where the description column contains the term “sinusitis.”
  6. Remove any rows with empty ethnicity, race, gender, or marital columns.
  7. Create a new column, condition_age, based on a calculation of the age in days at which the patient’s condition was diagnosed.
  8. Write the resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet-format files. No partitions are necessary.
  9. Given the small resultset, bucket the data such that only one file is written to minimize the impact of too many small files on future query performance.
  10. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog.

AWS ELT Options

There are numerous options on AWS to handle the batch transformation use case described above; a non-exhaustive list includes:

  1. AWS Glue Studio (UI-driven with AWS Glue PySpark Extensions)
  2. Amazon Glue DataBrew
  3. Amazon Athena
  4. Amazon EMR with Apache Spark
  5. AWS Glue Studio (Apache Spark script)
  6. AWS Glue Jobs (Legacy jobs)
  7. Amazon EMR with Presto
  8. Amazon EMR with Trino
  9. Amazon EMR with Hive
  10. AWS Step Functions and AWS Lambda
  11. Amazon Redshift Spectrum
  12. Partner solutions on AWS, such as Databricks, Snowflake, Upsolver, StreamSets, Stitch, and Fivetran
  13. Self-managed custom solutions using a combination of OSS, such as dbt, Airbyte, Dagster, Meltano, Apache NiFi, Apache Drill, Apache Beam, Pandas, Apache Airflow, and Kubernetes

For this comparison, we will choose the first five options listed above to develop our ELT data preparation pipelines: AWS Glue Studio (UI-driven job creation with AWS Glue PySpark Extensions), Amazon Glue DataBrew, Amazon Athena, Amazon EMR with Apache Spark, and AWS Glue Studio (Apache Spark script).

Data pipeline architecture showing a choice of AWS ELT services

AWS Glue Studio

According to the documentation, “AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine. You can inspect the schema and data results in each step of the job.

AWS Glue Studio’s visual job creation capability uses the AWS Glue PySpark Extensions, an extension of the PySpark Python dialect for scripting ETL jobs. The extensions provide easier integration with AWS Glue Data Catalog and other AWS-managed data services. As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Spark scripts with AWS Glue Studio. In fact, we can use the exact same scripts run on Amazon EMR.

For the tests, we are using the G.2X worker type, Glue version 3.0 (Spark 3.1.1 and Python 3.7), and Python as the language choice for this comparison. We will test three worker configurations using both UI-driven job creation with AWS Glue PySpark Extensions and Apache Spark script options:

  • 10 workers with a maximum of 20 DPUs
  • 20 workers with a maximum of 40 DPUs
  • 40 workers with a maximum of 80 DPUs
AWS Glue Studio visual job creation UI for Test Case 3: Sinusitis Study

AWS Glue Studio Spark job details for Test Case 2: Observations

AWS Glue Studio job runs for Test Case 2: Observations

AWS Glue DataBrew

According to the documentation, “AWS Glue DataBrew is a visual data preparation tool that enables users to clean and normalize data without writing any code. Using DataBrew helps reduce the time it takes to prepare data for analytics and machine learning (ML) by up to 80 percent, compared to custom-developed data preparation. You can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values.

DataBrew allows you to set the maximum number of DataBrew nodes that can be allocated when a job runs. For this comparison, we will test three different node configurations:

  • 3 maximum nodes
  • 10 maximum nodes
  • 20 maximum nodes
AWS Glue DataBrew Project for Test Case 3: Sinusitis Study

AWS Glue DataBrew Recipe for Test Case 1: Encounters for Symptom

AWS Glue DataBrew recipe job runs for Test Case 1: Encounters for Symptom

Amazon Athena

According to the documentation, “Athena helps you analyze unstructured, semi-structured, and structured data stored in Amazon S3. Examples include CSV, JSON, or columnar data formats such as Apache Parquet and Apache ORC. You can use Athena to run ad-hoc queries using ANSI SQL, without the need to aggregate or load the data into Athena.

Although Athena is classified as an ad-hoc query engine, using a CREATE TABLE AS SELECT (CTAS) query, we can create a new table in the AWS Glue Data Catalog and write to Amazon S3 from the results of a SELECT statement from another query. That other query statement performs a transformation on the data using SQL.

Purpose: Process data for sinusitis study using Amazon Athena
Author: Gary A. Stafford (January 2022)
CREATE TABLE "sinusitis_athena" WITH (
format = 'Parquet',
write_compression = 'SNAPPY',
external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
bucketed_by = ARRAY['patient'],
bucket_count = 1
) AS
SELECT DISTINCT
patient,
code,
description,
date_diff(
'day',
date(substr(birthdate, 1, 10)),
date(substr(start, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions AS c,
patients AS p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 2: Observations

Purpose: Process data for sinusitis study using Amazon Athena
Author: Gary A. Stafford (January 2022)
CREATE TABLE "sinusitis_athena" WITH (
format = 'Parquet',
write_compression = 'SNAPPY',
external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
bucketed_by = ARRAY['patient'],
bucket_count = 1
) AS
SELECT DISTINCT
patient,
code,
description,
date_diff(
'day',
date(substr(birthdate, 1, 10)),
date(substr(start, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions AS c,
patients AS p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 3: Sinusitis Study

Amazon Athena is a fully managed AWS service and has no performance settings to adjust or monitor.

Amazon Athena CTAS statement for Test Case 1: Encounters for Symptom

Parquet data partitioned by year in Amazon S3 for Test Case 1: Encounters for Symptom, using Athena

CTAS and Partitions

A notable limitation of Amazon Athena for the batch use case is the 100 partition limit with CTAS queries. Athena [only] supports writing to 100 unique partition and bucket combinations with CTAS. Partitioned by year, month, and day, the observations test case requires 2,558 partitions, and the observations test case requires 10,433 partitions. There is a recommended workaround using an INSERT INTO statement. However, the workaround requires additional SQL logic, computation, and most important cost. It is not practical, in my opinion, compared to other methods when a higher number of partitions are needed. To avoid the partition limit with CTAS, we will only partition by year and bucket by month when using Athena. Take this limitation into account when comparing the final results.

Amazon EMR with Apache Spark

According to the documentation, “Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. You can quickly and easily create managed Spark clusters from the AWS Management Console, AWS CLI, or the Amazon EMR API.

For this comparison, we are using two different Spark 3.1.2 EMR clusters:

  • (1) r5.xlarge Master node and (2) r5.2xlarge Core nodes
  • (1) r5.2xlarge Master node and (4) r5.2xlarge Core nodes

All Spark jobs are written in both Python (PySpark) and Scala. We are using the AWS Glue Data Catalog as the metastore for Spark SQL instead of Apache Hive.

4-node Amazon EMR cluster shown in Amazon EMR Management Console

Completed EMR Steps (Spark Jobs) on 4-node Amazon EMR cluster

# Purpose: Process data for sinusitis study using either Amazon EMR and AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "sinusitis_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
patient,
code,
description,
datediff(
date(substr(start, 1, 10)),
date(substr(birthdate, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions as c, patients as p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.bucketBy(1, "patient") \
.sortBy("patient", "code") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 3: Sinusitis Study

# Purpose: Process encounters dataset using either Amazon EMR and AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "encounter_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition",
"true") \
.config("hive.exec.dynamic.partition.mode",
"nonstrict") \
.config("hive.exec.max.dynamic.partitions",
"10000") \
.config("hive.exec.max.dynamic.partitions.pernode",
"10000") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
id,
patient,
code,
description,
reasoncode,
reasondescription,
year(date) as year,
month(date) as month,
day(date) as day
FROM encounters
WHERE description='Encounter for symptom';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.partitionBy("year", "month", "day") \
.bucketBy(1, "patient") \
.sortBy("patient") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 1: Encounters for Symptom

package main.spark.demo
// Purpose: Process observations dataset using Spark on Amazon EMR with Scala
// Author: Gary A. Stafford
// Date: 2022-03-06
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Observations {
def main(args: Array[String]): Unit = {
val (spark: SparkSession, sc: SparkContext) = createSession
performELT(spark, sc)
}
private def createSession = {
val spark: SparkSession = SparkSession.builder
.appName("Observations ELT App")
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.config("hive.exec.dynamic.partition",
"true")
.config("hive.exec.dynamic.partition.mode",
"nonstrict")
.config("hive.exec.max.dynamic.partitions",
"10000")
.config("hive.exec.max.dynamic.partitions.pernode",
"10000")
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("INFO")
(spark, sc)
}
private def performELT(spark: SparkSession, sc: SparkContext) = {
val tableName: String = sc.getConf.get("spark.executorEnv.TABLE_NAME")
val dataLakeBucket: String = sc.getConf.get("spark.executorEnv.DATA_LAKE_BUCKET")
spark.sql("USE synthea_patient_big_data;")
val sql_query_data: String =
"""
SELECT DISTINCT
patient,
encounter,
code,
description,
value,
units,
year(date) as year,
month(date) as month,
day(date) as day
FROM observations
WHERE date <> 'date';
"""
val observationsDF: DataFrame = spark
.sql(sql_query_data)
observationsDF
.coalesce(1)
.write
.partitionBy("year", "month", "day")
.bucketBy(1, "patient")
.sortBy("patient")
.mode("overwrite")
.format("parquet")
.option("path", s"s3://${dataLakeBucket}/${tableName}/")
.saveAsTable(tableName = tableName)
spark.sql(s"ALTER TABLE ${tableName} SET TBLPROPERTIES ('classification'='parquet');")
}
}
Spark jobs written in Scala had nearly identical execution times, such as Test Case 2: Observations

Partitions in the AWS Glue Data Catalog table for Test Case 1: Encounters for Symptom

Results

Data pipelines were developed and tested for each of the three test cases using the five chosen AWS ELT services and configuration variations. Each pipeline was then run 3–5 times, for a total of approximately 150 runs. The resulting AWS Glue Data Catalog table and data in Amazon S3 were deleted between each pipeline run. Each new run created a new data catalog table and wrote new results to Amazon S3. The median execution times from these tests are shown below.

Number of raw and processed records for each test case

Overall results (see details below) — lower times are better

Although we can make some general observations about the execution times of the chosen AWS services, the results are not meant to be a definitive guide to performance. An accurate comparison would require a deeper understanding of how each of these managed services works under the hood, in order to both optimize and balance their compute profiles correctly.

Amazon Athena

The Resultset column contains the final number of records written to Amazon S3 by Athena. The results contain the data pipeline’s median execution time and any additional data points.

Results for Amazon Athena data pipelines

AWS Glue Studio (AWS Glue PySpark Extensions)

Tests were run with three different configurations for AWS Glue Studio using the graphical interface for creating jobs with AWS Glue PySpark Extensions. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue Studio with AWS Glue PySpark Extensions

AWS Glue Studio (Apache PySpark script)

As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Apache Spark scripts with AWS Glue Studio. The tests were run with the same three configurations as above. The execution times compared to the Amazon EMR tests, below, are almost identical.

Results for data pipelines using PySpark scripts on AWS Glue Studio

Amazon EMR with Apache Spark

Tests were run with three different configurations for Amazon EMR with Apache Spark using PySpark. The first set of results is for the 2-node EMR cluster. The second set of results is for the 4-node cluster. The third set of results is for the same 4-node cluster in which the data was not bucketed into a single file within each partition. Compare the execution times and the number of objects against the previous set of results. Too many small files can negatively impact query performance.

Results for data pipelines using Amazon EMR with Apache Spark — times for PySpark scripts

It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had similar execution times for all three test cases.

Results for data pipelines using Amazon EMR with Apache Spark — Python vs. Scala

Amazon Glue DataBrew

Tests were run with three different configurations Amazon Glue DataBrew, including 3, 10, and 20 maximum nodes. Times for each configuration were nearly identical.

Results for data pipelines using Amazon Glue DataBrew

Observations

  1. All tested AWS services can read and write to an AWS Glue Data Catalog and the underlying datastore, Amazon S3. In addition, they all work with the most common analytics data file formats.
  2. All tested AWS services have rich APIs providing access through the AWS CLI and SDKs, which support multiple programming languages.
  3. Overall, AWS Glue Studio, using the AWS Glue PySpark Extensions, appears to be the most capable ELT tool of the five services tested and with the best performance.
  4. Both AWS Glue DataBrew and AWS Glue Studio are no-code or low-code services, democratizing access to data for non-programmers. Conversely, Amazon Athena requires knowledge of ANSI SQL, and Amazon EMR with Apache Spark requires knowledge of Scala or Python. Be cognizant of the potential trade-offs from using no-code or low-code services on observability, configuration control, and automation.
  5. Both AWS Glue DataBrew and AWS Glue Studio can write a custom Parquet writer type optimized for Dynamic Frames, GlueParquet. One potential advantage, a pre-computed schema is not required before writing.
  6. There is a slight ‘cold-start’ with Glue Studio. Studio startup times ranged from 7 seconds to 2 minutes and 4 seconds in the tests. However, the lower execution time of AWS Glue Studio compared to Amazon EMR with Spark and AWS Glue DataBrew in the tests offsets any initial cold-start time, in my opinion.
  7. Changing the maximum number of units from 3 to 10 to 20 for AWS Glue DataBrew made negligible differences in job execution times. Given the nearly identical execution times, it is unclear exactly how many units are being used by the job. More importantly, how many DataBrew node hours we are being billed for. These are some of the trade-offs with a fully-managed service — visibility and fine-tuning configuration.
  8. Similarly, with AWS Glue Studio, using either 10 workers w/ max. 20 DPUs, 20 workers w/ max. 40 DPUs, or 40 workers w/ max. 80 DPUs resulted in nearly identical executions times.
  9. Amazon Athena had the fastest execution times but is limited by the 100 partition limit for large CTAS resultsets. Athena is not practical, in my opinion, compared to other ELT methods, when a higher number of partitions are needed.
  10. It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had almost identical execution times for all three test cases.
  11. Using Amazon EMR with EC2 instances takes about 9 minutes to provision a new cluster for this comparison fully. Given nearly identical execution times to AWS Glue Studio with Apache Spark scripts, Glue has the clear advantage of nearly instantaneous startup times.
  12. AWS recently announced Amazon EMR Serverless. Although this service is still in Preview, this new version of EMR could potentially reduce or eliminate the lengthy startup time for ephemeral clusters requirements.
  13. Although not discussed, scheduling the data pipelines to run each night was a requirement for our use case. AWS Glue Studio jobs and AWS Glue DataBrew jobs are schedulable from those services. For Amazon EMR and Amazon Athena, we could use Amazon Managed Workflows for Apache Airflow (MWAA), AWS Data Pipeline, or AWS Step Functions combined with Amazon CloudWatch Events Rules to schedule the data pipelines.

Conclusion

Customers have many options for ELT — the cleansing, deduplication, refinement, and normalization of raw data. We examined chosen services on AWS, each capable of handling the analytics use case presented. The best choice of tools depends on your specific ELT use case and performance requirements.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

Leave a comment

Video Demonstration: Lakehouse Automation on AWS with Apache Airflow

Programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow

Introduction

In the following video demonstration, we will learn how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow. Since we are on AWS, we will be using the fully-managed Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Using Airflow, we will COPY raw data into staging tables, then merge that staging data into a series of tables. We will then load incremental data into Redshift on a regular schedule. Next, we will join and aggregate data from several tables and UNLOAD the resulting dataset to an Amazon S3-based data lake. Lastly, we will catalog the data in S3 using AWS Glue and query with Amazon Athena.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGsSQL statements, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs included in the GitHub project are:

Demonstration DAGs as seen in MWAA Airflow UI

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Video Demonstration: Ahana Cloud for Presto on AWS using Apache Hive and AWS Glue

Using Ahana Cloud for Presto to perform analytics on AWS using both Apache Hive and AWS Glue as metastores

Introduction

The following series of five videos are an extended version of the demonstration featured in the October 2021 webinar, Build an Open Data Lake on AWS with Presto. An on-demand copy of the live webinar is available on Ahana.io, featuring Dipti Borkar (Ahana Co-Founder and CPO) and I.

In the demonstration, we will build a data lake on AWS using a combination of Ahana Cloud for Presto, Apache Hive, Apache Superset, Amazon S3, AWS Glue, and Amazon Athena. We then analyze the data in Apache Superset using Ahana Cloud for Presto.

Build an Open Data Lake on AWS with Presto

Demonstration

The demonstration is divided into five YouTube videos (playlist):

Ahana Cloud for Presto Demo — Part 1/5: Public GitHub Resources

Ahana Cloud for Presto Demo — Part 2/5: MoMa Datasource

Ahana Cloud for Presto Demo — Part 3/5: Ahana SaaS

Ahana Cloud for Presto Demo — Part 4/5: AWS Glue & Amazon

Ahana Cloud for Presto Demo — Part 5/5: PrestoDB & Apache Hive

Source Code

All source code for this post and the previous posts in this series are open-sourced and located on GitHub. In the webinar and the videos, the Apache Hive and AWS Glue data catalog tables contain an _athena or _presto suffix. For clarity, in the source code, I have changed those to indicate the metastore they are associated with, _hive or _glue, since either set of tables can be queried Presto. Additionally, in the webinar and the videos, the raw data files were uploaded to Amazon S3 in uncompressed CSV format; this is unnecessary. The CTAS SQL statements both expect GZIP-compressed CSV files. To save time and cost, upload the compressed files, as they are, to Amazon S3.

The following files are used in the demonstration:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , , , ,

Leave a comment

Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC

Introduction

Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database via a Microsoft .NET application and vice-versa, accessing Microsoft SQL Server from a Java-based application is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures from a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

View of the post’s Java project from JetBrains’ IntelliJ IDE

The objectives of this post include:

  • Demonstrate the differences between using static SQL statements and stored procedures to return data.
  • Demonstrate three types of JDBC statements to return data: Statement, PreparedStatement, and CallableStatement.
  • Demonstrate how to call stored procedures with input and output parameters.
  • Demonstrate how to return single values and a result set from a database using stored procedures.

Why Stored Procedures?

To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:

  • Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
  • Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
  • Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.

AdventureWorks 2017 Database

For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

The HumanResources schema, one of five schemas within the AdventureWorks database

For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installed on your local workstation.

There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.

DROP DATABASE IF EXISTS AdventureWorks;
GO

EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;

-- get task_id from output (e.g. 1)

EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;

Install Stored Procedures

For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

View of the new stored procedures from JetBrains’ IntelliJ IDE Database tab

Data Sources, Connections, and Properties

Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource, and database connection, java.sql.Connection. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java, which in turn instantiates the java.sql.Connection and com.microsoft.sqlserver.jdbc.SQLServerDataSource objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java. This properties class instantiates the java.util.Properties class, which reads values from a configuration properties file, config.properties. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.

Examples

For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.

To run the JUnit unit tests, using Gradle, which the project is based on, use the ./gradlew cleanTest test --warning-mode none command.

A successful run of the JUnit tests

To build and run the application, using Gradle, which the project is based on, use the ./gradlew run --warning-mode none command.

The output of the Java console application

Example 1: SQL Statement

Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST, uses the java.sql.Statement class. According to Oracle’s JDBC documentation, the Statement object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.

/**
* Statement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightST() {
double averageWeight = 0;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.getConnection().createStatement();
String sql = "WITH Weights_CTE(AverageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2)" +
"FROM [Weights_CTE];";
rs = stmt.executeQuery(sql);
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 2: Prepared Statement

Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.

Also, instead of using the java.sql.Statement class, we use the java.sql.PreparedStatement class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement object. This object can then be used to execute this statement multiple times efficiently.

/**
* PreparedStatement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightPS() {
double averageWeight = 0;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
String sql = "WITH Weights_CTE(averageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
"FROM [Weights_CTE];";
pstmt = connection.getConnection().prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
averageWeight = rs.getDouble("averageWeight");
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 3: Callable Statement

In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement class. According to Oracle’s documentation, the CallableStatement extends PreparedStatement. It is the interface used to execute SQL stored procedures. The CallableStatement accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double numeric value.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
ResultSet rs = null;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeight]}");
cstmt.execute();
rs = cstmt.getResultSet();
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 4: Calling a Stored Procedure with an Output Parameter

In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the column has a name, averageWeight. We can now call that column by name when retrieving the value.

The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter, and one not, both return the same value(s). You can use the CallableStatement to for either type.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT]@averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) output parameter, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightOutCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeightOUT](?)}");
cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
cstmt.execute();
averageWeight = cstmt.getDouble("averageWeight");
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 5: Calling a Stored Procedure with an Input Parameter

In this example, the procedure returns a result set, java.sql.ResultSet, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith, to the stored procedure using the CallableStatement.

The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String> object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.

CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) input parameter, returns ResultSet
*
*
@param lastNameStartsWith
*
@return List of employee names
*/
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<String> employeeFullName = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [HumanResources].[uspGetEmployeesByLastName](?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("lastNameStartsWith", lastNameStartsWith);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
employeeFullName.add(
rs.getString("LastName") + ", "
+ rs.getString("FirstName") + " "
+ rs.getString("MiddleName"));
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return employeeFullName;
}

Example 6: Converting a Result Set to Ordered Collection of Objects

In this last example, we pass two input parameters, productColor and productSize, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, List<Product> object. The Product objects in the list are instances of the Product.java POJO class. The method converts each results set’s row’s field value into a Product property (e.g., Product.Size, Product.Model). Using a collection is a common method for persisting data from a result set in an application.

CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (2) input parameters, returns ResultSet
*
*
@param color
*
@param size
*
@return List of Product objects
*/
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<Product> productList = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("productColor", color);
cstmt.setString("productSize", size);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
Product product = new Product(
rs.getString("Product"),
rs.getString("ProductNumber"),
rs.getString("Color"),
rs.getString("Size"),
rs.getString("Model"));
productList.add(product);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return productList;
}

Proper T-SQL: Schema Reference and Brackets

You will notice in all T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo).

You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]). The square brackets are to indicate that the name represents an object and not a reserved word (e.g, CURRENT or NATIONAL). By default, SQL Server adds these to make sure the scripts it generates run correctly.

Running the Examples

The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.

package com.article.examples;
import java.util.List;
/**
* Main class that calls all example methods
*
* @author Gary A. Stafford
*/
public class RunExamples {
private static final Examples examples = new Examples();
private static final ProcessTimer timer = new ProcessTimer();
/**
* @param args the command line arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("SQL SERVER STATEMENT EXAMPLES");
System.out.println("======================================");
// Statement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
double averageWeight = examples.getAverageProductWeightST();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightST");
System.out.println("Description: Statement, no parameters, returns Integer");