Archive for category Analytics

End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub

Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms

Introduction

According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.

Another succinct description of a data catalog’s purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.

Working with many organizations in the area of Analytics, one of the more common requests I receive regards choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

View of DataHub’s home screen showing a variety of datasources

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Another example of searching for cataloged entities in DataHub’s browser-based UI

Data Catalog Competitors

Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980’s. Gartner publishes their Metadata Management (EMM) Solutions Reviews and Ratings and Metadata Management Magic Quadrant. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:

Open Source Software

Commercial

Cloud Service Providers

Data Catalog Features

DataHub describes itself as “a modern data catalog built to enable end-to-end data discovery, data observability, and data governance.” Sorting through vendor’s marketing jargon and hype, standard features of leading data catalogs include:

  • Metadata ingestion
  • Data discovery
  • Data governance
  • Data observability
  • Data lineage
  • Data dictionary
  • Data classification
  • Usage/popularity statistics
  • Sensitive data handling
  • Data fitness (aka data quality or data profiling)
  • Manage both technical and business metadata
  • Business glossary
  • Tagging
  • Natively supported datasource integrations
  • Advanced metadata search
  • Fine-grain authentication and authorization
  • UI- and API-based interaction

Datasources

When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:

  • Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
  • Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
  • NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
  • Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
  • Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
  • Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
  • ETL sources, such as Apache Spark, Apache Airflow, Apache NiFi, and dbt

DataHub on AWS

DataHub’s convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternately, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.

Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

ArgoCD UI showing DataHub and its dependencies deployed to Amazon EKS

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. Kubernetes deployment for each of the components is defined as sub-charts under the main DataHub Helm chart.

External Storage Layer Dependencies

Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.

Alternately, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service dependency substitutions include Amazon RDS for MySQL, Amazon OpenSearch (fka Amazon Elasticsearch), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using AWS Neptune as the Graph Index is coming soon.

DataHub CLI and Plug-ins

DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

Available DataHub CLI commands

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.

Example list of DataHub Source plugins installed
Example list of DataHub Sink and Transformer plugins installed

Secure Metadata Ingestion

Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.

In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to git and push them to a private GitHub repository. Finally, I use GitHub Actions to test DataHub files.

To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, using IAM Roles for Service Accounts (IRSA), to securely access datasources to ingest and the DataHub application.

Schedule and Monitor Pipelines

Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configurations can be kept secure and not be exposed externally or in plain text.

When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub, should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.

SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate datasources require alternate networking and security strategies to access metadata securely.

AWS-based DataHub high-level architecture

Markup or Code?

According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain a source, sink, and transformers configuration section. Mark-up language-based job automation written in YAML, JSON, or Domain Specific Languages (DSLs) is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database, running on AWS.

YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to secure sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and re-used for all ingestion jobs. Sensitive database connection information has also been variablized and stored separately.

# Purpose: DataHub example recipe for PostgreSQL datasource
# Author: Gary A. Stafford
# Date: March 2022
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/postgres
source:
type: postgres
config:
# Coordinates
host_port: ${DB_HOST_PORT}
database: tickit
# Credentials
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
# Options
profiling:
enabled: true
# Environment
env: DEV
# see https://datahubproject.io/docs/metadata-ingestion/transformers/#adding-a-set-of-tags
transformers:
type: "simple_add_dataset_tags"
config:
tag_urns:
"urn:li:tag:AWS"
"urn:li:tag:${ACCOUNT_ID}"
"urn:li:tag:us-east-1"
type: "pattern_add_dataset_terms"
config:
term_pattern:
rules:
".*users.*": ["urn:li:glossaryTerm:Classification.Sensitive"]
type: "simple_add_dataset_ownership"
config:
owner_urns:
"urn:li:corpuser:Database Administrators"
ownership_type: "DATAOWNER"
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: "datahub-rest"
config:
server: ${DATAHUB_REST_ENDPOINT}
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/reporting_telemetry/
pipeline_name: "postgres-pipeline-tickit"
reporting:
type: "datahub"
config:
datahub_api:
server: ${DATAHUB_REST_ENDPOINT}

Using Python

You can configure and run a pipeline entirely from within a custom Python script using DataHub’s Python API as an alternative to YAML. Below, we see two nearly identical ingestion recipes to the YAML above, written in Python. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit-testing, and notification. Below is a basic pipeline written in Python. The code is functional, but not very Pythonic, secure, scalable, or Production ready.

# Purpose: Simple programmatic DataHub pipline example
# Author: Gary A. Stafford
# Date: March 2022
# Reference: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/programatic_pipeline.py
from datahub.ingestion.run.pipeline import Pipeline
# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
{
"run_id": "postgres-run",
"source": {
"type": "postgres",
"config": {
"host_port": "demo-instance.abcd1234.us-east-1.rds.amazonaws.com:5432",
"database": "tickit",
"username": "datahub",
"password": "My5up3r53cr3tPa55w0rd",
"env": "DEV",
"profiling": {
"enabled": "true"
}
}
},
"transformers": [
{
"type": "simple_add_dataset_tags",
"config": {
"tag_urns": [
f"urn:li:tag:AWS",
f"urn:li:tag:111222333444",
f"urn:li:tag:us-east-1"
]
}
},
{
"type": "pattern_add_dataset_terms",
"config": {
"term_pattern": {
"rules": {
".*users.*": [
"urn:li:glossaryTerm:Classification.Sensitive"
]
}
}
}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
f"urn:li:corpuser:Database Administrators"
],
"ownership_type": "DATAOWNER"
}
}
],
"sink": {
"type": "datahub-rest",
"config": {
"server": "http://192.168.111.222:33333"
}
}
}
)
# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

The second version of the same pipeline is more Production ready. The code is more Pythonic in nature and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. Like recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration is pulled from the environment, such as AWS Account ID and AWS Region. The script loads these values at runtime.

# Purpose: Programmatic DataHub pipline example
# Author: Gary A. Stafford
# Date: March 2022
import json
import logging
import boto3
from botocore.exceptions import ClientError
from datahub.ingestion.run.pipeline import Pipeline
logging.basicConfig(
format="[%(asctime)s] %(levelname)s – %(message)s", level=logging.INFO
)
def main():
sts_client = boto3.client("sts")
params = get_parameters()
params["owner"] = "Database Administrators"
params["environment"] = "DEV"
params["database"] = "tickit"
params["region"] = sts_client.meta.region_name
params["account"] = sts_client.get_caller_identity()["Account"]
logging.info(f"Params: {json.dumps(params, indent=4, sort_keys=True)}")
ingestion_pipeline = create_pipeline(params)
run_pipeline(ingestion_pipeline)
def create_pipeline(params) -> Pipeline:
"""Constructs a Pipeline for a PostgreSQL Source and a DataHub Sink
:return: instance of datahub.ingestion.run.pipeline
"""
pipeline = Pipeline.create(
{
"run_id": "postgres-run",
"source": {
"type": "postgres",
"config": {
"host_port": params.get("/datahub_demo/postgres_host_port_tickit"),
"database": params.get("database"),
"username": params.get("/datahub_demo/postgres_username_tickit"),
"password": params.get("/datahub_demo/postgres_password_tickit"),
"profiling": {
"enabled": "true"
},
"env": params.get("environment"),
}
},
"transformers": [
{
"type": "simple_add_dataset_tags",
"config": {
"tag_urns": [
f"urn:li:tag:{params.get('account')}",
f"urn:li:tag:{params.get('region')}"
]
}
},
{
"type": "pattern_add_dataset_terms",
"config": {
"term_pattern": {
"rules": {
".*users.*": [
"urn:li:glossaryTerm:Classification.Sensitive"
]
}
}
}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
f"urn:li:corpuser:{params.get('owner')}"
],
"ownership_type": "DATAOWNER"
}
}
],
"sink": {
"type": "datahub-rest",
"config": {
"server": params.get("/datahub_demo/datahub_rest_endpoint_public")
}
}
}
)
return pipeline
def run_pipeline(pipeline):
"""Runs the ingestion pipeline and prints summary of the results
:param pipeline: instance of datahub.ingestion.run.pipeline
:return:
"""
pipeline.run()
pipeline.pretty_print_summary()
def get_parameters() -> dict:
"""
Load parameter values from AWS Systems Manager (SSM) Parameter Store
:return: dict of parameter k/v's
"""
ssm_client = boto3.client("ssm")
params: dict = {}
try:
# make a single SSM API call for all parameters
response = ssm_client.get_parameters_by_path(
Path="/datahub_demo"
)
# create a dictionary of parameter k/v's
for param in response.get("Parameters"):
params[param["Name"]] = param["Value"]
logging.debug(f"Params: {params}")
except ClientError as e:
logging.error(e)
exit(1)
return params
if __name__ == '__main__':
main()

Sinking to DataHub

When syncing metadata to DataHub, you have two choices, the GMS REST API or Kafka. According to DataHub, the advantage of the REST-based interface is that any errors can immediately be reported. On the other hand, the advantage of the Kafka-based interface is that it is asynchronous and can handle higher throughput. For this post, I am DataHub’s REST API.

DataHub ingestion pipeline results for a Microsoft SQL Server datasource
Another example of a DataHub ingestion pipeline results for a Google BigQuery datasource

Column-level Metadata

In addition to column names and data types, it is possible to extract column descriptions and key types from certain datasources. Column descriptions, tags, and glossary terms can also be input through the DataHub UI. Below, we see an example of an Amazon Redshift fact table, whose table and column descriptions were ingested as part of the metadata.

Amazon Redshift fact table showing column-level metadata, tags, owners, and documentation

Business Glossary

DataHub can assign business glossary terms to entities. The DataHub Business Glossary plugin pulls business glossary metadata from a YAML-based configuration file.

# see sample: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/bootstrap_data/business_glossary.yml
version: 1
source: DataHub
owners:
users:
datahub
url: "https://github.com/datahub-project/datahub/"
nodes:
name: Classification
description: A set of terms related to Data Classification
terms:
name: Sensitive
description: Sensitive Data
custom_properties:
is_confidential: false
name: Confidential
description: Confidential Data
custom_properties:
is_confidential: true
name: HighlyConfidential
description: Highly Confidential Data
custom_properties:
is_confidential: true
name: PersonalInformation
description: All terms related to personal information
owners:
users:
datahub
terms:
name: ID
description: An individual's unqiue identifier
inherits:
Classification.Sensitive
name: Name
description: An individual's Name
inherits:
Classification.Sensitive
name: SSN
description: An individual's SSN
inherits:
Classification.Confidential
name: DriverLicense
description: An individual's Driver License ID
inherits:
Classification.Confidential
name: Email
description: An individual's email address
inherits:
Classification.Confidential
name: Address
description: A physical address
name: Gender
description: The gender identity of the individual
inherits:
Classification.Sensitive

Business glossary terms can be reviewed in the Glossary Terms tab of the DataHub’s UI. Below, we see the three terms associated with the Classification glossary node: Confidential, HighlyConfidential, and Sensitive.

Example of a related set of terms in DataHub’s Business Glossary

We can search for entities inventoried in DataHub using their assigned business glossary terms.

Dataset search results based on a term in DataHub’s Business Glossary

Finally, we see an example of an AWS Athena data catalog table with business glossary terms applied to columns within the table’s schema.

AWS Athena table showing column-level descriptions, glossary terms, tags, owners, and documentation

SQL-based Profiler

DataHub also can extract statistics about entities in DataHub using the SQL-based Profiler. According to the DataHub documentation, the Profiler can extract the following:

  • Row and column counts for each table
  • Column null counts and proportions
  • Column distinct counts and proportions
  • Column min, max, mean, median, standard deviation, quantile values
  • Column histograms or frequencies of unique values

In addition, we can also track the historical stats for each profiled entity each time metadata is ingested.

Amazon Redshift fact table showing SQL-based profiler column-level statistics
Another example, a Google BigQuery table showing SQL-based profiler column-level statistics

Data Lineage

DataHub’s data lineage features allow us to view upstream and downstream relationships between different types of entities. DataHub can trace lineage across multiple platforms, datasets, pipelines, charts, and dashboards.

Below, we see a simple example of dataset entity-to-entity lineage in Amazon Redshift and then Apache Spark on Amazon EMR. The fact table has a downstream relationship to four database views. The views are based on SQL queries that include the upstream table as a datasource.

Visual lineage view of Amazon Redshift fact table and its four downstream view dependencies
Another visual lineage example of an Apache Spark job with Apache Hive tables as both the source and sink

DataHub Analytics

DataHub provides basic metadata quality and usage analytics in the DataHub UI: user activity, counts of datasource types, business glossary terms, environments, and actions.

Examples of DataHub’s metadata quality and usage analytics capabilities
More examples of DataHub’s metadata quality and usage analytics capabilities

Conclusion

In this post, we explored the features of a data catalog and learned about some of the leading commercial and open-source data catalogs. Next, we learned how DataHub could collect, organize, enrich, and search metadata across multiple datasources. Lastly, we discovered how easy it is to catalog metadata from datasources spread across multiple CSP, SaaS providers, and corporate data centers, and centralize those results in DataHub.

In addition to the basic features reviewed in this post, DataHub offers a growing number of additional capabilities, including GraphQL and Timeline APIs, robust authentication and authorization, application monitoring observability, and Great Expectations integration. All these qualities make DataHub an excellent choice for a data catalog.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Data Preparation on AWS: Comparing Available ELT Options to Cleanse and Normalize Data

Comparing the features and performance of different AWS analytics services for Extract, Load, Transform (ELT)

Introduction

According to Wikipedia, “Extract, load, transform (ELT) is an alternative to extract, transform, load (ETL) used with data lake implementations. In contrast to ETL, in ELT models the data is not transformed on entry to the data lake but stored in its original raw format. This enables faster loading times. However, ELT requires sufficient processing power within the data processing engine to carry out the transformation on demand, to return the results in a timely manner.

As capital investments and customer demand continue to drive the growth of the cloud-based analytics market, the choice of tools seems endless, and that can be a problem. Customers face a constant barrage of commercial and open-source tools for their batch, streaming, and interactive exploratory data analytics needs. The major Cloud Service Providers (CSPs) have even grown to a point where they now offer multiple services to accomplish similar analytics tasks.

This post will examine the choice of analytics services available on AWS capable of performing ELT. Specifically, this post will compare the features and performance of AWS Glue Studio, Amazon Glue DataBrew, Amazon Athena, and Amazon EMR using multiple ELT use cases and service configurations.

Data pipeline architecture showing a choice of AWS ELT services

Analytics Use Case

We will address a simple yet common analytics challenge for this comparison — preparing a nightly data feed for analysis the next day. Each night a batch of approximately 1.2 GB of raw CSV-format healthcare data will be exported from a Patient Administration System (PAS) and uploaded to Amazon S3. The data must be cleansed, deduplicated, refined, normalized, and made available to the Data Science team the following morning. The team of Data Scientists will perform complex data analytics on the data and build machine learning models designed for early disease detection and prevention.

Sample Dataset

The dataset used for this comparison is generated by Synthea, an open-source patient population simulation. The high-quality, synthetic, realistic patient data and associated health records cover every aspect of healthcare. The dataset contains the patient-related healthcare history for allergies, care plans, conditions, devices, encounters, imaging studies, immunizations, medications, observations, organizations, patients, payers, procedures, providers, and supplies.

The Synthea dataset was first introduced in my March 2021 post examining the handling of sensitive PII data using Amazon Macie: Data Lakes: Discovery, Security, and Privacy of Sensitive Data.

The Synthea synthetic patient data is available in different record volumes and various data formats, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Since this post seeks to measure the performance of different AWS ELT-capable services, we will use a larger version of the Synthea dataset containing hundreds of thousands to millions of records.

AWS Glue Data Catalog

The dataset comprises nine uncompressed CSV files uploaded to Amazon S3 and cataloged to an AWS Glue Data Catalog, a persistent metadata store, using an AWS Glue Crawler.

Raw Synthea CSV data, in S3, cataloged in AWS Glue Data Catalog

Test Cases

We will use three data preparation test cases based on the Synthea dataset to examine the different AWS ELT-capable services.

Specifications for three different test cases

Test Case 1: Encounters for Symptom

An encounter is a health care contact between the patient and the provider responsible for diagnosing and treating the patient. In our first test case, we will process 1.26M encounters records for an ongoing study of patient symptoms by our Data Science team.

id date patient code description reasoncode reasondescription
714fd61a-f9fd-43ff-87b9-3cc45a3f1e53 2014-01-09 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
23e07532-8b96-4d05-b14e-d4c5a5288ed2 2014-08-18 33f33990-ae8b-4be8-938f-e47ad473abfe 185349003 Outpatient Encounter
45044100-aaba-4209-8ad1-15383c76842d 2015-07-12 33f33990-ae8b-4be8-938f-e47ad473abfe 185345009 Encounter for symptom 36971009 Sinusitis (disorder)
ffdddbfb-35e8-4a74-a801-89e97feed2f3 2014-08-12 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 444814009 Viral sinusitis (disorder)
352d1693-591a-4615-9b1b-f145648f49cc 2016-05-25 36d131ee-dd5b-4acb-acbe-19961c32c099 185349003 Outpatient Encounter
4620bd2f-8010-46a9-82ab-8f25eb621c37 2016-10-07 36d131ee-dd5b-4acb-acbe-19961c32c099 185345009 Encounter for symptom 195662009 Acute viral pharyngitis (disorder)
815494d8-2570-4918-a8de-fd4000d8100f 2010-08-02 660bec03-9e58-47f2-98b9-2f1c564f3838 698314001 Consultation for treatment
67ec5c2d-f41e-4538-adbe-8c06c71ddc35 2010-11-22 660bec03-9e58-47f2-98b9-2f1c564f3838 170258001 Outpatient Encounter
dbe481ce-b961-4f43-ac0a-07fa8cfa8bdd 2012-11-21 660bec03-9e58-47f2-98b9-2f1c564f3838 50849002 Emergency room admission
b5f1ab7e-5e67-4070-bcf0-52451eb20551 2013-12-04 660bec03-9e58-47f2-98b9-2f1c564f3838 185345009 Encounter for symptom 10509002 Acute bronchitis (disorder)
view raw encounters.csv hosted with ❤ by GitHub
Sample of raw encounters data

Data preparation includes the following steps:

  1. Load 1.26M encounter records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Select only the records where the description column contains “Encounter for symptom.”
  4. Remove any rows with an empty reasoncodes column.
  5. Extract a new year, month, and day column from the date column.
  6. Remove the date column.
  7. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  8. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  9. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 2: Observations

Clinical observations ensure that treatment plans are up-to-date and correctly administered and allow healthcare staff to carry out timely and regular bedside assessments. We will process 5.38M encounters records for our Data Science team in our second test case.

date patient encounter code description value units
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8302-2 Body Height 175.76 cm
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 29463-7 Body Weight 56.51 kg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 39156-5 Body Mass Index 18.29 kg/m2
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8480-6 Systolic Blood Pressure 119.0 mmHg
2011-07-02 33f33990-ae8b-4be8-938f-e47ad473abfe 673daa98-67e9-4e80-be46-a0b547533653 8462-4 Diastolic Blood Pressure 77.0 mmHg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8302-2 Body Height 177.25 cm
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 29463-7 Body Weight 59.87 kg
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 39156-5 Body Mass Index 19.05 kg/m2
2012-06-17 33f33990-ae8b-4be8-938f-e47ad473abfe be0aa510-645e-421b-ad21-8a1ab442ca48 8480-6 Systolic Blood Pressure 113.0 mmHg
2012-03-26 36d131ee-dd5b-4acb-acbe-19961c32c099 296a1fd4-56de-451c-a5fe-b50f9a18472d 8302-2 Body Height 174.17 cm
Sample of raw observations data

Data preparation includes the following steps:

  1. Load 5.38M observation records using the existing AWS Glue Data Catalog table.
  2. Remove any duplicate records.
  3. Extract a new year, month, and day column from the date column.
  4. Remove the date column.
  5. Write resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet files, partitioned by year, month, and day.
  6. Given the small resultset, bucket the data such that only one file is written per day partition to minimize the impact of too many small files on future query performance.
  7. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog, including partitions.

Test Case 3: Sinusitis Study

A medical condition is a broad term that includes all diseases, lesions, and disorders. In our second test case, we will join the conditions records with the patient records and filter for any condition containing the term ‘sinusitis’ in preparation for our Data Science team.

start stop patient encounter code description
2012-09-05 2012-10-16 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 05a6ef43-d690-455e-ab2f-1ea19d902274 44465007 Sprain of ankle
2014-09-08 2014-09-28 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 1cdcbe46-caaf-4b3f-b58c-9ca9ccb13013 283371005 Laceration of forearm
2014-11-28 2014-12-13 bc33b032-8e41-4d16-bc7e-00b674b6b9f8 b222e257-98da-4a1b-a46c-45d5ad01bbdc 195662009 Acute viral pharyngitis (disorder)
1980-01-09 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 40055000 Chronic sinusitis (disorder)
1989-06-25 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 201834006 Localized primary osteoarthritis of the hand
1996-01-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 ffbd4177-280a-4a08-a1af-9770a06b5146 196416002 Impacted molars
2016-02-07 01858c8d-f81c-4a95-ab4f-bd79fb62b284 748cda45-c267-46b2-b00d-3b405a44094e 15777000 Prediabetes
2016-04-27 2016-05-20 01858c8d-f81c-4a95-ab4f-bd79fb62b284 a64734f1-5b21-4a59-b2e8-ebfdb9058f8b 444814009 Viral sinusitis (disorder)
2014-02-06 2014-02-19 d32e9ad2-4ea1-4bb9-925d-c00fe85851ae c64d3637-8922-4531-bba5-f3051ece6354 43878008 Streptococcal sore throat (disorder)
1982-05-18 08858d24-52f2-41dd-9fe9-cbf1f77b28b2 3fff3d52-a769-475f-b01b-12622f4fee17 368581000119106 Neuropathy due to type 2 diabetes mellitus (disorder)
view raw conditions.csv hosted with ❤ by GitHub
Sample of raw conditions data

Data preparation includes the following steps:

  1. Load 483k condition records using the existing AWS Glue Data Catalog table.
  2. Inner join the condition records with the 132k patient records based on patient ID.
  3. Remove any duplicate records.
  4. Drop approximately 15 unneeded columns.
  5. Select only the records where the description column contains the term “sinusitis.”
  6. Remove any rows with empty ethnicity, race, gender, or marital columns.
  7. Create a new column, condition_age, based on a calculation of the age in days at which the patient’s condition was diagnosed.
  8. Write the resulting dataset back to Amazon S3 as Snappy-compressed Apache Parquet-format files. No partitions are necessary.
  9. Given the small resultset, bucket the data such that only one file is written to minimize the impact of too many small files on future query performance.
  10. Catalog resulting dataset to a new table in the existing AWS Glue Data Catalog.

AWS ELT Options

There are numerous options on AWS to handle the batch transformation use case described above; a non-exhaustive list includes:

  1. AWS Glue Studio (UI-driven with AWS Glue PySpark Extensions)
  2. Amazon Glue DataBrew
  3. Amazon Athena
  4. Amazon EMR with Apache Spark
  5. AWS Glue Studio (Apache Spark script)
  6. AWS Glue Jobs (Legacy jobs)
  7. Amazon EMR with Presto
  8. Amazon EMR with Trino
  9. Amazon EMR with Hive
  10. AWS Step Functions and AWS Lambda
  11. Amazon Redshift Spectrum
  12. Partner solutions on AWS, such as Databricks, Snowflake, Upsolver, StreamSets, Stitch, and Fivetran
  13. Self-managed custom solutions using a combination of OSS, such as dbt, Airbyte, Dagster, Meltano, Apache NiFi, Apache Drill, Apache Beam, Pandas, Apache Airflow, and Kubernetes

For this comparison, we will choose the first five options listed above to develop our ELT data preparation pipelines: AWS Glue Studio (UI-driven job creation with AWS Glue PySpark Extensions), Amazon Glue DataBrew, Amazon Athena, Amazon EMR with Apache Spark, and AWS Glue Studio (Apache Spark script).

Data pipeline architecture showing a choice of AWS ELT services

AWS Glue Studio

According to the documentation, “AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine. You can inspect the schema and data results in each step of the job.

AWS Glue Studio’s visual job creation capability uses the AWS Glue PySpark Extensions, an extension of the PySpark Python dialect for scripting ETL jobs. The extensions provide easier integration with AWS Glue Data Catalog and other AWS-managed data services. As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Spark scripts with AWS Glue Studio. In fact, we can use the exact same scripts run on Amazon EMR.

For the tests, we are using the G.2X worker type, Glue version 3.0 (Spark 3.1.1 and Python 3.7), and Python as the language choice for this comparison. We will test three worker configurations using both UI-driven job creation with AWS Glue PySpark Extensions and Apache Spark script options:

  • 10 workers with a maximum of 20 DPUs
  • 20 workers with a maximum of 40 DPUs
  • 40 workers with a maximum of 80 DPUs
AWS Glue Studio visual job creation UI for Test Case 3: Sinusitis Study

AWS Glue Studio Spark job details for Test Case 2: Observations

AWS Glue Studio job runs for Test Case 2: Observations

AWS Glue DataBrew

According to the documentation, “AWS Glue DataBrew is a visual data preparation tool that enables users to clean and normalize data without writing any code. Using DataBrew helps reduce the time it takes to prepare data for analytics and machine learning (ML) by up to 80 percent, compared to custom-developed data preparation. You can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values.

DataBrew allows you to set the maximum number of DataBrew nodes that can be allocated when a job runs. For this comparison, we will test three different node configurations:

  • 3 maximum nodes
  • 10 maximum nodes
  • 20 maximum nodes
AWS Glue DataBrew Project for Test Case 3: Sinusitis Study

AWS Glue DataBrew Recipe for Test Case 1: Encounters for Symptom

AWS Glue DataBrew recipe job runs for Test Case 1: Encounters for Symptom

Amazon Athena

According to the documentation, “Athena helps you analyze unstructured, semi-structured, and structured data stored in Amazon S3. Examples include CSV, JSON, or columnar data formats such as Apache Parquet and Apache ORC. You can use Athena to run ad-hoc queries using ANSI SQL, without the need to aggregate or load the data into Athena.

Although Athena is classified as an ad-hoc query engine, using a CREATE TABLE AS SELECT (CTAS) query, we can create a new table in the AWS Glue Data Catalog and write to Amazon S3 from the results of a SELECT statement from another query. That other query statement performs a transformation on the data using SQL.

Purpose: Process data for sinusitis study using Amazon Athena
Author: Gary A. Stafford (January 2022)
CREATE TABLE "sinusitis_athena" WITH (
format = 'Parquet',
write_compression = 'SNAPPY',
external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
bucketed_by = ARRAY['patient'],
bucket_count = 1
) AS
SELECT DISTINCT
patient,
code,
description,
date_diff(
'day',
date(substr(birthdate, 1, 10)),
date(substr(start, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions AS c,
patients AS p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 2: Observations

Purpose: Process data for sinusitis study using Amazon Athena
Author: Gary A. Stafford (January 2022)
CREATE TABLE "sinusitis_athena" WITH (
format = 'Parquet',
write_compression = 'SNAPPY',
external_location = 's3://databrew-demo-111222333444-us-east-1/sinusitis_athena/',
bucketed_by = ARRAY['patient'],
bucket_count = 1
) AS
SELECT DISTINCT
patient,
code,
description,
date_diff(
'day',
date(substr(birthdate, 1, 10)),
date(substr(start, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions AS c,
patients AS p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%'
ORDER BY patient, code;
CTAS query for Test Case 3: Sinusitis Study

Amazon Athena is a fully managed AWS service and has no performance settings to adjust or monitor.

Amazon Athena CTAS statement for Test Case 1: Encounters for Symptom

Parquet data partitioned by year in Amazon S3 for Test Case 1: Encounters for Symptom, using Athena

CTAS and Partitions

A notable limitation of Amazon Athena for the batch use case is the 100 partition limit with CTAS queries. Athena [only] supports writing to 100 unique partition and bucket combinations with CTAS. Partitioned by year, month, and day, the observations test case requires 2,558 partitions, and the observations test case requires 10,433 partitions. There is a recommended workaround using an INSERT INTO statement. However, the workaround requires additional SQL logic, computation, and most important cost. It is not practical, in my opinion, compared to other methods when a higher number of partitions are needed. To avoid the partition limit with CTAS, we will only partition by year and bucket by month when using Athena. Take this limitation into account when comparing the final results.

Amazon EMR with Apache Spark

According to the documentation, “Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. You can quickly and easily create managed Spark clusters from the AWS Management Console, AWS CLI, or the Amazon EMR API.

For this comparison, we are using two different Spark 3.1.2 EMR clusters:

  • (1) r5.xlarge Master node and (2) r5.2xlarge Core nodes
  • (1) r5.2xlarge Master node and (4) r5.2xlarge Core nodes

All Spark jobs are written in both Python (PySpark) and Scala. We are using the AWS Glue Data Catalog as the metastore for Spark SQL instead of Apache Hive.

4-node Amazon EMR cluster shown in Amazon EMR Management Console

Completed EMR Steps (Spark Jobs) on 4-node Amazon EMR cluster

# Purpose: Process data for sinusitis study using either Amazon EMR and AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "sinusitis_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
patient,
code,
description,
datediff(
date(substr(start, 1, 10)),
date(substr(birthdate, 1, 10))
) as condition_age,
marital,
race,
ethnicity,
gender
FROM conditions as c, patients as p
WHERE c.patient = p.id
AND gender <> ''
AND ethnicity <> ''
AND race <> ''
AND marital <> ''
AND description LIKE '%sinusitis%';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.bucketBy(1, "patient") \
.sortBy("patient", "code") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 3: Sinusitis Study

# Purpose: Process encounters dataset using either Amazon EMR and AWS Glue with PySpark
# Author: Gary A. Stafford (January 2022)
from pyspark.sql import SparkSession
table_name = "encounter_emr_spark"
spark = SparkSession \
.builder \
.appName(table_name) \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.config("hive.exec.dynamic.partition",
"true") \
.config("hive.exec.dynamic.partition.mode",
"nonstrict") \
.config("hive.exec.max.dynamic.partitions",
"10000") \
.config("hive.exec.max.dynamic.partitions.pernode",
"10000") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("USE synthea_patient_big_data;")
sql_query_data = """
SELECT DISTINCT
id,
patient,
code,
description,
reasoncode,
reasondescription,
year(date) as year,
month(date) as month,
day(date) as day
FROM encounters
WHERE description='Encounter for symptom';
"""
df_data = spark.sql(sql_query_data)
df_data \
.coalesce(1) \
.write \
.partitionBy("year", "month", "day") \
.bucketBy(1, "patient") \
.sortBy("patient") \
.mode("overwrite") \
.format("parquet") \
.option("path", f"s3://databrew-demo-111222333444-us-east-1/{table_name}/") \
.saveAsTable(name=table_name)
# update glue table
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('classification'='parquet');")
Amazon EMR PySpark script for Test Case 1: Encounters for Symptom

package main.spark.demo
// Purpose: Process observations dataset using Spark on Amazon EMR with Scala
// Author: Gary A. Stafford
// Date: 2022-03-06
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Observations {
def main(args: Array[String]): Unit = {
val (spark: SparkSession, sc: SparkContext) = createSession
performELT(spark, sc)
}
private def createSession = {
val spark: SparkSession = SparkSession.builder
.appName("Observations ELT App")
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.config("hive.exec.dynamic.partition",
"true")
.config("hive.exec.dynamic.partition.mode",
"nonstrict")
.config("hive.exec.max.dynamic.partitions",
"10000")
.config("hive.exec.max.dynamic.partitions.pernode",
"10000")
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("INFO")
(spark, sc)
}
private def performELT(spark: SparkSession, sc: SparkContext) = {
val tableName: String = sc.getConf.get("spark.executorEnv.TABLE_NAME")
val dataLakeBucket: String = sc.getConf.get("spark.executorEnv.DATA_LAKE_BUCKET")
spark.sql("USE synthea_patient_big_data;")
val sql_query_data: String =
"""
SELECT DISTINCT
patient,
encounter,
code,
description,
value,
units,
year(date) as year,
month(date) as month,
day(date) as day
FROM observations
WHERE date <> 'date';
"""
val observationsDF: DataFrame = spark
.sql(sql_query_data)
observationsDF
.coalesce(1)
.write
.partitionBy("year", "month", "day")
.bucketBy(1, "patient")
.sortBy("patient")
.mode("overwrite")
.format("parquet")
.option("path", s"s3://${dataLakeBucket}/${tableName}/")
.saveAsTable(tableName = tableName)
spark.sql(s"ALTER TABLE ${tableName} SET TBLPROPERTIES ('classification'='parquet');")
}
}
Spark jobs written in Scala had nearly identical execution times, such as Test Case 2: Observations

Partitions in the AWS Glue Data Catalog table for Test Case 1: Encounters for Symptom

Results

Data pipelines were developed and tested for each of the three test cases using the five chosen AWS ELT services and configuration variations. Each pipeline was then run 3–5 times, for a total of approximately 150 runs. The resulting AWS Glue Data Catalog table and data in Amazon S3 were deleted between each pipeline run. Each new run created a new data catalog table and wrote new results to Amazon S3. The median execution times from these tests are shown below.

Number of raw and processed records for each test case

Overall results (see details below) — lower times are better

Although we can make some general observations about the execution times of the chosen AWS services, the results are not meant to be a definitive guide to performance. An accurate comparison would require a deeper understanding of how each of these managed services works under the hood, in order to both optimize and balance their compute profiles correctly.

Amazon Athena

The Resultset column contains the final number of records written to Amazon S3 by Athena. The results contain the data pipeline’s median execution time and any additional data points.

Results for Amazon Athena data pipelines

AWS Glue Studio (AWS Glue PySpark Extensions)

Tests were run with three different configurations for AWS Glue Studio using the graphical interface for creating jobs with AWS Glue PySpark Extensions. Times for each configuration were nearly identical.

Results for data pipelines using AWS Glue Studio with AWS Glue PySpark Extensions

AWS Glue Studio (Apache PySpark script)

As opposed to using the graphical interface for creating jobs with AWS Glue PySpark Extensions, you can also run your Apache Spark scripts with AWS Glue Studio. The tests were run with the same three configurations as above. The execution times compared to the Amazon EMR tests, below, are almost identical.

Results for data pipelines using PySpark scripts on AWS Glue Studio

Amazon EMR with Apache Spark

Tests were run with three different configurations for Amazon EMR with Apache Spark using PySpark. The first set of results is for the 2-node EMR cluster. The second set of results is for the 4-node cluster. The third set of results is for the same 4-node cluster in which the data was not bucketed into a single file within each partition. Compare the execution times and the number of objects against the previous set of results. Too many small files can negatively impact query performance.

Results for data pipelines using Amazon EMR with Apache Spark — times for PySpark scripts

It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had similar execution times for all three test cases.

Results for data pipelines using Amazon EMR with Apache Spark — Python vs. Scala

Amazon Glue DataBrew

Tests were run with three different configurations Amazon Glue DataBrew, including 3, 10, and 20 maximum nodes. Times for each configuration were nearly identical.

Results for data pipelines using Amazon Glue DataBrew

Observations

  1. All tested AWS services can read and write to an AWS Glue Data Catalog and the underlying datastore, Amazon S3. In addition, they all work with the most common analytics data file formats.
  2. All tested AWS services have rich APIs providing access through the AWS CLI and SDKs, which support multiple programming languages.
  3. Overall, AWS Glue Studio, using the AWS Glue PySpark Extensions, appears to be the most capable ELT tool of the five services tested and with the best performance.
  4. Both AWS Glue DataBrew and AWS Glue Studio are no-code or low-code services, democratizing access to data for non-programmers. Conversely, Amazon Athena requires knowledge of ANSI SQL, and Amazon EMR with Apache Spark requires knowledge of Scala or Python. Be cognizant of the potential trade-offs from using no-code or low-code services on observability, configuration control, and automation.
  5. Both AWS Glue DataBrew and AWS Glue Studio can write a custom Parquet writer type optimized for Dynamic Frames, GlueParquet. One potential advantage, a pre-computed schema is not required before writing.
  6. There is a slight ‘cold-start’ with Glue Studio. Studio startup times ranged from 7 seconds to 2 minutes and 4 seconds in the tests. However, the lower execution time of AWS Glue Studio compared to Amazon EMR with Spark and AWS Glue DataBrew in the tests offsets any initial cold-start time, in my opinion.
  7. Changing the maximum number of units from 3 to 10 to 20 for AWS Glue DataBrew made negligible differences in job execution times. Given the nearly identical execution times, it is unclear exactly how many units are being used by the job. More importantly, how many DataBrew node hours we are being billed for. These are some of the trade-offs with a fully-managed service — visibility and fine-tuning configuration.
  8. Similarly, with AWS Glue Studio, using either 10 workers w/ max. 20 DPUs, 20 workers w/ max. 40 DPUs, or 40 workers w/ max. 80 DPUs resulted in nearly identical executions times.
  9. Amazon Athena had the fastest execution times but is limited by the 100 partition limit for large CTAS resultsets. Athena is not practical, in my opinion, compared to other ELT methods, when a higher number of partitions are needed.
  10. It is commonly stated that “Scala is almost ten times faster than Python.” However, with Amazon EMR, jobs written in Python (PySpark) and Scala had almost identical execution times for all three test cases.
  11. Using Amazon EMR with EC2 instances takes about 9 minutes to provision a new cluster for this comparison fully. Given nearly identical execution times to AWS Glue Studio with Apache Spark scripts, Glue has the clear advantage of nearly instantaneous startup times.
  12. AWS recently announced Amazon EMR Serverless. Although this service is still in Preview, this new version of EMR could potentially reduce or eliminate the lengthy startup time for ephemeral clusters requirements.
  13. Although not discussed, scheduling the data pipelines to run each night was a requirement for our use case. AWS Glue Studio jobs and AWS Glue DataBrew jobs are schedulable from those services. For Amazon EMR and Amazon Athena, we could use Amazon Managed Workflows for Apache Airflow (MWAA), AWS Data Pipeline, or AWS Step Functions combined with Amazon CloudWatch Events Rules to schedule the data pipelines.

Conclusion

Customers have many options for ELT — the cleansing, deduplication, refinement, and normalization of raw data. We examined chosen services on AWS, each capable of handling the analytics use case presented. The best choice of tools depends on your specific ELT use case and performance requirements.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

Leave a comment

Capturing Data Analytics Workflows and System Requirements

Implement an effective, consistent, and repeatable strategy for documenting data analytics workflows and capturing system requirements

Data analytics applications involve more than just analyzing data, particularly on advanced analytics projects. Much of the required work takes place upfront, in collecting, integrating, and preparing data and then developing, testing, and revising analytical models to ensure that they produce accurate results. In addition to data scientists and other data analysts, analytics teams often include data engineers, who create data pipelines and help prepare data sets for analysis.” — TechTarget

Audio version of the post on YouTube

Introduction

Successful consultants, project managers, and product owners use well-proven and systematic approaches to achieve desired outcomes, including successful customer engagements, project results, and product and service launches. Modern data stacks and analytics workflows are increasingly complex. This technology-agnostic discovery process aims to help an organization efficiently and repeatably capture a concise record of existing analytics workflows, business and technical goals and constraints, and measures of success. If applicable, the discovery process is also used to compile and clarify requirements for new data analytics workflows.

Animation of the discovery process

Analytics Workflow Stages

There are many patterns organizations use to delineate the stages of their analytics workflows. This process utilizes six stages of a typical analytics workflow:

  1. Generate: All the ways data is generated and the systems of record where it is stored or originates from, also referred to as data ingress
  2. Collect: All the ways data is collected or ingested
  3. Prepare: All the ways data is transformed, including ETL, ELT, reverse ETL, and ML
  4. Store: All the ways data is stored, organized, and secured for analytics purposes
  5. Analyze: All the ways data is analyzed
  6. Deliver: All the ways data is delivered and how it is consumed, also referred to as data egress or data products

The precise nomenclature is not critical to this process as long as all major functionality is considered.

The Process

The discovery process starts by working backward. It first identifies existing goals and desired outcomes. It then identifies existing and anticipated future constraints. Next, it breaks down the current analytics workflows, examining the four stages of collect, prepare, store, and analyze, the steps required to get from data sources to deliverables. Finally, it captures the inputs and the outputs for the workflows and the data producers and consumers.

Collect, prepare, store, and analyze — the steps required to get from data sources to deliverables.

Specifically, the process identifies and documents the following:

  1. Business and technical goals and desired outcomes
  2. Business and technical constraints also referred to as limitations or restrictions
  3. Analytics workflows: tools, techniques, procedures, and organizational structure
  4. Outputs also referred to as deliverables, required to achieve desired outcomes
  5. Inputs, also referred to as data sources, required to achieve desired outcomes
  6. Data producers and consumers
  7. Measures of success
  8. Recommended next steps

Outcomes

Capture business and technical goals and desired outcomes driving the necessity to rearchitect current analytics processes. For example:

  1. Re-architect analytics processes to modernize, reduce complexity, or add new capabilities
  2. Reduce or control costs
  3. Increase performance, scalability, speed
  4. Migrate on-premises workloads, workflows, processes to the Cloud
  5. Migrate from one cloud provider or SaaS provider to another
  6. Move away from proprietary software products to open source software (OSS) or commercial open source software (COSS)
  7. Migrate away from custom-built software to commercial off-the-shelf (COTS), OSS, or COSS solutions
  8. Integrate DevOps, GitOps, DataOps, or MLOps practices
  9. Integrate on-premises, multi-cloud, and SaaS-based hybrid architectures
  10. Develop new analytics product or service offerings
  11. Standardize analytics processes
  12. Leverage the data for AI and ML purposes
  13. Provide stakeholders with a real-time business KPIs dashboard
  14. Construct a data lake, data warehouse, data lakehouse, or data mesh

If migration is involved, review the 6 R’s of Cloud Migration: Rehost, Replatform, Repurchase, Refactor, Retain, or Retire.

Constraints

Identify the existing and potential future business and technical constraints that impact analytics workflows. For example:

  1. Budgets
  2. Cost attribution
  3. Timelines
  4. Access to skilled resources
  5. Internal and external regulatory requirements, such as HIPAA, SOC2, FedRAMP, GDPR, PCI DSS, CCPA, and FISMA
  6. Business Continuity and Disaster Recovery (BCDR) requirements
  7. Architecture Review Board (ARB), Center of Excellence (CoE), Change Advisory Board (CAB), and Release Management standards and guidelines
  8. Data residency and data sovereignty requirements
  9. Security policies
  10. Service Level Agreements (SLAs); see ‘Measures of Success’ section
  11. Existing vendor, partner, cloud-provider, and SaaS relationships
  12. Existing licensing and contractual obligations
  13. Deprecated code dependencies and other technical debt
  14. Must-keep aspects of existing processes
  15. Build versus buy propensity
  16. Proprietary versus open source software propensity
  17. Insourcing versus outsourcing propensity
  18. Managed, hosted, SaaS versus self-managed software propensity

Analytics Workflows

Capture analytics workflows using the four stages of collect, prepare, store, and analyze, as a way to organize the discussion:

  1. High- and low-level architecture, process flow diagrams, sequence diagrams
  2. Recent architectural assessments such as reviews based on the AWS Data Analytics Lens, AWS Well-Architected Framework, Microsoft Azure Well-Architected Framework, or Google Cloud Architecture Framework
  3. Analytics tools, including hardware and commercial, custom, and open-source software
  4. Security policies, processes, standards, and technologies
  5. Observability, logging, monitoring, alerting, and notification
  6. Teams, including roles, responsibilities, and skillsets
  7. Partners, including consultants, vendors, SaaS providers, and Managed Service Providers (MSP)
  8. SDLC environments, such as Local, Sandbox, Development, Testing, Staging, Production, and Disaster Recovery (DR)
  9. Business Continuity Planning (BCP) policies, processes, standards, and technologies
  10. Primary analytics programming languages
  11. External system dependencies
  12. DataOps, MLOps, DevOps, CI/CD, SCM/VCS, and Infrastructure-as-Code (IaC) automation policies, processes, standards, and technologies
  13. Data governance and data lineage policies, processes, standards, and technologies
  14. Data quality (or data assurance) policies, processes, standards, technologies, and testing methodologies
  15. Data anomaly detection policies, processes, standards, and technologies
  16. Intellectual property (IP), the ‘secret sauce’ that differentiates the organization’s processes and provides a competitive advantage, such as ML models, proprietary algorithms, datasets, highly specialized knowledge, and patents
  17. Overall effectiveness and customer satisfaction with existing analytics platform (document sources of customer feedback)
  18. Known deficiencies with current analytics processes

Outputs

Identify the deliverables required to meet the desired outcomes. For example, prepare and provide data for:

  1. Data analytics purposes
  2. Business Intelligence (BI), visualizations, and dashboards
  3. Machine Learning (ML) and Artificial Intelligence (AI)
  4. Data exports and data feeds, such as Excel or CSV-format files
  5. Hosted datasets for external or internal consumption
  6. Data APIs for external or internal consumption
  7. Documentation, Data API guides, data dictionaries, example code such as Notebooks
  8. SaaS-based product offering

Inputs

Capture sources of data that are required to produce the outputs. For example:

  1. Batch sources such as flat files from legacy systems, third-party providers, and enterprise platforms
  2. Streaming sources such as message queues, change data capture (CDC), IoT device telemetry, operational metrics, real time logs, clickstream data, connected devices, mobile, and gaming feeds
  3. Databases, including relational, NoSQL, key-value, document, in-memory, graph, time series, and wide column (OLTP data stores)
  4. Data warehouses (OLAP data stores)
  5. Data lakes
  6. API endpoints
  7. Internal, public, and licensed datasets

Use the 5 V’s of big data to dive deep into each data source: Volume, Velocity, Variety, Veracity (or Validity), and Value.

Data Producers and Consumers

Capture all producers and consumers of data:

  1. Data producers
  2. Data consumers
  3. Data access patterns
  4. Data usage patterns
  5. Consumer and producer requirements and constraints

Measures of Success

Identify how success is measured for the analytics workflows and by whom. For example:

  1. Key Performance Indicators (KPIs)
  2. Service Level Agreements (SLAs)
  3. Customer Satisfaction Score (CSAT)
  4. Net Promoter Score (NPS)
  5. SaaS growth metrics: churn, activation rate, MRR, ARR, CAC, CLV, expansion revenue (source: appcues.com)
  6. Data quality guarantees
  7. How are measurements determined, calculated, and weighted?
  8. What are the business and technical actions resulting from missed measures of success?

Results

The immediate artifact of the data analytics discovery process is a clear and concise document that captures all feedback and inputs. In addition, the document contains all customer-supplied artifacts, such as architectural and process flow diagrams. The document should be thoroughly reviewed for accuracy and completeness by the process participants. This artifact serves as a record of current data analytics workflows and a basis for making workflow improvement recommendations or architecting new workflows.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , ,

Leave a comment

The Art of Building Open Data Lakes with Apache Hudi, Kafka, Hive, and Debezium

Build near real-time, open-source data lakes on AWS using a combination of Apache Kafka, Hudi, Spark, Hive, and Debezium

Introduction

In the following post, we will learn how to build a data lake on AWS using a combination of open-source software (OSS), including Red Hat’s Debezium, Apache Kafka, Kafka Connect, Apache Hive, Apache Spark, Apache Hudi, and Hudi DeltaStreamer. We will use fully-managed AWS services to host the datasource, the data lake, and the open-source tools. These services include Amazon RDS, MKS, EKS, EMR, and S3.

The architecture and workflow demonstrated in this post

This post is an in-depth follow-up to the video demonstration, Building Open Data Lakes on AWS with Debezium and Apache Hudi.

Workflow

As shown in the architectural diagram above, these are the high-level steps in the demonstration’s workflow:

  1. Changes (inserts, updates, and deletes) are made to the datasource, a PostgreSQL database running on Amazon RDS;
  2. Kafka Connect Source Connector, utilizing Debezium and running on Amazon EKS (Kubernetes), continuously reads data from PostgreSQL WAL using Debezium;
  3. Source Connector creates and stores message schemas in Apicurio Registry, also running on Amazon EKS, in Avro format;
  4. Source Connector transforms and writes data in Apache Avro format to Apache Kafka, running on Amazon MSK;
  5. Kafka Connect Sink Connector, using Confluent S3 Sink Connector, reads messages from Kafka topics using schemas from Apicurio Registry;
  6. Sink Connector writes data to Amazon S3 in Apache Avro format;
  7. Apache Spark, using Hudi DeltaStreamer and running on Amazon EMR, reads message schemas from Apicurio Registry;
  8. DeltaStreamer reads raw Avro-format data from Amazon S3;
  9. DeltaStreamer writes data to Amazon S3 as both Copy on Write (CoW) and Merge on Read (MoR) table types;
  10. DeltaStreamer syncs Hudi tables and partitions to Apache Hive running on Amazon EMR;
  11. Queries are executed against Apache Hive Metastore or directly against Hudi tables using Apache Spark, with data returned from Hudi tables in Amazon S3;

The workflow described above actually contains two independent processes running simultaneously. Steps 2–6 represent the first process, the change data capture (CDC) process. Kafka Connect is used to continuously move changes from the database to Amazon S3. Steps 7–10 represent the second process, the data lake ingestion process. Hudi’s DeltaStreamer reads raw CDC data from Amazon S3 and writes the data back to another location in S3 (the data lake) in Apache Hudi table format. When combined, these processes can give us near real-time, incremental data ingestion of changes from the datasource to the Hudi-managed data lake.

Alternatives

This demonstration’s workflow is only one of many possible workflows to achieve similar outcomes. Alternatives include:

Source Code

All source code for this post and the previous posts in this series are open-sourced and located on GitHub. The specific resources used in this post are found in the debezium_hudi_demo directory of the GitHub repository. There are also two copies of the Museum of Modern Art (MoMA) Collection dataset from Kaggle, specifically prepared for this post, located in the moma_data directory. One copy is a nearly full dataset, and the other is a smaller, cost-effective dev/test version.

Kafka Connect

In this demonstration, Kafka Connect runs on Kubernetes, hosted on the fully-managed Amazon Elastic Kubernetes Service (Amazon EKS). Kafka Connect runs the Source and Sink Connectors.

Source Connector

The Kafka Connect Source Connector, source_connector_moma_postgres_kafka.json, used in steps 2–4 of the workflow, utilizes Debezium to continuously read changes to an Amazon RDS for PostgreSQL database. The PostgreSQL database hosts the MoMA Collection in two tables: artists and artworks.

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<your_database_hostname>",
"database.port": "5432",
"database.user": "<your_username>",
"database.password": "<your_password>",
"database.dbname": "moma",
"database.server.name": "moma",
"table.include.list": "public.artists,public.artworks",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
}

The Debezium Connector for PostgreSQL reads record-level insert, update, and delete entries from PostgreSQL’s write-ahead log (WAL). According to the PostgreSQL documentation, changes to data files must be written only after log records describing the changes have been flushed to permanent storage, thus the name, write-ahead log. The Source Connector then creates and stores Apache Avro message schemas in Apicurio Registry also running on Amazon EKS.

Apicurio Registry UI showing Avro-format Kafka message schemas
Apicurio Registry UI showing part of Avro-format Kafka message value schema for artists

Finally, the Source Connector transforms and writes Avro format messages to Apache Kafka running on the fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK). Assuming Kafka’s topic.creation.enable property is set to true, Kafka Connect will create any necessary Kafka topics, one per database table.

Below, we see an example of a Kafka message representing an insert of a record with the artist_id 1 in the MoMA Collection database’s artists table. The record was read from the PostgreSQL WAL, transformed, and written to a corresponding Kafka topic, using the Debezium Connector for PostgreSQL. The first version represents the raw data before being transformed by Debezium. Note that the type of operation (_op) indicates a read (r). Possible values include c for create (or insert), u for update, d for delete, and r for read (applies to snapshots).

{
"payload": {
"before": null,
"after": {
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992
},
"source": {
"version": "1.7.0.Final",
"connector": "postgresql",
"name": "moma",
"ts_ms": 1640703877051,
"snapshot": "true",
"db": "moma",
"sequence": "[null,\"3668170506336\"]",
"schema": "public",
"table": "artists",
"txId": 217094,
"lsn": 3668170506336,
"xmin": null
},
"op": "r",
"ts_ms": 1640703877051,
"transaction": null
}
}

The next version represents the same record after being transformed by Debezium using the event flattening single message transformation (unwrap SMT). The final message structure represents the schema stored in Apicurio Registry. The message structure is identical to the structure of the data written to Amazon S3 by the Sink Connector.

{
"payload": {
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992,
"__op": "r",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3668438941792,
"__source_ts_ms": 1640705109121,
"__deleted": "false"
}
}

Sink Connector

The Kafka Connect Sink Connector, sink_connector_moma_kafka_s3.json, used in steps 5–6 of the workflow, implements the Confluent S3 Sink Connector. The Sink Connector reads the Avro-format messages from Kafka using the schemas stored in Apicurio Registry. It then writes the data to Amazon S3, also in Apache Avro format, based on the same schemas.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "moma.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "<your_data_lake_bucket>",
"s3.part.size": 5242880,
"flush.size": 10000,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"behavior.on.null.values": "ignore",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}

Running Kafka Connect

We first start Kafka Connect in the background to be the CDC process.

bin/connect-distributed.sh \
config/connect-distributed.properties \
> /dev/null 2>&1 &
tail -f logs/connect.log

Then, deploy the Kafka Connect Source and Sink Connectors using Kafka Connect’s RESTful API. Using the API, we can also confirm the status of the Connectors.

curl -s -d @"config/source_connector_moma_postgres_kafka.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/source_connector_moma_postgres_kafka/config | jq
curl -s -d @"config/sink_connector_moma_kafka_s3.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/sink_connector_moma_kafka_s3/config | jq
curl -s -X GET http://localhost:8083/connectors | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/source_connector_moma_postgres_kafka/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/sink_connector_moma_kafka_s3/status | jq

To confirm the two Kafka topics, moma.public.artists and moma.public.artworks, were created and contain Avro messages, we can use Kafka’s command-line tools.

# list kafka topics
bin/kafka-topics.sh –list \
–bootstrap-server $BBROKERS \
–command-config config/client-iam.properties
# read first 5 avro-format (binary) messages from topic
bin/kafka-console-consumer.sh \
–topic moma.public.artists \
–from-beginning \
–max-messages 5 \
–property print.value=true \
–property print.offset=true \
–bootstrap-server $BBROKERS \
–consumer.config config/client-iam.properties

In the short video-only clip below, we see the process of deploying the Kafka Connect Source and Sink Connectors and confirming they are working as expected.

Deploying and starting the Kafka Connect Source and Sink Connectors

The Sink Connector writes data to Amazon S3 in batches of 10k messages or every 60 seconds (one-minute intervals). These settings are configurable and highly dependent on your requirements, including message volume, message velocity, real-time analytics requirements, and available compute resources.

Amazon S3 objects containing MoMA Collection artwork records from PostgreSQL

Since we will not be querying this raw Avro-format CDC data in Amazon S3 directly, there is no need to catalog this data in Apache Hive or AWS Glue Data Catalog, a fully-managed Hive-compatible metastore.

Apache Hudi

According to the overview, Apache Hudi (pronounced “hoodie”) is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality to data lakes. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering, compaction optimizations, and concurrency, all while keeping data in open source file formats.

Without Hudi or an equivalent open-source data lake table format such as Apache Iceberg or Databrick’s Delta Lake, most data lakes are just of bunch of unmanaged flat files. Amazon S3 cannot natively maintain the latest view of the data, to the surprise of many who are more familiar with OLTP-style databases or OLAP-style data warehouses.

DeltaStreamer

DeltaStreamer, aka the HoodieDeltaStreamer utility (part of the hudi-utilities-bundle), used in steps 7–10 of the workflow, provides the way to perform streaming ingestion of data from different sources such as Distributed File System (DFS) and Apache Kafka.

Optionally, HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, ingests multiple tables in a single Spark job, into Hudi datasets. Currently, it only supports sequential processing of tables to be ingested and Copy on Write table type.

We are using HoodieDeltaStreamer to write to both Merge on Read (MoR) and Copy on Write (CoW) table types for demonstration purposes only. The MoR table type is a superset of the CoW table type, which stores data using a combination of columnar-based (e.g., Apache Parquet) plus row-based (e.g., Apache Avro) file formats. Updates are logged to delta files and later compacted to produce new versions of columnar files synchronously or asynchronously. Again, the choice of table types depends on your requirements.

Trade-offs between Hudi table types (table courtesy Apache Hudi documentation)
Trade-offs between Hudi table types (table courtesy Apache Hudi documentation)

Amazon EMR

For this demonstration, I’ve used the recently released Amazon EMR version 6.5.0 configured with Apache Spark 3.1.2 and Apache Hive 3.1.2. EMR 6.5.0 runs Scala version 2.12.10, Python 3.7.10, and OpenJDK Corretto-8.312. I have included the AWS CloudFormation template and parameters file used to create the EMR cluster, on GitHub.

When choosing Apache Spark, Apache Hive, or Presto on EMR 6.5.0, Apache Hudi release 0.9.0 is automatically installed.

Amazon EMR Master Node showing Apache Hudi related resources

DeltaStreamer Configuration

Below, we see the DeltaStreamer properties file, deltastreamer_artists_apicurio_mor.properties. This properties file is referenced by the Spark job that runs DeltaStreamer, shown next. The file contains properties related to the datasource, the data sink, and Apache Hive. The source of the data for DeltaStreamer is the CDC data written to Amazon S3. In this case, the datasource is the objects located in the /topics/moma.public.artworks/partition=0/ S3 object prefix. The data sink is a Hudi MoR table type in Amazon S3. DeltaStreamer will write Parquet data, partitioned by the artist’s nationality, to the /moma_mor/artists/ S3 object prefix. Lastly, DeltaStreamer will sync all tables and table partitions to Apache Hive, including creating the Hive databases and tables if they do not already exist.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=nationality
hoodie.datasource.hive_sync.table=artists
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=nationality
hoodie.datasource.write.recordkey.field=artist_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:post>/apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artists/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artists data using MoR table type

Below, we see the equivalent DeltaStreamer properties file for the MoMA artworks, deltastreamer_artworks_apicurio_mor.properties. There are also comparable DeltaStreamer property files for the Hudi CoW tables on GitHub.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=classification
hoodie.datasource.hive_sync.table=artworks
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=classification
hoodie.datasource.write.recordkey.field=artwork_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:post>/apis/ccompat/v6/subjects/moma.public.artworks-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artworks/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artworks data using MoR table type

All DeltaStreamer property files reference Apicurio Registry for the location of the Avro schemas. The schemas are used by both the Kafka Avro-format messages and the CDC-created Avro-format files in Amazon S3. Due to DeltaStreamer’s coupling with Confluent Schema Registry, as opposed to other registries, we must use Apicurio Registry’s Confluent Schema Registry API (Version 6) compatibility API endpoints (e.g., /apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest) when using the org.apache.hudi.utilities.schema.SchemaRegistryProvider datasource option with DeltaStreamer. According to Apicurio, to provide compatibility with Confluent SerDes (Serializer/Deserializer) and other clients, Apicurio Registry implements the API defined by the Confluent Schema Registry.

Apicurio Registry exposes multiple APIs

Running DeltaStreamer

The properties files are loaded by Spark jobs that call the DeltaStreamer library, using spark-submit. Below, we see an example Spark job that calls the DeltaStreamer class. DeltaStreamer reads the raw Avro-format CDC data from S3 and writes the data using the Hudi MoR table type into the /moma_mor/artists/ S3 object prefix. In this Spark particular job, we are using the continuous option. DeltaStreamer runs in continuous mode using this option, running source-fetch, transform, and write in a loop. We are also using the UPSERT write operation (op). Operation options include UPSERT, INSERT, and BULK_INSERT. This set of options is ideal for inserting ongoing changes to CDC data into Hudi tables. You can run jobs in the foreground or background on EMR’s Master Node or as EMR Steps from the Amazon EMR console.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artists data, MoR table type, continuous upserts
spark-submit \
–jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
–conf spark.sql.catalogImplementation=hive \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
–table-type MERGE_ON_READ \
–source-ordering-field __source_ts_ms \
–props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artists_apicurio_mor.properties" \
–source-class org.apache.hudi.utilities.sources.AvroDFSSource \
–target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artists_mor/" \
–target-table moma_mor.artists \
–schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
–enable-sync \
–continuous \
–op UPSERT

Below, we see another example DeltaStreamer Spark job that reads the raw Avro-format CDC data from S3 and writes the data using the MoR table type into the /moma_mor/artworks/ S3 object prefix. This example uses the BULK_INSERT write operation (op) and the filter-dupes option. The filter-dupes option ensures that should duplicate records from the source are dropped/filtered out before INSERT or BULK_INSERT. This set of options is ideal for the initial bulk inserting of existing data into Hudi tables. The job runs one time and completes, unlike the previous example that ran continuously.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artworks data, MoR table type, 1x bulk insert
spark-submit \
–jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
–conf spark.sql.catalogImplementation=hive \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
–table-type MERGE_ON_READ \
–source-ordering-field __source_ts_ms \
–props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artworks_apicurio_mor.properties" \
–source-class org.apache.hudi.utilities.sources.AvroDFSSource \
–target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artworks_mor/" \
–target-table moma_mor.artworks \
–schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
–enable-sync \
–op BULK_INSERT \
–filter-dupes

Syncing with Hive

The following abridged, video-only clip demonstrates the differences between the Hudi CoW and MoR table types with respect to Apache Hive. In the video, we run the deltastreamer_jobs_bulk_bkgd.sh script, included on GitHub. This script runs four different Apache Spark jobs, using Hudi DeltaStreamer to bulk-ingest all the artists and artworks CDC data from Amazon S3 into both Hudi CoW and MoR table types. Once the four Spark jobs are complete, the script queries Apache Hive and displays the new Hive databases and database tables created by DeltaStreamer.

Hudi DeltaStreamer Spark jobs running on the Amazon EMR

In both the video above and terminal screengrab below, note the difference in the tables created within the two Hive databases, the Hudi CoW table type (moma_cow) and the MoR table type (moma_mor). The MoR table type creates both a read-optimized table (_ro) as well as a real-time table (_rt) for each datasource (e.g., artists_ro and artists_rt).

View of the Apache Hive CoW and MoR database tables

According to documentation, Hudi creates two tables in the Hive metastore for the MoR table type. The first, a table which is a read-optimized view appended with _ro and the second, a table with the same name appended with _rt which is a real-time view. According to Hudi, the read-optimized view exposes columnar Parquet while the real-time view exposes columnar Parquet and/or row-based logs; you can query both tables. The CoW table type creates a single table without a suffix for each datasource (e.g., artists). Below, we see the Hive table structure for the artists_rt table, created by DeltaStreamer, using SHOW CREATE TABLE moma_mor.artists_rt;.

CREATE EXTERNAL TABLE `moma_mor.artists_rt`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`artist_id` int,
`name` string,
`gender` string,
`birth_year` int,
`death_year` int,
`__op` string,
`__db` string,
`__table` string,
`__schema` string,
`__lsn` bigint,
`__source_ts_ms` bigint,
`__deleted` string)
PARTITIONED BY (
`nationality` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='s3://<your_data_lake_bucket>/moma/artists_mor')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<your_data_lake_bucket>/moma/artists_mor'
TBLPROPERTIES (
'bucketing_version'='2',
'last_commit_time_sync'='20211230180429',
'spark.sql.partitionProvider'='catalog',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='1',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"artist_id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"gender","type":"string","nullable":true,"metadata":{}},{"name":"birth_year","type":"integer","nullable":true,"metadata":{}},{"name":"death_year","type":"integer","nullable":true,"metadata":{}},{"name":"__op","type":"string","nullable":true,"metadata":{}},{"name":"__db","type":"string","nullable":true,"metadata":{}},{"name":"__table","type":"string","nullable":true,"metadata":{}},{"name":"__schema","type":"string","nullable":true,"metadata":{}},{"name":"__lsn","type":"long","nullable":true,"metadata":{}},{"name":"__source_ts_ms","type":"long","nullable":true,"metadata":{}},{"name":"__deleted","type":"string","nullable":true,"metadata":{}},{"name":"nationality","type":"string","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='nationality',
'transient_lastDdlTime'='1640919578')

Having run the demonstration’s deltastreamer_jobs_bulk_bkgd.sh script, the resulting object structure in the Hudi-managed section of the Amazon S3 bucket looks as follows.

S3 object structure in Hudi-managed Amazon S3 bucket

Below is an example of Hudi files created in the /moma/artists_cow/ S3 object prefix. When using data lake table formats like Hudi, given its specialized directory structure and the high number of objects, interactions with the data should be abstracted through Hudi’s programming interfaces. Generally speaking, you do not interact directly with the objects in a data lake.

"moma/artists_cow/.hoodie/.aux/.bootstrap/.fileids_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap/.partitions_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap_$folder$",
"moma/artists_cow/.hoodie/.aux_$folder$",
"moma/artists_cow/.hoodie/.temp_$folder$",
"moma/artists_cow/.hoodie/20211231203737.commit",
"moma/artists_cow/.hoodie/20211231203737.commit.requested",
"moma/artists_cow/.hoodie/20211231203737.inflight",
"moma/artists_cow/.hoodie/20211231203738.rollback",
"moma/artists_cow/.hoodie/20211231203738.rollback.inflight",
"moma/artists_cow/.hoodie/archived_$folder$",
"moma/artists_cow/.hoodie/hoodie.properties",
"moma/artists_cow/.hoodie_$folder$",
"moma/artists_cow/nationality=Afghan/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Afghan/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-0_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Afghan_$folder$",
"moma/artists_cow/nationality=Albanian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Albanian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-1_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Albanian_$folder$",
"moma/artists_cow/nationality=Algerian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Algerian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-2_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Algerian_$folder$",
"moma/artists_cow/nationality=American/.hoodie_partition_metadata",
"moma/artists_cow/nationality=American/0065ed77-4a6c-4755-b133-45126310936d-0_502-28-3854_20211231203737.parquet",
"moma/artists_cow/nationality=American/011d5c57-c918-40d8-8518-c3cb56747133-0_15-28-3367_20211231203737.parquet"
Hudi CLI commands used in the next video

Hudi CLI

Optionally, we can inspect the Hudi tables using the Hudi CLI (hudi-cli). The CLI offers an extensive list of available commands. Using the CLI, we can inspect the Hudi tables and their schemas, and review operational statistics like write amplification (the number of bytes written for 1 byte of incoming data), commits, and compactions.

> hudi-cli
help
connect –path s3://<your_data_lake_bucket>/moma/artworks_mor/
connect –path s3://<your_data_lake_bucket>/moma/artworks_cow/
desc
fetch table schema
commits show
stats wa
compactions show all
Using the Hudi CLI from the Amazon EMR Master Node

The following short video-only clip shows the use of the Hudi CLI, running on the Amazon EMR Master Node, to inspect the Hudi tables in S3.

Using the Hudi CLI from the Amazon EMR Master Node

Hudi Data Structure

Recall the sample Kafka message we saw earlier in the post representing an insert of an artist record with the artist_id 1. Below, we see what the same record looks like after being ingested by Hudi DeltaStreamer. Note the five additional fields added by Hudi with the _hoodie_ prefix.

{
"_hoodie_commit_time": "20211227215352",
"_hoodie_commit_seqno": "20211227215352_63_7301",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "nationality=American",
"_hoodie_file_name": "0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet",
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992,
"__op": "r",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3637434647944,
"__source_ts_ms": 1640566580452,
"__deleted": "false"
}

Querying Hudi-managed Data

With the initial data ingestion complete and the CDC and DeltaStreamer processes monitoring for future changes, we can query the resulting data stored in Hudi tables. First, we will make some changes to the PostgreSQL MoMA Collection database to see how Hudi manages the data mutations. We could also make changes directly to the Hudi tables using Hive, Spark, or Presto. However, that would cause our datasource to be out of sync with the Hudi tables, potentially negating the entire CDC process. When developing a data lake, this is a critically important consideration — how changes are introduced to Hudi tables, especially when CDC is involved, and whether data continuity between datasources and the data lake is essential.

For the demonstration, I have made a series of arbitrary updates to a piece of artwork in the MoMA Collection database, ‘Picador (La Pique)’ by Pablo Picasso.

'Picador (La Pique)', by Pablo Picasso
SELECT *
FROM artworks
WHERE artwork_id = 128447 AND classification = 'Print';
firts update (creation date)
UPDATE artworks
SET date = 1959
WHERE artwork_id = 128447;
second update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-15'
WHERE artwork_id = 128447;
third update (in vs. '')
UPDATE artworks
SET dimensions = 'composition: 20 13/16 x 25 3/16 in (52.9 x 64 cm); sheet: 24 7/16 x 29 1/2 in (62.1 x 75 cm)'
WHERE artwork_id = 128447;
fourth update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-19'
WHERE artwork_id = 128447;

Below, note the last four objects shown in S3. Judging by the file names and dates, we can see that the CDC process, using Kafka Connect, has picked up the four updates I made to the record in the database. The Source Connector first wrote the changes to Kafka. The Sink Connector then read those Kafka messages and wrote the data to Amazon S3 in Avro format, as shown below.

Looking again at S3, we can also observe that DeltaStreamer picked up the new CDC objects in Amazon S3 and wrote them to both the Hudi CoW and MoR tables. Note the file types shown below. Given Hudi’s MoR table type structure, Hudi first logged the changes to row-based delta files and later compacted them to produce a new version of the columnar-format Parquet file.

Hudi MoR row-based delta log files and compacted Parquet files

Querying Results from Apache Hive

There are several ways to query Hudi-managed data in S3. In this demonstration, they include against Apache Hive using the hive client from the command line, against Hive using Spark, and against the Hudi tables also using Spark. We could also install Presto on EMR to query the Hudi data directly or via Hive.

Querying the real-time artwork_rt table in Hive after we make each database change, we can observe the data in Hudi reflects the updates. Note that the value of the _hoodie_file_name field for the first three updates is a Hudi delta log file, while the value for the last update is a Parquet file. The Parquet file signifies compaction occurred between the fourth update was made, and the time the Hive query was executed. Lastly, note the type of operation (_op) indicates an update change (u) for all records.

Querying the data in the Hudi MoR real-time table as we make changes to the database

Once all fours database updates are complete and compaction has occurred, we should observe identical results from all Hive tables. Below, note the _hoodie_file_name field for all three tables is a Parquet file. Logically, the Parquet file for the MoR read-optimized and real-time Hive tables is the same.

Querying the same record in all three Hive tables: Hudi MoR _ro and _rt tables and CoW table

Had we queried the data previous to compaction, the results would have differed. Below we have three queries. I further updated the artwork record, changing the date field from 1959 to 1960. The read-optimized MoR table, artworks_ro, still reflects the original date value, 1959, before the update and prior to compaction. The real-time table,artworks_rt , reflects the latest update to the date field, 1960. Note that the value of the _hoodie_file_name field for the read-optimized table is a Parquet file, while the value for the real-time table (artworks_rt), the third and final query, is a delta log file. The delta log allows the real-time table to display the most current state of the data in Hudi.

Querying the same record in all three Hive tables

Below are a few useful Hive commands to query the changes in Hudi.

beeline or hive
beeline connect
!connect jdbc:hive2://localhost:10000/default
SHOW DATABASES;
DESCRIBE DATABASE moma_mor;
USE moma_cow;SHOW TABLES;
USE moma_mor;SHOW TABLES;
USE moma_mor;DESCRIBE artworks_ro;
MSCK REPAIR TABLE moma_mor.artworks_ro;
SHOW PARTITIONS moma_mor.artworks_ro;
ANALYZE TABLE moma_mor.artists_rt COMPUTE STATISTICS;
DESCRIBE EXTENDED moma_mor.artists_rt;
test query performance without caching
set hive.query.results.cache.enabled=false;
100 rows selected (1.394 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE department='Prints & Illustrated Books' LIMIT 100;
100 rows selected (2.371 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE department='Prints & Illustrated Books' LIMIT 100;
10 rows selected (0.719 seconds) <- read-optimized vs. real-time table, classification is partitioned
SELECT * FROM moma_mor.artworks_ro WHERE classification='Print' LIMIT 10;
10 rows selected (1.482 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE classification='Print' LIMIT 10;
EXPLAIN EXTENDED SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
1 row selected (14.126 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE artwork_id=128447;
1 row selected (32.877 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447;
1 row selected (1.491 seconds) <- classification is partitioned
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
84 rows selected (8.618 seconds)
SELECT artworks.title AS title,
artworks.`date` AS created,
artworks.name AS artist,
artists.nationality AS nationality,
artworks.classification AS classification
FROM moma_cow.artworks artworks
JOIN moma_cow.artists artists ON (artworks.artist_id = artists.artist_id)
WHERE artworks.artist_id = 4609
AND nationality = 'Spanish'
AND classification = 'Print'
AND artworks.`date` IS NOT NULL
ORDER BY created, title;

Deletes with Hudi

In addition to inserts and updates (upserts), Apache Hudi can manage deletes. Hudi supports implementing two types of deletes on data stored in Hudi tables: soft deletes and hard deletes. Given this demonstration’s specific configuration for CDC and DeltaStreamer, we will use soft deletes. Soft deletes retain the record key and nullify the other field’s values. Hard deletes, a stronger form of deletion, physically remove any record trace from the Hudi table.

Below, we see the CDC record for the artist with artist_id 441. The event flattening single message transformation (SMT), used by the Debezium-based Kafka Connect Source Connector, adds the __deleted field with a value of true and nullifies all fields except the record’s key, artist_id, which is required.

{
"artist_id" : 441,
"name" : null,
"nationality" : null,
"gender" : null,
"birth_year" : null,
"death_year" : null,
"__op" : {
"string" : "d"
},
"__db" : {
"string" : "moma"
},
"__table" : {
"string" : "artists"
},
"__schema" : {
"string" : "public"
},
"__lsn" : {
"long" : 3692866569488
},
"__source_ts_ms" : {
"long" : 1640814436010
},
"__deleted" : {
"string" : "true"
}
}

Below, we see the same delete record for the artist with artist_id 441 in the Hudi MoR table. All the null fields have been removed.

{
"_hoodie_commit_time": "20211229225047",
"_hoodie_commit_seqno": "20211229225047_1_1",
"_hoodie_record_key": "441",
"_hoodie_partition_path": "nationality=default",
"_hoodie_file_name": "2a98931a-6015-438e-be78-1eff80a75f83-2_1-24-15431_20211229225047.parquet",
"artist_id": 441,
"__op": "d",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3692866569488,
"__source_ts_ms": 1640814436010,
"__deleted": "true"
}

Below, we see how the deleted record appears in the three Hive CoW and MoR artwork tables. Note the query results from the read-optimized MoR table, artworks_ro, contains two records — the original record (r) and the deleted record (d). The data is partitioned by nationality, and since the record was deleted, the nationality field is changed to null. In S3, Hudi represents this partition as nationality=default. The record now exists in two different Parquet files, within two separate partitions, something to be aware of when querying the read-optimized MoR table.

Results of a database delete as shown in Hive CoW and MoR tables

Time Travel

According to the documentation, Hudi has supported time travel queries since version 0.9.0. With time travel, you can query the previous state of your data. Time travel is particularly useful for use cases, including rollbacks, debugging, and audit history.

To demonstrate time travel queries in Hudi, we start by making some additional changes to the source database. For this demonstration, I made a series of five updates and finally a delete to the artist record with artist_id 299 in the PostgreSQL database over a few-hour period.

first update (birth)
UPDATE public.artists
SET birth_year = 1907
WHERE artist_id = 299;
second update (death)
UPDATE public.artists
SET death_year = 1989
WHERE artist_id = 299;
third update (middle initial)
UPDATE public.artists
SET name = 'Gerhard M. Bakker'
WHERE artist_id = 299;
fourth update (nationality – impacts partitions)
UPDATE public.artists
SET nationality = 'German'
WHERE artist_id = 299;
fifth update (birth)
UPDATE public.artists
SET birth_year = 1905
WHERE artist_id = 299;
delete
DELETE
FROM public.artists
WHERE artist_id = 299;

Once the CDC and DeltaStreamer ingestion processes are complete, we can use Hudi’s time travel query capability to view the state of data in Hudi at different points in time (instants). To do so, we need to provide an as.an.instant date/time value to Spark (see line 21 below).

Based on the time period in which I made the five updates and the delete, I have chosen six instants during that period where I want to examine the state of the record. Below is an example of the PySpark code from a Jupyter Notebook used to perform the six time travel queries against the Hudi MoR artist’s table.

from datetime import timedelta
from dateutil import parser
base_path = "s3://open-data-lake-demo-us-east-1/moma/artists_mor"
instances = [ # times in EST
"2021-12-30 08:00:00", # reflects original record (r)
"2021-12-30 09:00:00", # refects updates 1 and 2 (u)
"2021-12-30 09:30:00", # refects updates 3 (u)
"2021-12-30 11:00:00", # refects updates 4 (u)
"2021-12-30 12:30:00", # refects updates 5 (u)
"2021-12-30 14:00:00", # refects delete (d)
]
for instant in instants:
as_of_instant = parser.parse(instant) + timedelta(hours=5) # adjust EST for UTC
print(f"Record state as of: {as_of_instant}")
artistsSnapshotDF = (
spark.read.format("hudi").option("as.of.instant", as_of_instant).load(base_path)
)
artistsSnapshotDF.createOrReplaceTempView("hudi_artists_snapshot")
spark.sql(
"""
SELECT _hoodie_commit_time, __op, _hoodie_partition_path, name, nationality, gender, birth_year, death_year
FROM hudi_artists_snapshot
WHERE artist_id=299;
"""
).show()

Below, we see the results of the time travel queries. At each instant, we can observe the mutating state of the data in the Hudi MoR Artist’s table, including the initial bulk insert of the existing snapshot of data (r) and the delete record (d). Since the delete made in the PostgreSQL database was recorded as a soft delete in Hudi, as opposed to a hard delete, we are still able to retrieve the record at any instant.

Record state as of: 2021-12-30 13:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230034812| r| nationality=American|Gerhard H. Bakker| American| Male| 1906| 1988|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 14:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230132628| u| nationality=American|Gerhard H. Bakker| American| Male| 1907| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 14:30:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230142035| u| nationality=American|Gerhard M. Bakker| American| Male| 1907| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 16:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230144237| u| nationality=German|Gerhard M. Bakker| German| Male| 1907| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 17:30:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230171925| u| nationality=German|Gerhard M. Bakker| German| Male| 1905| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 19:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230180429| d| nationality=default| null| null| null| null| null|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Results of the time travel queries, ordered by commit time

In addition to time travel queries, Hudi also offers incremental queries and point in time queries.

Conclusion

Although this post only scratches the surface of the capabilities of Debezium and Hudi, you can see the power of CDC using Kafka Connect and Debezium, combined with Hudi, to build and manage open data lakes on AWS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

Leave a comment

DevOps for DataOps: Building a CI/CD Pipeline for Apache Airflow DAGs

Build an effective CI/CD pipeline to test and deploy your Apache Airflow DAGs to Amazon MWAA using GitHub Actions

Introduction

In this post, we will learn how to use GitHub Actions to build an effective CI/CD workflow for our Apache Airflow DAGs. We will use the DevOps concepts of Continuous Integration and Continuous Delivery to automate the testing and deployment of Airflow DAGs to Amazon Managed Workflows for Apache Airflow (Amazon MWAA) on AWS.

Fork and pull model of collaborative Airflow development used in this post

Technologies

Apache Airflow

According to the documentation, Apache Airflow is an open-source platform to author, schedule, and monitor workflows programmatically. With Airflow, you author workflows as Directed Acyclic Graphs (DAGs) of tasks written in Python.

Amazon Managed Workflows for Apache Airflow

According to AWS, Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a highly available, secure, and fully-managed workflow orchestration for Apache Airflow. MWAA automatically scales its workflow execution capacity to meet your needs and is integrated with AWS security services to help provide fast and secure access to data.

Example of Apache Airflow UI within Amazon MWAA Environment

GitHub Actions

According to GitHub, GitHub Actions makes it easy to automate software workflows with CI/CD. GitHub Actions allow you to build, test, and deploy code right from GitHub. GitHub Actions are workflows triggered by GitHub events like push, issue creation, or a new release. You can leverage GitHub Actions prebuilt and maintained by the community.

Example of GitHub Action workflow running in the GitHub repository used in this post

If you are new to GitHub Actions, I recommend my previous post, Continuous Integration and Deployment of Docker Images using GitHub Actions.

Terminology

DataOps

According to Wikipedia, DataOps is an automated, process-oriented methodology used by analytic and data teams to improve the quality and reduce the cycle time of data analytics. While DataOps began as a set of best practices, it has now matured to become a new approach to data analytics.

DataOps applies to the entire data lifecycle from data preparation to reporting and recognizes the interconnected nature of the data analytics team and IT operations. DataOps incorporates the Agile methodology to shorten the software development life cycle (SDLC) of analytics development.

DevOps

According to Wikipedia, DevOps is a set of practices that combines software development (Dev) and IT operations (Ops). It aims to shorten the systems development life cycle and provide continuous delivery with high software quality.

DevOps is a set of practices intended to reduce the time between committing a change to a system and the change being placed into normal production, while ensuring high quality. -Wikipedia

Fail Fast

According to Wikipedia, a fail-fast system is one that immediately reports any condition that is likely to indicate a failure. Using the DevOps concept of fail fast, we build steps into our workflows to uncover errors sooner in the SDLC. We shift testing as far to the left as possible (referring to the pipeline of steps moving from left to right) and test at multiple points along the way.

Source Code

All source code for this demonstration, including the GitHub Actions, Pytest unit tests, and Git Hooks, is open-sourced and located on GitHub.

Architecture

The diagram below represents the architecture for a recent blog post and video demonstration, Lakehouse Automation on AWS with Apache Airflow. The post and video show how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based data lake using Apache Airflow.

Architecture for the post and video, Lakehouse Automation on AWS with Apache Airflow

In this post, we will review how the DAGs from the previous were developed, tested, and deployed to MWAA using a variety of progressively more effective CI/CD workflows. The workflows demonstrated could also be easily applied to other Airflow resources in addition to DAGs, such as SQL scripts, configuration and data files, Python requirement files, and plugins.

Workflows

No DevOps

Below we see a minimally viable workflow for loading DAGs into Amazon MWAA, which does not use the principles of CI/CD. Changes are made in the local Airflow developer’s environment. The modified DAGs are copied directly to the Amazon S3 bucket, which are then automatically synced with Amazon MWAA, barring any errors. Those changes are also (hopefully) pushed back to the centralized version control or source code management (SCM) system, which is GitHub in this post.

Error-prone, non-DevOps workflow for modifying and syncing DAGs to MWAA

There are at least two significant issues with this error-prone workflow. First, the DAGs are always out of sync between the Amazon S3 bucket and GitHub. These are two independent steps — copying or syncing the DAGs to S3 and pushing the DAGs to GitHub. A developer might continue making changes and pushing DAGs to S3 without pushing to GitHub or vice versa.

Secondly, the DevOps concept of fail-fast is missing. The first time you know your DAG contains errors is likely when it is synced to MWAA and throws an Import Error. By then, the DAG has already been copied to S3, synced to MWAA, and possibly pushed to GitHub, which other developers could then pull.

Example of a typical DAG Import Error, easily caught with a simple test

GitHub Actions

A significant step up from the previous workflow is using GitHub Actions to test and deploy your code after pushing it to GitHub. Although in this workflow, code is still ‘pushed straight to Trunk’ (the main branch in GitHub) and risks other developers in a collaborative environment pulling potentially erroneous code, you have far less chance of DAG errors making it to MWAA.

GitHub Actions allow you to fail faster and catch errors sooner

Using GitHub Actions, you also eliminate human error that could result in the changes to DAGs not being synced to Amazon S3. Lastly, using this workflow improves security by eliminating the need to provide direct access to the Airflow Amazon S3 bucket to Airflow Developers.

Fork and pull model of collaborative Airflow development used in this post (video only)

Types of Tests

The first GitHub Action, test_dags.yml, is triggered on a push to the dags directory in the main branch of the repository. It is also triggered whenever a pull request is made for the main branch. The first GitHub Action runs a battery of tests, including checking Python dependencies, code style, code quality, DAG import errors, and unit tests. The tests catch issues with DAGs before being synced to S3 by a second GitHub Action.

name: Test DAGs

on:
  push:
    paths:
      - 'dags/**'
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.7'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements/requirements.txt
        pip check
    - name: Lint with Flake8
      run: |
        pip install flake8
        flake8 --ignore E501 dags --benchmark -v
    - name: Confirm Black code compliance (psf/black)
      run: |
        pip install pytest-black
        pytest dags --black -v
    - name: Test with Pytest
      run: |
        pip install pytest
        cd tests || exit
        pytest tests.py -v

Successful runs of the ‘Test DAGs’ GitHub Action, shown in the Actions Console

Python Dependencies

The first test installs the modules listed in the requirements.txt file used locally to develop the application. This test is designed to uncover any missing or conflicting modules.

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements/requirements.txt
pip check

It is essential to develop your DAGs against the same version of Python and with the same version of the Python modules used in your Airflow environment. You can use the BashOperator to run shell commands to obtain the versions of Python and module installed in your Airflow environment:

python3 --version; python3 -m pip list

A snippet of log output from DAG showing Python version and Python modules available in MWAA 2.0.2:

Python version and Python modules available in MWAA 2.0.2

The latest stable release of Airflow is currently version 2.2.2, released 2021-11-15. However, as of December 2021, Amazon’s latest version of MWAA 2.x is version 2.0.2, released 2021-04-19. MWAA 2.0.2 currently runs Python3 version 3.7.10.

Available versions of Amazon MWAA as of December 2021

Flake8

Known as ‘your tool for style guide enforcement,’ Flake8 is described as the modular source code checker. It is a command-line utility for enforcing style consistency across Python projects. Flake8 is a wrapper around PyFlakes, pycodestyle, and Ned Batchelder’s McCabe script. The module, pycodestyle, is a tool to check your Python code against some of the style conventions in PEP 8.

Flake8 is highly configurable, with options to ignore specific rules if not required by your development team. For example, in this demonstration, I intentionally ignored rule E501, which states that ‘line length should be limited to 72 characters.

- name: Lint with Flake8
run: |
pip install flake8
flake8 --ignore E501 dags --benchmark -v

Black

Known as ‘the uncompromising code formatter,’ Python code formatted using Black (referred to as Blackened code) looks the same regardless of the project you’re reading. Formatting becomes transparent, allowing teams to focus on the content instead. Black makes code review faster by producing the smallest diffs possible, assuming all developers are using black to format their code.

The Airflow DAGs in this GitHub repository are automatically formatted with black using a pre-commit Git Hooks before being committed and pushed to GitHub. The test confirms black code compliance.

- name: Confirm Black code compliance (psf/black)
run: |
pip install pytest-black
pytest dags --black -v

Pytest

The pytest framework describes itself as a mature, fully-featured Python testing tool that helps you write better programs. The Pytest framework makes it easy to write small tests yet scales to support complex functional testing for applications and libraries.

The GitHub Action in the GitHub project, test_dags.yml, calls the tests.py file, also contained in the project.

- name: Test with Pytest
run: |
pip install pytest
cd tests || exit
pytest tests.py -v

The tests.py file contains several pytest unit tests. The tests are based on my project requirements; your tests will vary. These tests confirm that all DAGs:

  1. Do not contain DAG Import Errors (test catches 75% of my errors);
  2. Follow specific file naming conventions;
  3. Include a description and an owner other than ‘airflow’;
  4. Contain required project tags;
  5. Do not send emails (my projects use SNS or Slack for notifications);
  6. Do not retry more than three times;
import os
import sys
import pytest
from airflow.models import DagBag
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags"))
sys.path.append(os.path.join(os.path.dirname(__file__), "../dags/utilities"))
# Airflow variables called from DAGs under test are stubbed out
os.environ["AIRFLOW_VAR_DATA_LAKE_BUCKET"] = "test_bucket"
os.environ["AIRFLOW_VAR_ATHENA_QUERY_RESULTS"] = "SELECT 1;"
os.environ["AIRFLOW_VAR_SNS_TOPIC"] = "test_topic"
os.environ["AIRFLOW_VAR_REDSHIFT_UNLOAD_IAM_ROLE"] = "test_role_1"
os.environ["AIRFLOW_VAR_GLUE_CRAWLER_IAM_ROLE"] = "test_role_2"
@pytest.fixture(params=["../dags/"])
def dag_bag(request):
return DagBag(dag_folder=request.param, include_examples=False)
def test_no_import_errors(dag_bag):
assert not dag_bag.import_errors
def test_requires_tags(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.tags
def test_requires_specific_tag(dag_bag):
for dag_id, dag in dag_bag.dags.items():
try:
assert dag.tags.index("data lake demo") >= 0
except ValueError:
assert dag.tags.index("redshift demo") >= 0
def test_desc_len_greater_than_fifteen(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert len(dag.description) > 15
def test_owner_len_greater_than_five(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert len(dag.owner) > 5
def test_owner_not_airflow(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert str.lower(dag.owner) != "airflow"
def test_no_emails_on_retry(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert not dag.default_args["email_on_retry"]
def test_no_emails_on_failure(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert not dag.default_args["email_on_failure"]
def test_three_or_less_retries(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.default_args["retries"] <= 3
def test_dag_id_contains_prefix(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert str.lower(dag_id).find("__") != -1
def test_dag_id_requires_specific_prefix(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert str.lower(dag_id).startswith("data_lake__") \
or str.lower(dag_id).startswith("redshift_demo__")

If you are building custom Airflow Operators, additional unit, functional, and integration tests are recommended.

Fork and Pull

We can improve on the practice of pushing directly to Trunk by implementing one of two collaborative development models, recommended by GitHub:

  1. The Shared repository model: uses ‘topic’ branches, which are reviewed, approved, and merged into the main branch.
  2. Fork and pull model: a repo is forked, changes are made, a pull request is created, the request is reviewed, and if approved, merged into the main branch.

In the fork and pull model, we create a fork of the DAG repository where we make our changes. We then commit and push those changes back to the forked repository. When ready, we create a pull request. If the pull request is approved and passes all the tests, it is manually or automatically merged into the main branch. DAGs are then synced to S3 and, eventually, to MWAA. I usually prefer to trigger merges manually once all tests have passed.

The fork and pull model greatly reduces the chance that bad code is merged to the main branch before passing all tests.

Errors are caught early in the fork and pull model prior to merging code changes

Syncing DAGs to S3

The second GitHub Action in the GitHub project, sync_dags.yml, is triggered when the previous Action, test_dags.yml, completes successfully, or in the case of the folk and pull method, the merge to the main branch is successful.

name: Sync DAGs

on:
workflow_run:
workflows:
- 'Test DAGs'
types:
- completed
pull_request:
types:
- closed

jobs:
deploy:
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
steps:
- uses: actions/checkout@master
- uses: jakejarvis/s3-sync-action@master
env:
AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_REGION: 'us-east-1'
SOURCE_DIR: 'dags'
DEST_DIR: 'dags'

The GitHub Action, sync_dags.yml, requires three GitHub encrypted secrets, created in advance and associated with the GitHub repository. According to GitHub, secrets are encrypted environment variables you create in an organization, repository, or repository environment. Encrypted secrets allow you to store sensitive information, such as access tokens, in your repository. The secrets that you create are available to use in GitHub Actions workflows.

Encrypted repository secrets used by GitHub Action to sync with Amazon S3

The DAGs are synced to Amazon S3 and, eventually, automatically synced to MWAA.

GitHub Action syncs DAGs to Amazon S3 if tests are successful

Local Testing and Git Hooks

To further improve your CI/CD workflows, you should consider using Git Hooks. Using Git Hooks, we can ensure code is tested locally before committing and pushing changes to GitHub. Testing locally allows us to fail-faster, catching errors during development instead of once code is pushed to GitHub.

Errors are caught even early using Git Hooks

According to the documentation, Git has a way to fire off custom scripts when certain important actions occur. There are two types of hooks: client-side and server-side. Client-side hooks are triggered by operations such as committing and merging, while server-side hooks run on network operations such as receiving pushed commits.

You can use these hooks for all sorts of reasons. I often use a client-side pre-commit hook to format DAGs using black. Using a client-side pre-push Git Hook, we will ensure that tests are run before pushing the DAGs to GitHub. According to Git, The pre-push hook runs when the git push command is executed after the remote refs have been updated but before any objects have been transferred. You can use it to validate a set of ref updates before a push occurs. A non-zero exit code will abort the push. The test could instead be run as part of the pre-commit hook if they are not too time-consuming.

To use the pre-push hook, create the following file within the local repository, .git/hooks/pre-push:

#!/bin/sh
# do nothing if there are no commits to push
if [ -z "$(git log @{u}..)" ]; then
exit 0
fi
sh ./run_tests_locally.sh

Then, run the following chmod command to make the hook executable:

chmod 755 .git/hooks/pre-push

The the pre-push hook runs the shell script, run_tests_locally.sh. The script executes nearly identical tests, locally, as the GitHub Action, test_dags.yml, does remotely on GitHub:

#!/bin/sh
echo "Starting Flake8 test..."
flake8 --ignore E501 dags --benchmark || exit 1
echo "Starting Black test..."
python3 -m pytest --cache-clear
python3 -m pytest dags/ --black -v || exit 1
echo "Starting Pytest tests..."
cd tests || exit
python3 -m pytest tests.py -v || exit 1
echo "All tests completed successfully! 🥳"

Complete CI/CD workflow including running tests locally using a Git Hook (video only)

References

Here are some additional references for testing and deploying Airflow DAGs and the use of GitHub Actions:

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

Leave a comment

Video Demonstration: Lakehouse Automation on AWS with Apache Airflow

Programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow

Introduction

In the following video demonstration, we will learn how to programmatically load and upload data from Amazon Redshift to an Amazon S3-based Data Lake using Apache Airflow. Since we are on AWS, we will be using the fully-managed Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Using Airflow, we will COPY raw data into staging tables, then merge that staging data into a series of tables. We will then load incremental data into Redshift on a regular schedule. Next, we will join and aggregate data from several tables and UNLOAD the resulting dataset to an Amazon S3-based data lake. Lastly, we will catalog the data in S3 using AWS Glue and query with Amazon Athena.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGsSQL statements, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs included in the GitHub project are:

Demonstration DAGs as seen in MWAA Airflow UI

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Video Demonstration: Building a Data Lake with Apache Airflow

Build a simple Data Lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

Using a series of Airflow DAGs (Directed Acyclic Graphs), we will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the Airflow DAGsSQL files, and data files, is open-sourced and located on GitHub.

DAGs

The DAGs shown in the video demonstration have been renamed for easier project management within the Airflow UI. The DAGs included in the GitHub project are as follows:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Video Demonstration: Building a Data Lake on AWS

Build a simple Data Lake on AWS using a combination of services, including AWS Glue, AWS Glue Studio, Amazon Athena, and Amazon S3

Introduction

In the following video demonstration, we will build a simple data lake on AWS using a combination of services, including AWS Glue Data Catalog, AWS Glue Crawlers, AWS Glue Jobs, AWS Glue Studio, Amazon Athena, Amazon Relational Database Service (Amazon RDS), and Amazon S3.

We will catalog and move data from three separate data sources into our Amazon S3-based data lake. Once in the data lake, we will perform ETL (or more accurately ELT) on the raw data — cleansing, augmenting, and preparing it for data analytics. Finally, we will perform aggregations on the refined data and write those final datasets back to our data lake. The data lake will be organized around the data lake pattern of bronze (aka raw), silver (aka refined), and gold (aka aggregated) data, popularized by Databricks.

Architecture and workflow demonstrated in the video

Demonstration

For best results, view at 1080p HD on YouTube

Source Code

The source code for this demonstration, including the SQL statements, is open-sourced and located on GitHub.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

Leave a comment

Video Demonstration: Ahana Cloud for Presto on AWS using Apache Hive and AWS Glue

Using Ahana Cloud for Presto to perform analytics on AWS using both Apache Hive and AWS Glue as metastores

Introduction

The following series of five videos are an extended version of the demonstration featured in the October 2021 webinar, Build an Open Data Lake on AWS with Presto. An on-demand copy of the live webinar is available on Ahana.io, featuring Dipti Borkar (Ahana Co-Founder and CPO) and I.

In the demonstration, we will build a data lake on AWS using a combination of Ahana Cloud for Presto, Apache Hive, Apache Superset, Amazon S3, AWS Glue, and Amazon Athena. We then analyze the data in Apache Superset using Ahana Cloud for Presto.

Build an Open Data Lake on AWS with Presto

Demonstration

The demonstration is divided into five YouTube videos (playlist):

Ahana Cloud for Presto Demo — Part 1/5: Public GitHub Resources

Ahana Cloud for Presto Demo — Part 2/5: MoMa Datasource

Ahana Cloud for Presto Demo — Part 3/5: Ahana SaaS

Ahana Cloud for Presto Demo — Part 4/5: AWS Glue & Amazon

Ahana Cloud for Presto Demo — Part 5/5: PrestoDB & Apache Hive

Source Code

All source code for this post and the previous posts in this series are open-sourced and located on GitHub. In the webinar and the videos, the Apache Hive and AWS Glue data catalog tables contain an _athena or _presto suffix. For clarity, in the source code, I have changed those to indicate the metastore they are associated with, _hive or _glue, since either set of tables can be queried Presto. Additionally, in the webinar and the videos, the raw data files were uploaded to Amazon S3 in uncompressed CSV format; this is unnecessary. The CTAS SQL statements both expect GZIP-compressed CSV files. To save time and cost, upload the compressed files, as they are, to Amazon S3.

The following files are used in the demonstration:


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , , , ,

Leave a comment

Stream Processing with Apache Spark, Kafka, Avro, and Apicurio Registry on Amazon EMR and Amazon MSK

Using a registry to decouple schemas from messages in an event streaming analytics architecture

Introduction

In the last post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, we learned about Apache Spark and Spark Structured Streaming on Amazon EMR (fka Amazon Elastic MapReduce) with Amazon Managed Streaming for Apache Kafka (Amazon MSK). We consumed messages from and published messages to Kafka using both batch and streaming queries. In that post, we serialized and deserialized messages to and from JSON using schemas we defined as a StructType (pyspark.sql.types.StructType) in each PySpark script. Likewise, we constructed similar structs for CSV-format data files we read from and wrote to Amazon S3.

schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", TimestampType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
])

In this follow-up post, we will read and write messages to and from Amazon MSK in Apache Avro format. We will store the Avro-format Kafka message’s key and value schemas in Apicurio Registry and retrieve the schemas instead of hard-coding the schemas in the PySpark scripts. We will also use the registry to store schemas for CSV-format data files.

Note the addition of the registry to the architecture for this post’s demonstration

Video Demonstration

In addition to this post, there is now a video demonstration available on YouTube.

For best results, view at 1080p HD on YouTube

Technologies

In the last post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR, we learned about Apache Spark, Apache Kafka, Amazon EMR, and Amazon MSK.

In a previous post, Hydrating a Data Lake using Log-based Change Data Capture (CDC) with Debezium, Apicurio, and Kafka Connect on AWS, we explored Apache Avro and Apicurio Registry.

Apache Spark

Apache Spark, according to the documentation, is a unified analytics engine for large-scale data processing. Spark provides high-level APIs in Java, Scala, Python (PySpark), and R. Spark provides an optimized engine that supports general execution graphs (aka directed acyclic graphs or DAGs). In addition, Spark supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Interest over time in Apache Spark and PySpark compared to Hive and Presto, according to Google Trends

Spark Structured Streaming

Spark Structured Streaming, according to the documentation, is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end, exactly-once stream processing without the user having to reason about streaming.

Apache Avro

Apache Avro describes itself as a data serialization system. Apache Avro is a compact, fast, binary data format similar to Apache Parquet, Apache Thrift, MongoDB’s BSON, and Google’s Protocol Buffers (protobuf). However, Apache Avro is a row-based storage format compared to columnar storage formats like Apache Parquet and Apache ORC.

Undecoded Avro-format messages with their keys and values shown in non-human readable binary format

Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. According to the documentation, schemas permit each datum to be written with no per-value overheads, making serialization fast and small. Schemas also facilitate use with dynamic scripting languages since data, together with its schema, is fully self-describing.

Interest over time in Apache Avro compared to Parquet and ORC, according to Google Trends

Apicurio Registry

We can decouple the data from its schema by using schema registries such as Confluent Schema Registry or Apicurio Registry. According to Apicurio, in a messaging and event streaming architecture, data published to topics and queues must often be serialized or validated using a schema (e.g., Apache Avro, JSON Schema, or Google Protocol Buffers). Of course, schemas can be packaged in each application. Still, it is often a better architectural pattern to register schemas in an external system [schema registry] and then reference them from each application.

It is often a better architectural pattern to register schemas in an external system and then reference them from each application.

Amazon EMR

According to AWS documentation, Amazon EMR (fka Amazon Elastic MapReduce) is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hadoop, Hive, HBase, Flink, Hudi, and Presto. Amazon EMR is a fully managed AWS service that makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.

Amazon EMR on EKS, a deployment option for Amazon EMR since December 2020, allows you to run Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). With the EKS deployment option, you can focus on running analytics workloads while Amazon EMR on EKS builds, configures, and manages containers for open-source applications.

If you are new to Amazon EMR for Spark, specifically PySpark, I recommend a recent two-part series of posts, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

Apache Kafka

According to the documentation, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Amazon MSK

Apache Kafka clusters are challenging to set up, scale, and manage in production. According to AWS documentation, Amazon MSK is a fully managed AWS service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics applications.

Prerequisites

Similar to the previous post, this post will focus primarily on configuring and running Apache Spark jobs on Amazon EMR. To follow along, you will need the following resources deployed and configured on AWS:

  1. Amazon S3 bucket (holds all Spark/EMR resources);
  2. Amazon MSK cluster (using IAM Access Control);
  3. Amazon EKS container or an EC2 instance with the Kafka APIs installed and capable of connecting to Amazon MSK;
  4. Amazon EKS container or an EC2 instance with Apicurio Registry installed and capable of connecting to Amazon MSK (if using Kafka for backend storage) and being accessed by Amazon EMR;
  5. Ensure the Amazon MSK Configuration has auto.create.topics.enable=true; this setting is false by default;

The architectural diagram below shows that the demonstration uses three separate VPCs within the same AWS account and AWS Region us-east-1, for Amazon EMR, Amazon MSK, and Amazon EKS. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges within your Amazon EMR, Amazon MSK, and Amazon EKS Security Groups. For additional security and cost savings, use a VPC endpoint for private communications between Amazon EMR and Amazon S3.

High-level architecture for this post’s demonstration

Source Code

All source code for this post and the three previous posts in the Amazon MSK series, including the Python and PySpark scripts demonstrated herein, are open-sourced and located on GitHub.

Objective

We will run a Spark Structured Streaming PySpark job to consume a simulated event stream of real-time sales data from Apache Kafka. Next, we will enrich (join) that sales data with the sales region and aggregate the sales and order volumes by region within a sliding event-time window. Next, we will continuously stream those aggregated results back to Kafka. Finally, a batch query will consume the aggregated results from Kafka and display the sales results in the console.

DataOps pipeline demonstrated in this post

Kafka messages will be written in Apache Avro format. The schemas for the Kafka message keys and values and the schemas for the CSV-format sales and sales regions data will all be stored in Apricurio Registry. The Python and PySpark scripts will use Apricurio Registry’s REST API to read, write, and manage the Avro schema artifacts.

We are writing the Kafka message keys in Avro format and storing an Avro key schema in the registry. This is only done for demonstration purposes and not a requirement. Kafka message keys are not required, nor is it necessary to store both the key and the value in a common format of Avro in Kafka.

Schema evolution, compatibility, and validation are important considerations, but out of scope for this post.

PySpark Scripts

PySpark, according to the documentation, is an interface for Apache Spark in Python. PySpark allows you to write Spark applications using the Python API. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core. There are three PySpark scripts and one new helper Python script covered in this post:

  1. 10_create_schemas.py: Python script creates all Avro schemas in Apricurio Registry using the REST API;
  2. 11_incremental_sales_avro.py: PySpark script simulates an event stream of sales data being published to Kafka over 15–20 minutes;
  3. 12_streaming_enrichment_avro.py: PySpark script uses a streaming query to read messages from Kafka in real-time, enriches sales data, aggregates regional sales results, and writes results back to Kafka as a stream;
  4. 13_batch_read_results_avro.py: PySpark script uses a batch query to read aggregated regional sales results from Kafka and display them in the console;

Preparation

To prepare your Amazon EMR resources, review the instructions in the previous post, Getting Started with Spark Structured Streaming and Kafka on AWS using Amazon MSK and Amazon EMR. Here is a recap, with a few additions required for this post.

Amazon S3

We will start by gathering and copying the necessary files to your Amazon S3 bucket. The bucket will serve as the location for the Amazon EMR bootstrap script, additional JAR files required by Spark, PySpark scripts, and CSV-format data files.

There are a set of additional JAR files required by the Spark jobs we will be running. Download the JARs from Maven Central and GitHub, and place them in the emr_jars project directory. The JARs will include AWS MSK IAM Auth, AWS SDK, Kafka Client, Spark SQL for Kafka, Spark Streaming, and other dependencies. Compared to the last post, there is one additional JAR for Avro.

Update the SPARK_BUCKET environment variable, then upload the JARs, PySpark scripts, sample data, and EMR bootstrap script from your local copy of the GitHub project repository to your Amazon S3 bucket using the AWS s3 API.

cd ./pyspark/
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
aws s3 cp emr_jars/ "s3://${SPARK_BUCKET}/jars/" –recursive
aws s3 cp pyspark_scripts/ "s3://${SPARK_BUCKET}/spark/" –recursive
aws s3 cp emr_bootstrap/ "s3://${SPARK_BUCKET}/spark/" –recursive
aws s3 cp data/ "s3://${SPARK_BUCKET}/spark/" –recursive
view raw copy_to_s3.sh hosted with ❤ by GitHub

Amazon EMR

The GitHub project repository includes a sample AWS CloudFormation template and an associated JSON-format CloudFormation parameters file. The CloudFormation template, stack.yml, accepts several environment parameters. To match your environment, you will need to update the parameter values such as SSK key, Subnet, and S3 bucket. The template will build a minimally-sized Amazon EMR cluster with one master and two core nodes in an existing VPC. You can easily modify the template and parameters to meet your requirements and budget.

aws cloudformation deploy \
--stack-name spark-kafka-demo-dev \
--template-file ./cloudformation/stack.yml \
--parameter-overrides file://cloudformation/dev.json \
--capabilities CAPABILITY_NAMED_IAM

The CloudFormation template has two essential Spark configuration items — the list of applications to install on EMR and the bootstrap script deployment.

Applications:
Name: 'Hadoop'
Name: 'Spark'
Name: 'JupyterEnterpriseGateway'
Name: 'Livy'
BootstrapActions:
Name: bootstrap-script
ScriptBootstrapAction:
Path: !Join [ '', [ 's3://', !Ref ProjectBucket, '/spark/bootstrap_actions.sh' ] ]

Below, we see the EMR bootstrap shell script, bootstrap_actions.sh.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford
# Date: 2021-09-10
# arg passed in by CloudFormation
if [ $# -eq 0 ]
then
echo "No arguments supplied"
fi
SPARK_BUCKET=$1
# update yum packages, install jq
sudo yum update -y
sudo yum install -y jq
# jsk truststore for connecting to msk
sudo cp /usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/jre/lib/security/cacerts \
/tmp/kafka.client.truststore.jks
# set region for boto3
aws configure set region \
"$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"
# install python packages for pyspark scripts
sudo python3 -m pip install boto3 botocore ec2-metadata
# install required jars for spark
sudo aws s3 cp \
"s3://${SPARK_BUCKET}/jars/" /usr/lib/spark/jars/ \
–recursive –exclude "*" –include "*.jar"

The bootstrap script performed several tasks, including deploying the additional JAR files we copied to Amazon S3 earlier to EMR cluster nodes.

Amazon EMR cluster ‘bootstrap actions’ tab

Parameter Store

The PySpark scripts in this demonstration will obtain configuration values from the AWS Systems Manager (AWS SSM) Parameter Store. Configuration values include a list of Amazon MSK bootstrap brokers, the Amazon S3 bucket that contains the EMR/Spark assets, and the Apicurio Registry REST API base URL. Using the Parameter Store ensures that no sensitive or environment-specific configuration is hard-coded into the PySpark scripts. Modify and execute the ssm_params.sh script to create the AWS SSM Parameter Store parameters.

aws ssm put-parameter \
–name /kafka_spark_demo/kafka_servers \
–type String \
–value "<b-1.your-brokers.kafka.us-east-1.amazonaws.com:9098,b-2…>" \
–description "Amazon MSK Kafka broker list" \
–overwrite
aws ssm put-parameter \
–name /kafka_spark_demo/kafka_demo_bucket \
–type String \
–value "<your-bucket-111222333444-us-east-1>" \
–description "Amazon S3 bucket" \
–overwrite
aws ssm put-parameter \
–name /kafka_spark_demo/schema_resistry_url_int \
–type String \
–value "http://<your_host&gt;:<your_port>" \
–description "Apicurio Registry REST API base URL (Internal Address)" \
–overwrite
view raw ssm_params.sh hosted with ❤ by GitHub

Create Schemas in Apricurio Registry

To create the schemas necessary for this demonstration, a Python script is included in the project, 10_create_schemas.py. The script uses Apricurio Registry’s REST API to create six new Avro-based schema artifacts.

Apricurio Registry supports several common artifact types, including AsyncAPI specification, Apache Avro schema, GraphQL schema, JSON Schema, Apache Kafka Connect schema, OpenAPI specification, Google protocol buffers schema, Web Services Definition Language, and XML Schema Definition. We will use the registry to store Avro schemas for use with Kafka and CSV data sources and sinks.

Although Apricurio Registry does not support CSV Schema, we can store the schemas for the CSV-format sales and sales region data in the registry as JSON-format Avro schemas.

{
"name": "Sales",
"type": "record",
"doc": "Schema for CSV-format sales data",
"fields": [
{
"name": "payment_id",
"type": "int"
},
{
"name": "customer_id",
"type": "int"
},
{
"name": "amount",
"type": "float"
},
{
"name": "payment_date",
"type": "string"
},
{
"name": "city",
"type": [
"string",
"null"
]
},
{
"name": "district",
"type": [
"string",
"null"
]
},
{
"name": "country",
"type": "string"
}
]
}

We can then retrieve the JSON-format Avro schema from the registry, convert it to PySpark StructType, and associate it to the DataFrame used to persist the sales data from the CSV files.

root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- amount: float (nullable = true)
|-- payment_date: string (nullable = true)
|-- city: string (nullable = true)
|-- district: string (nullable = true)
|-- country: string (nullable = true)

Using the registry allows us to avoid hard-coding the schema as a StructType in the PySpark scripts in advance.

# Purpose: Create Avro schemas in Apicurio Registry.
# Author: Gary A. Stafford
# Date: 2021-09-28
import json
import os
import boto3
import requests
params = {}
os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
ssm_client = boto3.client("ssm")
def main():
global params
params = get_parameters()
artifact_id = "pagila.sales.csv"
data = '''{"name":"Sales","type":"record",
"doc":"Schema for CSV-format sales data",
"fields":[
{"name":"payment_id","type":"int"},
{"name":"customer_id","type":"int"},
{"name":"amount","type":"float"},
{"name":"payment_date","type":"string"},
{"name":"city","type":["string","null"]},
{"name":"district","type":["string","null"]},
{"name":"country","type":"string"}]}'''
create_schema(artifact_id, data)
artifact_id = "pagila.sales.regions.csv"
data = '''{"name":"Regions","type":"record",
"doc":"Schema for CSV-format sales regions data",
"fields":[
{"name":"country","type":"string"},
{"name":"region","type":"string"}]}'''
create_schema(artifact_id, data)
artifact_id = "pagila.sales.avro-key"
data = '''{"name":"Key","type":"int",
"doc":"Schema for pagila.sales.avro Kafka topic key"}'''
create_schema(artifact_id, data)
artifact_id = "pagila.sales.avro-value"
data = '''{"name":"Value","type":"record",
"doc":"Schema for pagila.sales.avro Kafka topic value",
"fields":[
{"name":"payment_id","type":"int"},
{"name":"customer_id","type":"int"},
{"name":"amount","type":"float"},
{"name":"payment_date","type":"long","logicalType":"timestamp-millis"},
{"name":"city","type":["string","null"]},
{"name":"district","type":["string","null"]},
{"name":"country","type":"string"}]}'''
create_schema(artifact_id, data)
artifact_id = "pagila.sales.summary.avro-key"
data = '''{"name":"Key","type":"int",
"doc":"Schema for pagila.sales.summary.avro Kafka topic key"}'''
create_schema(artifact_id, data)
artifact_id = "pagila.sales.summary.avro-value"
data = '''{"name":"Value","type":"record",
"doc":"Schema for pagila.sales.summary.avro Kafka topic value",
"fields":[
{"name":"region","type":"string"},
{"name":"sales","type":"float"},
{"name":"orders","type":"int"},
{"name":"window_start","type":"long","logicalType":"timestamp-millis"},
{"name":"window_end","type":"long","logicalType":"timestamp-millis"}]}'''
create_schema(artifact_id, data)
def create_schema(artifact_id, data):
"""Delete existing Avro schema, create new schema, and retrieve the schema"""
delete_schema(artifact_id)
print(json.dumps(json.loads(post_schema(artifact_id, data)), indent=4))
print(json.dumps(json.loads(get_schema(artifact_id)), indent=4))
def post_schema(artifact_id, data):
"""Post Avro schema to Apicurio Registry"""
response = requests.post(
url=f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts",
data=data,
headers={"X-Registry-ArtifactId": artifact_id})
json_format_schema = response.content.decode("utf-8")
return json_format_schema
def get_schema(artifact_id):
"""Get Avro schema from Apicurio Registry"""
response = requests.get(
f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
json_format_schema = response.content.decode("utf-8")
return json_format_schema
def delete_schema(artifact_id):
"""Delete Avro schema from Apicurio Registry"""
try:
response = requests.delete(
f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
return response.content.decode("utf-8")
except:
return f"Schema not found: {artifact_id}"
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
parameters = {
"schema_registry_url": ssm_client.get_parameter(
Name="/kafka_spark_demo/schema_registry_url_int")["Parameter"]["Value"],
}
return parameters
if __name__ == "__main__":
main()

Add the PySpark script as an EMR Step. EMR will run the Python script the same way it runs PySpark jobs.

export CLUSTER_ID="<your-cluster-id>"
export SPARK_BUCKET="<your-bucket-111222333444-us-east-1>"
aws emr add-steps \
–cluster-id ${CLUSTER_ID} \
–steps """Type=Spark,Name='create-schemas',ActionOnFailure=CONTINUE,
Args=[s3a://${SPARK_BUCKET}/spark/10_create_schemas.py]"""

The Python script creates six schema artifacts in Apricurio Registry, shown below in Apricurio Registry’s browser-based user interface. Schemas include two key/value pairs for two Kafka topics and two for CSV-format sales and sales region data.

Artifacts in Apricurio Registry’s browser-based UI

You have the option of enabling validation and compatibility rules for each schema with Apricurio Registry.

Content Rules options in Apricurio Registry’s browser-based UI

Each Avro schema artifact is stored as a JSON object in the registry.

Detailed view of Avro schema as JSON in Apricurio Registry’s browser-based UI

Simulate Sales Event Stream

Next, we will simulate an event stream of sales data published to Kafka over 15–20 minutes. The PySpark script, 11_incremental_sales_avro.py, reads 1,800 sales records into a DataFrame (pyspark.sql.DataFrame) from a CSV file located in S3. The script then takes each Row (pyspark.sql.Row) of the DataFrame, one row at a time, and writes them to the Kafka topic, pagila.sales.avro, adding a slight delay between each write.

# Purpose: Write sales data from CSV to a new Kafka topic in Avro format.
# Use a delay between each message to simulate an event stream of sales data.
# Author: Gary A. Stafford
# Date: 2021-09-28
import os
import time
import boto3
import pyspark.sql.functions as F
import requests
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.types import LongType
sink_topic = "pagila.sales.avro"
# 1800 messages * .75 second delay = ~22.5 minutes added latency
delay_between_messages = 0.75
params = {}
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")
def main():
global params
params = get_parameters()
spark = SparkSession \
.builder \
.appName("kafka-incremental-sales") \
.getOrCreate()
csv_sales_schema = get_schema("pagila.sales.csv")
schema = struct_from_json(spark, csv_sales_schema)
df_sales = read_from_csv(spark, "sales_incremental_large.csv", schema, "|")
df_sales.show(5, truncate=False)
write_to_kafka(spark, df_sales)
def write_to_kafka(spark, df_sales):
options_write = {
"kafka.bootstrap.servers":
params["kafka_servers"],
"topic":
sink_topic,
"kafka.ssl.truststore.location":
"/tmp/kafka.client.truststore.jks",
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}
sales_schema_key = get_schema("pagila.sales.avro-key")
sales_schema_value = get_schema("pagila.sales.avro-value")
sales_count = df_sales.count()
for r in range(0, sales_count):
row = df_sales.collect()[r]
df_message = spark.createDataFrame([row], df_sales.schema)
df_message \
.drop("payment_date") \
.withColumn("payment_date",
F.unix_timestamp(F.current_timestamp()).cast(LongType())) \
.select(to_avro("customer_id", sales_schema_key).alias("key"),
to_avro(F.struct("*"), sales_schema_value).alias("value")) \
.write \
.format("kafka") \
.options(**options_write) \
.save()
time.sleep(delay_between_messages)
# ***** utility methods *****
def read_from_csv(spark, source_data, schema, sep):
"""Read CSV data from S3"""
df = spark.read \
.csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{source_data}",
schema=schema, header=True, sep=sep)
return df
def struct_from_json(spark, json_format_schema):
"""Returns a schema as a pyspark.sql.types.StructType from Avro schema"""
df = spark \
.read \
.format("avro") \
.option("avroSchema", json_format_schema) \
.load()
df.printSchema()
return df.schema
def get_schema(artifact_id):
"""Get Avro schema from Apicurio Registry"""
response = requests.get(
f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
json_format_schema = response.content.decode("utf-8")
return json_format_schema
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
parameters = {
"kafka_servers": ssm_client.get_parameter(
Name="/kafka_spark_demo/kafka_servers")["Parameter"]["Value"],
"kafka_demo_bucket": ssm_client.get_parameter(
Name="/kafka_spark_demo/kafka_demo_bucket")["Parameter"]["Value"],
"schema_registry_url": ssm_client.get_parameter(
Name="/kafka_spark_demo/schema_registry_url_int")["Parameter"]["Value"],
}
return parameters
if __name__ == "__main__":
main()

The PySpark scripts first retrieve the JSON-format Avro schema for the CSV data from Apricurio Registry using the Python requests module and Apricurio Registry’s REST API (get_schema()).

{
"name": "Sales",
"type": "record",
"doc": "Schema for CSV-format sales data",
"fields": [
{
"name": "payment_id",
"type": "int"
},
{
"name": "customer_id",
"type": "int"
},
{
"name": "amount",
"type": "float"
},
{
"name": "payment_date",
"type": "string"
},
{
"name": "city",
"type": [
"string",
"null"
]
},
{
"name": "district",
"type": [
"string",
"null"
]
},
{
"name": "country",
"type": "string"
}
]
}

The script then creates a StructType from the JSON-format Avro schema using an empty DataFrame (struct_from_json()). Avro column types are converted to Spark SQL types. The only apparent issue is how Spark mishandles the nullable value for each column. Recognize, column nullability in Spark is an optimization statement, not an enforcement of the object type.

root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- amount: float (nullable = true)
|-- payment_date: string (nullable = true)
|-- city: string (nullable = true)
|-- district: string (nullable = true)
|-- country: string (nullable = true)

The resulting StructType is used to read the CSV data into a DataFrame (read_from_csv()).

csv_sales_schema = get_schema("pagila.sales.csv")
schema = struct_from_json(spark, csv_sales_schema)
df_sales = read_from_csv(spark, "sales_incremental_large.csv", schema, "|")
write_to_kafka(spark, df_sales)
def get_schema(artifact_id):
"""Get Avro schema from Apicurio Registry"""
response = requests.get(
f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
json_format_schema = response.content.decode("utf-8")
return json_format_schema
def struct_from_json(spark, json_format_schema):
"""Returns a schema as a pyspark.sql.types.StructType from Avro schema"""
df = spark \
.read \
.format("avro") \
.option("avroSchema", json_format_schema) \
.load()
df.printSchema()
return df.schema
def read_from_csv(spark, source_data, schema, sep):
"""Read CSV data from S3"""
df = spark.read \
.csv(path=f"s3a://{params['kafka_demo_bucket']}/spark/{source_data}",
schema=schema, header=True, sep=sep)
return df
Code snippet from PySpark script, 10_create_schemas.py

For Avro-format Kafka key and value schemas, we use the same method, get_schema(). The resulting JSON-format schemas are then passed to the to_avro() and from_avro() methods to read and write Avro-format messages to Kafka. Both methods are part of the pyspark.sql.avro.functions module. Avro column types are converted to and from Spark SQL types.

def get_schema(artifact_id):
response = requests.get(
f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
json_format_schema = response.content.decode("utf-8")
return json_format_schema
def write_to_kafka(spark, df_sales):
sales_schema_key = get_schema("pagila.sales.avro-key")
sales_schema_value = get_schema("pagila.sales.avro-value")
df_message \
.select(to_avro("customer_id", sales_schema_key).alias("key"),
to_avro(F.struct("*"), sales_schema_value).alias("value")) \
.write \
.format("kafka") \
.options(**options_write) \
.save()
Code snippet from PySpark script, 11_incremental_sales_avro.py

We must run this PySpark script, 11_incremental_sales_avro.py, concurrently with the PySpark script, 12_streaming_enrichment_avro.py, to simulate an event stream. We will start both scripts in the next part of the post.

Stream Processing with Structured Streaming

The PySpark script, 12_streaming_enrichment_avro.py, uses a streaming query to read sales data messages from the Kafka topic, pagila.sales.avro, in real-time, enriches the sales data, aggregates regional sales results, and writes the results back to Kafka in micro-batches every two minutes.

# Purpose: Streaming read from Kafka topic in Avro format. Enrich and aggregate
# current sales by sales region to second Kafka topic every n minutes.
# Author: Gary A. Stafford
# Date: 2021-09-28
import os
import boto3
import pyspark.sql.functions as F
import requests
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.types import IntegerType, FloatType, LongType
source_topic = "pagila.sales.avro"
sink_topic = "pagila.sales.summary.avro"
params = {}
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client("ssm")
def main():
global params
params = get_parameters()
spark = SparkSession \
.builder \
.appName("kafka-streaming-sales") \
.getOrCreate()
csv_sales_regions_schema = get_schema("pagila.sales.regions.csv")
schema = struct_from_json(spark, csv_sales_regions_schema)
df_regions = read_from_csv(spark, "sales_regions.csv", schema, ",")
df_regions.cache()
df_regions.show(5, truncate=False)
df_sales = read_from_kafka(spark)
summarize_sales(df_sales, df_regions)
def read_from_kafka(spark):
sales_schema_value = get_schema("pagila.sales.avro-value")
options_read = {
"kafka.bootstrap.servers":
params["kafka_servers"],
"subscribe":
source_topic,
"startingOffsets":
"earliest",
"kafka.ssl.truststore.location":
"/tmp/kafka.client.truststore.jks",
"kafka.security.protocol":
"SASL_SSL",
"kafka.sasl.mechanism":
"AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.sasl.client.callback.handler.class":
"software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
df_sales = spark.readStream \
.format("kafka") \
.options(**options_read) \
.load() \
.select(from_avro("value", sales_schema_value).alias("data"), "timestamp") \
.select("data.*", "timestamp")
return df_sales
def summarize_sales(df_sales, df_regions):
sales_summary_key = get_schema("pagila.sales.summary.avro-key")
sales_summary_value = get_schema("pagila.sales.summary.avro-value")
options_write = {
"kafka.bootstrap.servers":
params["kafka_servers"],
"topic":
sink_topic,
"kafka.ssl.truststore.location":