Archive for category Bash Scripting

End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub

Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms

Introduction

According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.

Another succinct description of a data catalog’s purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.

Working with many organizations in the area of Analytics, one of the more common requests I receive regards choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

View of DataHub’s home screen showing a variety of datasources

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Another example of searching for cataloged entities in DataHub’s browser-based UI

Data Catalog Competitors

Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980’s. Gartner publishes their Metadata Management (EMM) Solutions Reviews and Ratings and Metadata Management Magic Quadrant. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:

Open Source Software

Commercial

Cloud Service Providers

Data Catalog Features

DataHub describes itself as “a modern data catalog built to enable end-to-end data discovery, data observability, and data governance.” Sorting through vendor’s marketing jargon and hype, standard features of leading data catalogs include:

  • Metadata ingestion
  • Data discovery
  • Data governance
  • Data observability
  • Data lineage
  • Data dictionary
  • Data classification
  • Usage/popularity statistics
  • Sensitive data handling
  • Data fitness (aka data quality or data profiling)
  • Manage both technical and business metadata
  • Business glossary
  • Tagging
  • Natively supported datasource integrations
  • Advanced metadata search
  • Fine-grain authentication and authorization
  • UI- and API-based interaction

Datasources

When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:

  • Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
  • Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
  • NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
  • Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
  • Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
  • Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
  • ETL sources, such as Apache Spark, Apache Airflow, Apache NiFi, and dbt

DataHub on AWS

DataHub’s convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternately, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.

Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

ArgoCD UI showing DataHub and its dependencies deployed to Amazon EKS

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. Kubernetes deployment for each of the components is defined as sub-charts under the main DataHub Helm chart.

External Storage Layer Dependencies

Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.

Alternately, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service dependency substitutions include Amazon RDS for MySQL, Amazon OpenSearch (fka Amazon Elasticsearch), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using AWS Neptune as the Graph Index is coming soon.

DataHub CLI and Plug-ins

DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

Available DataHub CLI commands

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.

Example list of DataHub Source plugins installed
Example list of DataHub Sink and Transformer plugins installed

Secure Metadata Ingestion

Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.

In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to git and push them to a private GitHub repository. Finally, I use GitHub Actions to test DataHub files.

To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, using IAM Roles for Service Accounts (IRSA), to securely access datasources to ingest and the DataHub application.

Schedule and Monitor Pipelines

Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configurations can be kept secure and not be exposed externally or in plain text.

When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub, should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.

SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate datasources require alternate networking and security strategies to access metadata securely.

AWS-based DataHub high-level architecture

Markup or Code?

According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain a source, sink, and transformers configuration section. Mark-up language-based job automation written in YAML, JSON, or Domain Specific Languages (DSLs) is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database, running on AWS.

YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to secure sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and re-used for all ingestion jobs. Sensitive database connection information has also been variablized and stored separately.

# Purpose: DataHub example recipe for PostgreSQL datasource
# Author: Gary A. Stafford
# Date: March 2022
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/postgres
source:
type: postgres
config:
# Coordinates
host_port: ${DB_HOST_PORT}
database: tickit
# Credentials
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
# Options
profiling:
enabled: true
# Environment
env: DEV
# see https://datahubproject.io/docs/metadata-ingestion/transformers/#adding-a-set-of-tags
transformers:
type: "simple_add_dataset_tags"
config:
tag_urns:
"urn:li:tag:AWS"
"urn:li:tag:${ACCOUNT_ID}"
"urn:li:tag:us-east-1"
type: "pattern_add_dataset_terms"
config:
term_pattern:
rules:
".*users.*": ["urn:li:glossaryTerm:Classification.Sensitive"]
type: "simple_add_dataset_ownership"
config:
owner_urns:
"urn:li:corpuser:Database Administrators"
ownership_type: "DATAOWNER"
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: "datahub-rest"
config:
server: ${DATAHUB_REST_ENDPOINT}
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/reporting_telemetry/
pipeline_name: "postgres-pipeline-tickit"
reporting:
type: "datahub"
config:
datahub_api:
server: ${DATAHUB_REST_ENDPOINT}

Using Python

You can configure and run a pipeline entirely from within a custom Python script using DataHub’s Python API as an alternative to YAML. Below, we see two nearly identical ingestion recipes to the YAML above, written in Python. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit-testing, and notification. Below is a basic pipeline written in Python. The code is functional, but not very Pythonic, secure, scalable, or Production ready.

# Purpose: Simple programmatic DataHub pipline example
# Author: Gary A. Stafford
# Date: March 2022
# Reference: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/programatic_pipeline.py
from datahub.ingestion.run.pipeline import Pipeline
# The pipeline configuration is similar to the recipe YAML files provided to the CLI tool.
pipeline = Pipeline.create(
{
"run_id": "postgres-run",
"source": {
"type": "postgres",
"config": {
"host_port": "demo-instance.abcd1234.us-east-1.rds.amazonaws.com:5432",
"database": "tickit",
"username": "datahub",
"password": "My5up3r53cr3tPa55w0rd",
"env": "DEV",
"profiling": {
"enabled": "true"
}
}
},
"transformers": [
{
"type": "simple_add_dataset_tags",
"config": {
"tag_urns": [
f"urn:li:tag:AWS",
f"urn:li:tag:111222333444",
f"urn:li:tag:us-east-1"
]
}
},
{
"type": "pattern_add_dataset_terms",
"config": {
"term_pattern": {
"rules": {
".*users.*": [
"urn:li:glossaryTerm:Classification.Sensitive"
]
}
}
}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
f"urn:li:corpuser:Database Administrators"
],
"ownership_type": "DATAOWNER"
}
}
],
"sink": {
"type": "datahub-rest",
"config": {
"server": "http://192.168.111.222:33333"
}
}
}
)
# Run the pipeline and report the results.
pipeline.run()
pipeline.pretty_print_summary()

The second version of the same pipeline is more Production ready. The code is more Pythonic in nature and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. Like recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration is pulled from the environment, such as AWS Account ID and AWS Region. The script loads these values at runtime.

# Purpose: Programmatic DataHub pipline example
# Author: Gary A. Stafford
# Date: March 2022
import json
import logging
import boto3
from botocore.exceptions import ClientError
from datahub.ingestion.run.pipeline import Pipeline
logging.basicConfig(
format="[%(asctime)s] %(levelname)s – %(message)s", level=logging.INFO
)
def main():
sts_client = boto3.client("sts")
params = get_parameters()
params["owner"] = "Database Administrators"
params["environment"] = "DEV"
params["database"] = "tickit"
params["region"] = sts_client.meta.region_name
params["account"] = sts_client.get_caller_identity()["Account"]
logging.info(f"Params: {json.dumps(params, indent=4, sort_keys=True)}")
ingestion_pipeline = create_pipeline(params)
run_pipeline(ingestion_pipeline)
def create_pipeline(params) -> Pipeline:
"""Constructs a Pipeline for a PostgreSQL Source and a DataHub Sink
:return: instance of datahub.ingestion.run.pipeline
"""
pipeline = Pipeline.create(
{
"run_id": "postgres-run",
"source": {
"type": "postgres",
"config": {
"host_port": params.get("/datahub_demo/postgres_host_port_tickit"),
"database": params.get("database"),
"username": params.get("/datahub_demo/postgres_username_tickit"),
"password": params.get("/datahub_demo/postgres_password_tickit"),
"profiling": {
"enabled": "true"
},
"env": params.get("environment"),
}
},
"transformers": [
{
"type": "simple_add_dataset_tags",
"config": {
"tag_urns": [
f"urn:li:tag:{params.get('account')}",
f"urn:li:tag:{params.get('region')}"
]
}
},
{
"type": "pattern_add_dataset_terms",
"config": {
"term_pattern": {
"rules": {
".*users.*": [
"urn:li:glossaryTerm:Classification.Sensitive"
]
}
}
}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
f"urn:li:corpuser:{params.get('owner')}"
],
"ownership_type": "DATAOWNER"
}
}
],
"sink": {
"type": "datahub-rest",
"config": {
"server": params.get("/datahub_demo/datahub_rest_endpoint_public")
}
}
}
)
return pipeline
def run_pipeline(pipeline):
"""Runs the ingestion pipeline and prints summary of the results
:param pipeline: instance of datahub.ingestion.run.pipeline
:return:
"""
pipeline.run()
pipeline.pretty_print_summary()
def get_parameters() -> dict:
"""
Load parameter values from AWS Systems Manager (SSM) Parameter Store
:return: dict of parameter k/v's
"""
ssm_client = boto3.client("ssm")
params: dict = {}
try:
# make a single SSM API call for all parameters
response = ssm_client.get_parameters_by_path(
Path="/datahub_demo"
)
# create a dictionary of parameter k/v's
for param in response.get("Parameters"):
params[param["Name"]] = param["Value"]
logging.debug(f"Params: {params}")
except ClientError as e:
logging.error(e)
exit(1)
return params
if __name__ == '__main__':
main()

Sinking to DataHub

When syncing metadata to DataHub, you have two choices, the GMS REST API or Kafka. According to DataHub, the advantage of the REST-based interface is that any errors can immediately be reported. On the other hand, the advantage of the Kafka-based interface is that it is asynchronous and can handle higher throughput. For this post, I am DataHub’s REST API.

DataHub ingestion pipeline results for a Microsoft SQL Server datasource
Another example of a DataHub ingestion pipeline results for a Google BigQuery datasource

Column-level Metadata

In addition to column names and data types, it is possible to extract column descriptions and key types from certain datasources. Column descriptions, tags, and glossary terms can also be input through the DataHub UI. Below, we see an example of an Amazon Redshift fact table, whose table and column descriptions were ingested as part of the metadata.

Amazon Redshift fact table showing column-level metadata, tags, owners, and documentation

Business Glossary

DataHub can assign business glossary terms to entities. The DataHub Business Glossary plugin pulls business glossary metadata from a YAML-based configuration file.

# see sample: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/bootstrap_data/business_glossary.yml
version: 1
source: DataHub
owners:
users:
datahub
url: "https://github.com/datahub-project/datahub/"
nodes:
name: Classification
description: A set of terms related to Data Classification
terms:
name: Sensitive
description: Sensitive Data
custom_properties:
is_confidential: false
name: Confidential
description: Confidential Data
custom_properties:
is_confidential: true
name: HighlyConfidential
description: Highly Confidential Data
custom_properties:
is_confidential: true
name: PersonalInformation
description: All terms related to personal information
owners:
users:
datahub
terms:
name: ID
description: An individual's unqiue identifier
inherits:
Classification.Sensitive
name: Name
description: An individual's Name
inherits:
Classification.Sensitive
name: SSN
description: An individual's SSN
inherits:
Classification.Confidential
name: DriverLicense
description: An individual's Driver License ID
inherits:
Classification.Confidential
name: Email
description: An individual's email address
inherits:
Classification.Confidential
name: Address
description: A physical address
name: Gender
description: The gender identity of the individual
inherits:
Classification.Sensitive

Business glossary terms can be reviewed in the Glossary Terms tab of the DataHub’s UI. Below, we see the three terms associated with the Classification glossary node: Confidential, HighlyConfidential, and Sensitive.

Example of a related set of terms in DataHub’s Business Glossary

We can search for entities inventoried in DataHub using their assigned business glossary terms.

Dataset search results based on a term in DataHub’s Business Glossary

Finally, we see an example of an AWS Athena data catalog table with business glossary terms applied to columns within the table’s schema.

AWS Athena table showing column-level descriptions, glossary terms, tags, owners, and documentation

SQL-based Profiler

DataHub also can extract statistics about entities in DataHub using the SQL-based Profiler. According to the DataHub documentation, the Profiler can extract the following:

  • Row and column counts for each table
  • Column null counts and proportions
  • Column distinct counts and proportions
  • Column min, max, mean, median, standard deviation, quantile values
  • Column histograms or frequencies of unique values

In addition, we can also track the historical stats for each profiled entity each time metadata is ingested.

Amazon Redshift fact table showing SQL-based profiler column-level statistics
Another example, a Google BigQuery table showing SQL-based profiler column-level statistics

Data Lineage

DataHub’s data lineage features allow us to view upstream and downstream relationships between different types of entities. DataHub can trace lineage across multiple platforms, datasets, pipelines, charts, and dashboards.

Below, we see a simple example of dataset entity-to-entity lineage in Amazon Redshift and then Apache Spark on Amazon EMR. The fact table has a downstream relationship to four database views. The views are based on SQL queries that include the upstream table as a datasource.

Visual lineage view of Amazon Redshift fact table and its four downstream view dependencies
Another visual lineage example of an Apache Spark job with Apache Hive tables as both the source and sink

DataHub Analytics

DataHub provides basic metadata quality and usage analytics in the DataHub UI: user activity, counts of datasource types, business glossary terms, environments, and actions.

Examples of DataHub’s metadata quality and usage analytics capabilities
More examples of DataHub’s metadata quality and usage analytics capabilities

Conclusion

In this post, we explored the features of a data catalog and learned about some of the leading commercial and open-source data catalogs. Next, we learned how DataHub could collect, organize, enrich, and search metadata across multiple datasources. Lastly, we discovered how easy it is to catalog metadata from datasources spread across multiple CSP, SaaS providers, and corporate data centers, and centralize those results in DataHub.

In addition to the basic features reviewed in this post, DataHub offers a growing number of additional capabilities, including GraphQL and Timeline APIs, robust authentication and authorization, application monitoring observability, and Great Expectations integration. All these qualities make DataHub an excellent choice for a data catalog.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

Leave a comment

Eventual Consistency with Spring for Apache Kafka: Part 1 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Introduction

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.

Domain-driven Design

To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.

Below are the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.

Consistency Strategies

Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.

In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.

Storefront Example

In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

View of the Storefront API from Kiali

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices

We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.

Source Code

The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer’s name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object represented as a BSON document in the fulfillment.requests database collection looks as follows:

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, again, we can again look at its JSON message payload in Kafka.

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.

State Change Event Messaging Flows

There are three state change event messaging flows illustrated in this post.

  1. Changes to a Customer triggers an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
  2. Order Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic, and is consumed by the Fulfillment service;
  3. Changes to the status of an Order triggers an event message produced by the Fulfillment Service, which is published on the fulfillment.order.change Kafka topic, and is consumed by the Orders service;

Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;
public void send(String topic, CustomerChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Orders service’s Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file in each service’s resources directory contains the Kafka configuration (lines 11–19).

server:
port: 8080
spring:
main:
allow-bean-definition-overriding: true
application:
name: orders
data:
mongodb:
uri: mongodb://mongo:27017/orders
kafka:
bootstrap-servers: kafka:9092
topic:
accounts-customer: accounts.customer.change
orders-order: orders.order.fulfill
fulfillment-order: fulfillment.order.change
consumer:
group-id: orders
auto-offset-reset: earliest
zipkin:
sender:
type: kafka
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: INFO
spring:
config:
activate:
on-profile: local
data:
mongodb:
uri: mongodb://localhost:27017/orders
kafka:
bootstrap-servers: localhost:9092
server:
port: 8090
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
spring:
config:
activate:
on-profile: confluent
server:
port: 8080
logging:
level:
root: INFO
server:
port: 8080
spring:
config:
activate:
on-profile: minikube
data:
mongodb:
uri: mongodb://mongo.dev:27017/orders
kafka:
bootstrap-servers: kafka-cluster.dev:9092
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages required it.

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class, which simulates a change in order status. They allow an order’s status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service’s Sender class handles this.

Note in this example that these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these independent actions succeed or fail as a single transaction to ensure data consistency, using any of a handful of common architectural patterns.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service receives messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components into a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of the Storefront API from Kiali

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Cross-Account Amazon Elastic Container Registry (ECR) Access for ECS

Deploying containerized applications on Amazon ECS using cross-account elastic container registries

This is an updated version of a post, originally published in October 2019. This post uses AWS CLI version 2 and contains updated versions of all Docker images.

Introduction

There are two scenarios I frequently encounter that require sharing Amazon Elastic Container Registry (ECR)-based Docker images across multiple AWS Accounts. In the first scenario, a vendor wants to share a Docker image with their customer, stored in the vendor’s private container registry. Many popular container security and observability solutions function in this manner.

Below, we see an example of an application consisting of three containers. Two of the container images originated from the customer’s own ECR repositories (right side). The third image originated from their vendor’s ECR repository (left side).

In the second scenario, an enterprise operates multiple AWS accounts to create logical security boundaries between environments and responsibilities. The first AWS account contains the enterprise’s deployable assets, including their ECR image repositories. The enterprise has additional accounts, such as Development, Test, Staging, and Production, for each Software Development Life Cycle (SDLC) phase. The ECR images in the repository account need to be accessed from multiple AWS accounts and often across different AWS Regions for deployment.

Below, we see an example of a deployed application also consisting of three containers. All the container images originated from the ECR repositories account (left side). The images were pulled into the Production account during deployment to ECS (right side).

This post will explore the first scenario — a vendor who wants to share a private Docker image with their customer securely. The post will demonstrate how to share images across AWS accounts for use with Docker Swarm and Amazon Elastic Container Service (ECS) with AWS Fargate, both using ECR Repository Policies.

For the demonstration, we will use an existing application I have created, a RESTful, HTTP-based NLP (Natural Language Processing) API, consisting of four Golang microservices. The edge service, nlp-client, communicates with the rake-app, lang-app, and prose-app services. There is a fifth service, dyanmo-app, which is not discussed in this post, but easily added to the API.

A customer has developed the nlp-client, lang-app, and prose-app container-based microservices as part of their NLP application in the post’s scenario. Instead of developing their own implementation of the RAKE (Rapid Automatic Keyword Extraction) algorithm, they have licensed a version from a vendor. The vendor’s rake-app service is delivered in the form of a licensed Docker image. The acronym ISV (Independent Software Vendor) is used to represent the vendor throughout the code.

The NPL API exposes several endpoints accessible through the nlp-client service. The endpoints perform common NLP operations on text input, such as extracting keywords, tokens, entities, and sentences, and determining the language. All the endpoints can be listed using the /routes endpoint.

[
{
"method": "GET",
"path": "/error",
"name": "main.getError"
},
{
"method": "POST",
"path": "/keywords",
"name": "main.getKeywords"
},
{
"method": "POST",
"path": "/language",
"name": "main.getLanguage"
},
{
"method": "GET",
"path": "/health",
"name": "main.getHealth"
},
{
"method": "GET",
"path": "/health/:app",
"name": "main.getHealthUpstream"
},
{
"method": "GET",
"path": "/routes",
"name": "main.getRoutes"
},
{
"method": "POST",
"path": "/tokens",
"name": "main.getTokens"
},
{
"method": "POST",
"path": "/entities",
"name": "main.getEntities"
},
{
"method": "POST",
"path": "/sentences",
"name": "main.getSentences"
}
]

Requirements

To follow along with the post’s demonstration, you will need two AWS accounts, one representing the vendor and one representing one of their customers. It is relatively simple to create additional AWS accounts — all you need is a unique email address (easy with Gmail) and a credit card. Using AWS Organizations can make the task of creating and managing multiple accounts even easier.

I have intentionally used different AWS Regions to demonstrate how you can share ECR images across both AWS accounts and regions. You will need a current version of the AWS CLI version 2 and of Docker. Lastly, you will need adequate access to each AWS account to create resources.

Source Code

The demonstration’s source code is contained in five public GitHub repositories.

git clone --branch v2.0.0 \
    --single-branch --depth 1 \
    https://github.com/garystafford/ecr-cross-account-demo.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/nlp-client.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/prose-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/rake-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/lang-app.git

The v2.0.0 branch of the ecr-cross-account-demo GitHub repository contains all the CloudFormation templates and the Docker Compose Stack file.

.
├── LICENSE
├── README.md
├── cfn-templates
│ ├── development-user-group-customer.yml
│ ├── development-user-group-isv.yml
│ ├── ecr-repo-not-shared.yml
│ ├── ecr-repo-shared.yml
│ ├── public-subnet-public-loadbalancer.yml
│ └── public-vpc.yml
└── docker
└── stack.yml

Each of the other four GitHub repositories, such as the nlp-client repository, contains a Golang-based microservice, which together comprises the NLP API. Each repository also contains a Dockerfile.

.
├── Dockerfile
├── LICENSE
├── README.md
├── buildspec.yml
├── go.mod
├── go.sum
└── main.go

We will use AWS CloudFormation to create the necessary resources within both AWS accounts. We will also use CloudFormation to create an ECS Cluster and an Amazon ECS Task Definition for the customer account. Task Definition defines how ECS will deploy the application, consisting of four Docker containers, using AWS Fargate. In addition to ECS, we will create an Amazon Virtual Private Cloud (VPC) to house the ECS cluster and a public-facing, Layer 7 Application Load Balancer (ALB) to load-balance our ECS-based application.

Creating ECR Repositories

In the first AWS account, representing the vendor, we will execute two CloudFormation templates. The first template, development-user-group-isv.yml, creates the Development group and VendorDev user. The VendorDev user will be given explicit access to the vendor’s rake-app ECR repository. Change the DevUserPassword parameter’s value to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-isv \
--template-body file://cfn-templates/development-user-group-isv.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new IAM Group and User.

Next, we will execute the second CloudFormation template, ecr-repo-shared.yml, which creates the vendor’s rake-app ECR image repository. The rake-app repository will house a copy of the vendor’s rake-app Docker Image. But first, let’s look at the CloudFormation template used to create the repository, specifically the RepositoryPolicyText section. Here we define two repository policies:

  • The AllowPushPull policy explicitly allows the VendorDev user to push and pull versions of the image to the ECR repository. We import the exported Amazon Resource Name (ARN) of the VendorDev user from the previous CloudFormation Stack Outputs. We have also allowed AWS CodeBuild service access to the ECR repository. This is known as a Service-Linked Role. We will not use CodeBuild in this brief post.
  • The AllowPull policy allows anyone in the customer’s AWS account (root) to pull any version of the image. They cannot push, only pull. Cross-account access can be restricted to a finer-grained set of the specific customer’s IAM Entities and source IP addresses.

Note the "ecr:GetAuthorizationToken" policy Action. This action will allow the customer’s user to log into the vendor’s ECR repository and receive an Authorization Token. The customer retrieves a token that is valid for a specified container registry for 12 hours.

RepositoryPolicyText:
Version: '2012-10-17'
Statement:
- Sid: AllowPushPull
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
AWS:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'DevUserArn']]
Action:
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:BatchGetImage'
- 'ecr:CompleteLayerUpload'
- 'ecr:DescribeImages'
- 'ecr:DescribeRepositories'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:GetRepositoryPolicy'
- 'ecr:InitiateLayerUpload'
- 'ecr:ListImages'
- 'ecr:PutImage'
- 'ecr:UploadLayerPart'
- Sid: AllowPull
Effect: Allow
Principal:
AWS: !Join [':', ['arn:aws:iam:', !Ref 'CustomerAccount', 'root']]
Action:
- 'ecr:GetAuthorizationToken'
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:BatchGetImage'
- 'ecr:DescribeRepositories' # optional permission
- 'ecr:DescribeImages' # optional permission

Before executing the following command to deploy the CloudFormation Stack, ecr-repo-shared.yml, replace the CUSTOMER_ACCOUNT value with your pseudo customer’s AWS account ID.

# change me
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
# NLP Rake Microservice
REPO_NAME=rake-app
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-shared.yml \
--parameters \
ParameterKey=CustomerAccount,ParameterValue=${CUSTOMER_ACCOUNT} \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new ECR repository.

Below, we see the ECR repository policies applied correctly in the Permissions tab of the rake-app repository. The first policy covers both the VendorDev user, referred to as an IAM Entity, as well as AWS CodeBuild, referred to as a Service Principal.

The second policy covers the customer’s AWS account ID.

Repeat this process in the customer’s AWS account. First, the CloudFormation template, development-user-group-customer.yml, containing the Development group and CustomerDev user.

# change me
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-customer \
--template-body file://cfn-templates/development-user-group-customer.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Next, we will execute the second CloudFormation template, ecr-repo-not-shared.yml, three times, once for each of the customer’s three ECR repositories, nlp-client, lang-app, and prose-app. Note that in the RepositoryPolicyText section of the template we only define a single policy. Identical to the vendor’s policy, the AllowPushPull policy explicitly allows the previously-created CustomerDev user to push and pull versions of the image to the ECR repository. There is no cross-account access required to the customer’s two ECR repositories.

RepositoryPolicyText:
Version: '2012-10-17'
Statement:
- Sid: AllowPushPull
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
AWS:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'DevUserArn']]

Action:
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:BatchGetImage'
- 'ecr:CompleteLayerUpload'
- 'ecr:DescribeImages'
- 'ecr:DescribeRepositories'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:GetRepositoryPolicy'
- 'ecr:InitiateLayerUpload'
- 'ecr:ListImages'
- 'ecr:PutImage'
- 'ecr:UploadLayerPart'

Execute the following commands to create the three CloudFormation Stacks. The Stacks use the same template, ecr-repo-not-shared.yml, with a different Stack name and RepoName parameter values.

# NLP Client microservice
REPO_NAME=nlp-client
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

# NLP Prose microservice
REPO_NAME=prose-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM
# NLP Language microservice
REPO_NAME=lang-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting three ECR repositories.

At this point, we have our four ECR repositories across the two AWS accounts, with the proper ECR Repository Policies applied to each.

Building and Pushing Images to ECR

Next, we will build and push the three NLP application images to their corresponding ECR repositories. To confirm that the ECR policies are working correctly, log in as the VendorDev user and perform the below command.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Logged in as the vendor’s VendorDev user, build and push the Docker image to the rake-app repository. The Dockerfile and Golang source code are located in each GitHub repository. With Golang and Docker multi-stage builds, we will create very small Docker images, based on Scratch, containing just the compiled Go executable binary. At a mere 7–15 MBs in size, pushing and pulling these Docker images across accounts is very fast.

docker build -t ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0 . --no-cache
docker push ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Below, we see the output from the vendor’s VendorDev user logging into the rake-app repository.

We see the vendor’s VendorDev user building results and pushing the Docker image to the rake-app repository.

Next, after logging in as the customer’s CustomerDev user, build and push the Docker images to the ECR nlp-client, lang-app, and prose-app repositories. Again, make sure you substitute the variable values below with your pseudo customer’s AWS account and preferred AWS region.

aws ecr get-login-password --region ${CUSTOMER_ECR_REGION} \
| docker login --username AWS --password-stdin ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com
# nlp-client
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
# prose-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
# lang-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0

At this point, each of the four customer ECR repositories has a Docker image pushed to them.

Deploying Locally to Docker Swarm

As a simple demonstration of cross-account ECS access, we will start with Docker Swarm. Logged in as the customer’s CustomerDev user and using the Docker Swarm Stack file included in the project, we can create and run a local copy of our NLP application in our customer’s account. First, we need to log into the vendor’s ECR repository in order to pull the image from the vendor’s ECR registry.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Once logged in to the vendor’s ECR repository, we will pull the image. Using the docker describe-repositories and docker describe-images, we can list cross-account repositories and images your IAM user has access to if you are unsure.

aws ecr describe-repositories \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
aws ecr describe-images \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
docker pull ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Using the following command, you should see each of our four applications Docker images.

docker image ls --filter=reference='*amazonaws.com/*'

Below, we see an example of the expected terminal output from pulling the image and listing the images.

Build Docker Stack Locally

Next, build the Docker Swarm Stack. The Docker Compose file, stack.yml, is shown below. Note the location of the Docker images.

version: '3.9'
services:
nlp-client:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
networks:
nlp-demo
ports:
target: 8080
published: 8080
protocol: tcp
mode: host
environment:
NLP_CLIENT_PORT
RAKE_ENDPOINT
PROSE_ENDPOINT
LANG_ENDPOINT
API_KEY
rake-app:
image: ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0
networks:
nlp-demo
environment:
RAKE_PORT
API_KEY
prose-app:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
networks:
nlp-demo
environment:
PROSE_PORT
API_KEY
lang-app:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0
networks:
nlp-demo
environment:
LANG_PORT
API_KEY
networks:
nlp-demo:
volumes:
data: {}
view raw stack.yaml hosted with ❤ by GitHub

Execute the following commands to deploy the Docker Stack to Docker Swarm. Again, make sure you substitute the variable values below with your pseudo vendor and customer AWS accounts and regions. Additionally, the NLP API uses an API Key to protect all exposed endpoints, except the /health endpoint, across all four services. Change the default CloudFormation template’s API Key parameter to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
export API_KEY=SuP3r5eCRetAutHK3y

# don't change me
export NLP_CLIENT_PORT=8080
export RAKE_PORT=8080
export PROSE_PORT=8080
export LANG_PORT=8080
export RAKE_ENDPOINT=http://rake-app:${RAKE_PORT}
export PROSE_ENDPOINT=http://prose-app:${PROSE_PORT}
export LANG_ENDPOINT=http://lang-app:${LANG_PORT}
export TEXT="The Nobel Prize is regarded as the most prestigious award in the World. Notable winners have included Marie Curie, Theodore Roosevelt, Albert Einstein, George Bernard Shaw, and Winston Churchill."
 
docker swarm init 
docker stack deploy --compose-file docker/stack.yml nlp

You can check the success of the deployment with either of the following commands:

docker stack ps nlp --no-trunc
docker container ls

Below, we see an example of the expected terminal output.

With the Docker Stack, you can hit the nlp-client service directly on localhost:8080. Unlike Fargate, which requires unique static ports for each container in the task, with Docker, we can choose to run all the containers on the same port without conflict since only the nlp-client service is exposing port :8080. Unlike with ECS, there is no load balancer in front of the Stack, since we only have a single node in our Swarm and thus a single container instance of each microservice for testing.

To test that the images were pulled successfully and the Docker Stack is running, we can execute a curl command against any of the API endpoints, such as /keywords. Below, I am using jq to pretty-print the JSON response payload.

curl -s -X POST \
"http://localhost:${NLP_CLIENT_PORT}/keywords" \
-H 'Content-Type: application/json' \
-H "X-API-Key: ${API_KEY}" \
-d '{"text": "The Internet is the global system of interconnected computer networks that use the Internet protocol suite to link devices worldwide."}' | jq

The resulting JSON response payload indicates that the nlp-client service was reached successfully and that it was then subsequently able to communicate with the rake-app service, whose container image originated from the vendor’s ECR repository.

[
{
"candidate": "interconnected computer networks",
"score": 9
},
{
"candidate": "link devices worldwide",
"score": 9
},
{
"candidate": "internet protocol suite",
"score": 8
},
{
"candidate": "global system",
"score": 4
},
{
"candidate": "internet",
"score": 2
}
]

Creating Amazon ECS Environment

Although using Docker Swarm locally is a great way to understand how cross-account ECR access works, it is not a typical use case for deploying containerized applications on the AWS Platform. More often, you could use Amazon ECS, Amazon Elastic Kubernetes Service (EKS), or enterprise versions of third-party orchestrators such as RedHat OpenShift or Rancher.

Using CloudFormation and some very convenient CloudFormation templates supplied by Amazon as a starting point, we will create a complete ECS environment for our application. First, we will create a VPC to house the ECS cluster and a public-facing ALB to front our ECS-based application, using the public-vpc.yml template.

aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name public-vpc \
--template-body file://cfn-templates/public-vpc.yml \
--capabilities CAPABILITY_NAMED_IAM

Next, we will create the ECS cluster and an Amazon ECS Task Definition using the public-subnet-public-loadbalancer.yml template. Again, the Task Definition defines how ECS will deploy our application using AWS Fargate. Amazon Fargate allows you to run containers without having to manage servers or clusters. No EC2 instances to manage! Woot! Below, in the CloudFormation template, we see the ContainerDefinitions section of the TaskDefinition resource that contains three container definitions. Note the three images and their ECR locations.

TaskDefinition:
Type: AWS::ECS::TaskDefinition
DependsOn: CloudWatchLogsGroup
Properties:
Family: !Ref 'ServiceNameClient'
Cpu: !Ref 'ContainerCpu'
Memory: !Ref 'ContainerMemory'
NetworkMode: awsvpc
RequiresCompatibilities:
FARGATE
EC2
ExecutionRoleArn:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'ECSTaskExecutionRole']]
TaskRoleArn:
Fn::If:
'HasCustomRole'
!Ref 'Role'
!Ref 'AWS::NoValue'
ContainerDefinitions:
Name: nlp-client
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/nlp-client:1.1.0']]
PortMappings:
ContainerPort: !Ref ContainerPortClient
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: NLP_CLIENT_PORT
Value: !Ref ContainerPortClient
Name: RAKE_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortRake]]
Name: PROSE_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortProse]]
Name: LANG_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortLang]]
Name: API_KEY
Value: !Ref ApiKey
Name: rake-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref VendorAccountId, 'dkr.ecr', !Ref VendorEcrRegion, 'amazonaws.com/rake-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: RAKE_PORT
Value: !Ref ContainerPortRake
Name: API_KEY
Value: !Ref ApiKey
Name: prose-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/prose-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: PROSE_PORT
Value: !Ref ContainerPortProse
Name: API_KEY
Value: !Ref ApiKey
Name: lang-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/lang-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: LANG_PORT
Value: !Ref ContainerPortLang
Name: API_KEY
Value: !Ref ApiKey

Execute the following command to create the ECS cluster and an ECS Task Definition using the CloudFormation template.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export API_KEY=SuP3r5eCRetAutHK3y
aws cloudformation create-stack \
--stack-name public-subnet-public-loadbalancer \
--template-body file://cfn-templates/public-subnet-public-loadbalancer.yml \
--parameters \
ParameterKey=VendorAccountId,ParameterValue=${ISV_ACCOUNT} \
ParameterKey=VendorEcrRegion,ParameterValue=${ISV_ECR_REGION} \
ParameterKey=ApiKey,ParameterValue=${API_KEY} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the expected output from the CloudFormation Management Console.

If you want to update the ECS Task Definition, simply run the aws cloudformation update-stack command.

CloudWatch Container Insights

The CloudFormation template does not enable CloudWatch Container Insights by default. Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications. To enable Insights, execute the following command:

aws ecs put-account-setting \
--name "containerInsights" --value "enabled"

Confirming the Cross-account Policy

If everything went right in the previous steps, we should now have an ECS cluster running our containerized application, including the container built from the vendor’s Docker image. Below, we see an example of the ECS cluster displayed in the management console.

Within the ECR cluster, we should observe a single running ECS Service. According to AWS, Amazon ECS allows you to run and maintain a specified number of instances of a task definition simultaneously in an Amazon ECS cluster; this is referred to as a Service.

We are running two instances of each container on ECS, thus two copies of the task within a single service. Each task runs its containers in a different Availability Zone for high availability.

Drilling into the service, note the new ALB associated with the new VPC, two public subnets, and the corresponding security group.

Switching to the Task Definitions tab, note that the ECS Task contains four container definitions that comprise the NLP API. Three images originated from the customer’s ECR repositories, and one from the vendor’s ECR repository.

Drilling in further, we will see the details of each container definition, including environment variables, passed from ECR to the container and on to the Golang binary running in the container.

Accessing the NLP API on ECS

With our earlier Docker Swarm example, the curl command was issued against localhost. We now have a public-facing Application Load Balancer (ALB) in front of our ECS-based application. We will use the DNS name of the ALB to hit our application on ECS. The DNS address (A Record) can be obtained from the Load Balancer Management Console, as shown below, or from the Output tab of the public-vpc CloudFormation Stack.

Another difference between the earlier Docker Swarm example and ECS is the port. Although the edge service, nlp-client, runs on port :8080, the ALB acts as a reverse proxy, passing requests from port :80 on the ALB to port :8080 of the nlp-client container instances (actually, the shared ENI of the running task).

Although I did set up a custom domain name for the ALB using Route53 and enabled HTTPS (port 443 on the ELB), https://nlp-ecs.example-api.com, for the sake of brevity, I will not go into the details in this post.

To test our deployed ECS, we can use a tool like curl or Postman to test the API’s endpoints. Don’t forget to you will need to add the API Key for authentication using the X-API-Key header key/value pair. Below we see a successful GET against the /routes endpoint, using Postman.

Here we see a successful POST against the /keywords endpoint, using Postman.

Cleaning Up

To clean up the demonstration’s AWS resources and Docker Stack, run the following scripts in the appropriate AWS accounts. Importantly, similar to S3, you must delete all the Docker images in the ECR repositories first, before deleting the repository, or else you will receive a CloudFormation error. This includes untagged images.

# local docker stack
docker stack rm nlp
# customer account
aws ecr batch-delete-image \
--repository-name nlp-client \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name prose-app \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name lang-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-nlp-client
aws cloudformation delete-stack \
--stack-name ecr-repo-prose-app
aws cloudformation delete-stack \
--stack-name ecr-repo-lang-app
aws cloudformation delete-stack \
--stack-name public-subnet-public-loadbalancer
aws cloudformation delete-stack \
--stack-name public-vpc
aws cloudformation delete-stack \
--stack-name development-user-group-customer
# vendor account
aws ecr batch-delete-image \
--repository-name rake-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-rake-app
aws cloudformation delete-stack \
--stack-name development-user-group-isv

Conclusion

In the preceding post, we saw how multiple AWS accounts could share private ECR-based Docker images. There are variations and restrictions to the configuration of the ECR Repository Policies, depending on the deployment tools you are using, such as AWS CodeBuild, AWS CodeDeploy, or AWS Elastic Beanstalk. AWS does a good job of providing some examples in their documentation, including Amazon ECR Repository Policy Examples and Amazon Elastic Container Registry Identity-Based Policy Examples.

In late 2020, AWS released Amazon Elastic Container Registry Public (ECR Public). Although this post was about private images, for public images, ECR Public allows you to store, manage, share, and deploy container images for anyone to discover and download globally.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

Leave a comment

Employing Amazon Macie to Discover and Protect Sensitive Data in your Amazon S3-based Data Lake

Introduction

Working with Analytics customers, it’s not uncommon to see data lakes with a dozen or more discrete data sources. Data typically originates from sources both internal and external to the customer. Internal data may come from multiple teams, departments, divisions, and enterprise systems. External data comes from vendors, partners, public sources, and subscriptions to licensed data sources. The volume, velocity, variety, veracity, and method of delivery vary across the data sources. All this data is being fed into data lakes for purposes such as analytics, business intelligence, and machine learning.

Given the growing volumes of incoming data and variations amongst data sources, it is increasingly complex, expensive, and time-consuming for organizations to ensure compliance with relevant laws, policies, and regulations. Regulations that impact how data is handled in a data lake include the Organizations Health Insurance Portability and Accountability Act (HIPAA), General Data Privacy Regulation (GDPR), Payment Card Industry Data Security Standard (PCI DSS), California Consumer Privacy Act (CCPA), and the Federal Information Security Management Act (FISMA).

Data Lake

AWS defines a data lake as a centralized repository that allows you to store all your structured and unstructured data at any scale. Once in the data lake, you run different types of analytics — from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data in a data lake is regularly organized or separated by its stage in the analytics process. Incoming data is often referred to as raw data. Data is then processed — cleansed, filtered, enriched, and tokenized if necessary. Lastly, the data is analyzed and aggregated, and the results are written back to the data lake. The analyzed and aggregated data is used to build business intelligence dashboards and reports, machine learning models, and is delivered to downstream or external systems. The different categories of data — raw, processed, and aggregated, are frequently referred to as bronze, silver, and gold, a reference to their overall data quality or value.

Protecting the Data Lake

Imagine you’ve received a large volume of data from an external data source. The incoming data is cleansed, filtered, and enriched. The data is re-formatted, partitioned, compressed for analytical efficiency, and written back to the data lake. Your analytics pipelines run complex and time-consuming queries against the data. Unfortunately, while building reports for a set of stakeholders, you realize that the original data accidentally included credit card information and other sensitive information about your customers. In addition to being out of compliance, you have the wasted time and expense of the initial data processing, as well as the extra time and expense to replace and re-process the data. The solution — Amazon Macie.

Amazon Macie

According to AWS, Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). Macie’s alerts, or findings, can be searched, filtered, and sent to Amazon EventBridge, formerly called Amazon CloudWatch Events, for easy integration with existing workflow or event management systems, or to be used in combination with AWS services, such as AWS Step Functions or Amazon Managed Workflows for Apache Airflow (MWAA) to take automated remediation actions.

Amazon Macie’s Summary view

Data Discovery and Protection

In this post, we will deploy an automated data inspection workflow to examine sample data in an S3-based data lake. Amazon Macie will examine data files uploaded to an encrypted S3 bucket. If sensitive data is discovered within the files, the files will be moved to an encrypted isolation bucket for further investigation. Email and SMS text alerts will be sent. This workflow will leverage Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), AWS Lambda, and AWS Systems Manager Parameter Store.

Macie data inspection workflow architecture

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/macie-demo.git

AWS resources for this post can be deployed using AWS Cloud​Formation. To follow along, you will need recent versions of Python 3, Boto3, and the AWS CLI version 2, installed.

Sample Data

We will use synthetic patient data, freely available from the MITRE Corporation. The data was generated by Synthea, MITRE’s open-source, synthetic patient generator that models the medical history of synthetic patients. Synthea data is exported in a variety of data standards, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Download and unzip the CSV files from the Synthea website.

REMOTE_FILE="synthea_sample_data_csv_apr2020.zip"
wget "https://storage.googleapis.com/synthea-public/${REMOTE_FILE}"
unzip -j "${REMOTE_FILE}" -d synthea_data/

The sixteen CSV data files contain a total of 471,852 rows of data, including column headers.

> wc -l *.csv

      598 allergies.csv
    3,484 careplans.csv
    8,377 conditions.csv
       79 devices.csv
   53,347 encounters.csv
      856 imaging_studies.csv
   15,479 immunizations.csv
   42,990 medications.csv
  299,698 observations.csv
    1,120 organizations.csv
    1,172 patients.csv
    3,802 payer_transitions.csv
       11 payers.csv
   34,982 procedures.csv
    5,856 providers.csv
        1 supplies.csv
  ------------------------------
  471,852 total

Amazon Macie Custom Data Identifier

To demonstrate some of the advanced features of Amazon Macie, we will use three Custom Data Identifiers. According to Macie’s documentation, a custom data identifier is a set of criteria that you define that reflects your organization’s particular proprietary data — for example, employee IDs, customer account numbers, or internal data classifications. We will create three custom data identifiers to detect the specific Synthea-format Patient ID, US driver number, and US passport number columns.

Post’s three custom data identifiers

The custom data identifiers in this post use a combination of regular expressions (regex) and keywords. The identifiers are designed to work with structured data, such as CSV files. Macie reports text that matches the regex pattern if any of these keywords are in the name of the column or field that stores the text, or if the text is within the maximum match distance of one of these words in a field value. Macie supports a subset of the regex pattern syntax provided by the Perl Compatible Regular Expressions (PCRE) library.

Patient ID custom data identifier console

Enable Macie

Before creating a CloudFormation stack with this demonstration’s resources, you will need to enable Amazon Macie from the AWS Management Console, or use the macie2 API and the AWS CLI with the enable-macie command.

aws macie2 enable-macie

Macie can also be enabled for your multi-account AWS Organization. The enable-organization-admin-account command designates an account as the delegated Amazon Macie administrator account for an AWS organization. For more information, see Managing multiple accounts in Amazon Macie.

AWS_ACCOUNT=111222333444
aws macie2 enable-organization-admin-account \
--admin-account-id ${AWS_ACCOUNT}

CloudFormation Stack

To create the CloudFormation stack with the supplied template, cloudformation/macie_demo.yml, run the following AWS CLI command. You will need to include an email address and phone number as input parameters. These parameter values will be used to send email and text alerts when Macie produces a sensitive data finding.

Please make sure you understand all the potential cost and security implications of creating the CloudFormation stack before continuing.

SNS_PHONE="+12223334444"
SNS_EMAIL="your-email-address@email.com"

aws cloudformation create-stack \
--stack-name macie-demo \
--template-body file://cloudformation/macie_demo.yml \
--parameters ParameterKey=SNSTopicEndpointSms,ParameterValue=${SNS_PHONE} \
ParameterKey=SNSTopicEndpointEmail,ParameterValue=${SNS_EMAIL} \
--capabilities CAPABILITY_NAMED_IAM

As shown in the AWS CloudFormation console, the new macie-demo stack will contain twenty-one AWS resources.

CloudFormation stack successfully created

Upload Data

Next, with the stack deployed, upload the CSV format data files to the encrypted S3 bucket, representing your data lake. The target S3 bucket has the following naming convention, synthea-data-<aws_account_id>-<region>. You can retrieve the two new bucket names from AWS Systems Manager Parameter Store, which were written there by CloudFormation, using the ssm API.

aws ssm get-parameters-by-path \
--path /macie_demo/ \
--query 'Parameters[*].Value'

Use the following ssm and s3 API commands to upload the data files.

DATA_BUCKET=$(aws ssm get-parameter \
--name /macie_demo/patient_data_bucket \
--query 'Parameter.Value')
aws s3 cp synthea_data/ \
    "s3://$(eval echo ${DATA_BUCKET})/patient_data/" --recursive

You should end up with sixteen CSV files in the S3 bucket, totaling approximately 82.3 MB.

Synthea patient data files uploaded to in S3

Sensitive Data Discovery Jobs

With the CloudFormation stack created and the patient data files uploaded, we will create two sensitive data discovery jobs. These jobs will scan the contents of the encrypted S3 bucket for sensitive data and report the findings. According to the documentation, you can configure a sensitive data discovery job to run only once for on-demand analysis and assessment, or on a recurring basis for periodic analysis, assessment, and monitoring. For this demonstration, we will create a one-time sensitive data discovery job using the AWS CLI. We will also create a recurring sensitive data discovery job using the AWS SDK for Python (Boto3). Both jobs can also be created from within Macie’s Jobs console.

Creating a new job Macie’s Jobs console

For both sensitive data discovery jobs, we will include the three custom data identifiers. Each of the custom data identifiers has a unique ID. We will need all three IDs to create the two sensitive data discovery jobs. You can use the AWS CLI and the macie2 API to retrieve the values.

aws macie2 list-custom-data-identifiers --query 'items[*].id'

Next, modify the job_specs/macie_job_specs_1x.json file, adding the three custom data identifier IDs. Also, update your AWS account ID and S3 bucket name (lines 3–5, 12, and 14). Note that since all the patient data files are in CSV format, we will limit our inspection to only files with a csv file extension (lines 18–33).

{
"customDataIdentifierIds": [
"custom-data-identifier-id-1",
"custom-data-identifier-id-2",
"custom-data-identifier-id-3"
],
"description": "Review Synthea patient data (1x)",
"jobType": "ONE_TIME",
"s3JobDefinition": {
"bucketDefinitions": [
{
"accountId": "111222333444",
"buckets": [
"synthea-data-111222333444-us-east-1"
]
}
],
"scoping": {
"includes": {
"and": [
{
"simpleScopeTerm": {
"comparator": "EQ",
"key": "OBJECT_EXTENSION",
"values": [
"csv"
]
}
}
]
}
}
},
"tags": {
"KeyName": "Project",
"KeyValue": "Amazon Macie Demo"
}
}

The above JSON template was generated using the standard AWS CLI generate-cli-skeleton command.

aws macie2 create-classification-job --generate-cli-skeleton

To create a one-time sensitive data discovery job using the above JSON template, run the following AWS CLI command. The unique job name will be dynamically generated based on the current time.

aws macie2 create-classification-job \
--name $(echo "SyntheaPatientData_${EPOCHSECONDS}") \
--cli-input-json file://job_specs/macie_job_specs_1x.json

In the Amazon Macie Jobs console, we can see a one-time sensitive data discovery job running. With a sampling depth of 100, the job will take several minutes to run. The samplingPercentage job property can be adjusted to scan any percentage of the data. If this value is less than 100, Macie selects the objects to analyze at random, up to the specified percentage and analyzes all the data in those objects.

One-time sensitive data discovery job running

Once the job is completed, the findings will be available in Macie’s Findings console. Using the three custom data identifiers in addition to Macie’s managed data identifiers, there should be a total of fifteen findings from the Synthea patient data files in S3. There should be six High severity findings and nine Medium severity findings. Of those, three are of a Personal finding type, seven of a Custom Identifier finding type, and five of a Multiple finding type, having both Personal and Custom Identifier finding types.

Macie’s Findings console displaying the results of the one-time job

Isolating High Severity Findings

The data inspection workflow we have deployed uses an AWS Lambda function, macie-object-mover, to isolate all data files with High severity findings to a second S3 bucket. The offending files are copied to the isolation bucket and deleted from the source bucket.

#!/usr/bin/env python3
# Purpose: Lambda function that moves S3 objects flagged by Macie
# Author: Gary A. Stafford (March 2021)
import json
import logging
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')
def lambda_handler(event, context):
logging.info(f'event: {json.dumps(event)}')
destination_bucket_name = 'macie-isolation-111222333444-us-east-1'
source_bucket_name = event['detail']['resourcesAffected']['s3Bucket']['name']
file_key_name = event['detail']['resourcesAffected']['s3Object']['key']
copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}
logging.debug(f'destination_bucket_name: {destination_bucket_name}')
logging.debug(f'source_bucket_name: {source_bucket_name}')
logging.debug(f'file_key_name: {file_key_name}')
try:
response = s3_client.copy_object(
CopySource=copy_source_object,
Bucket=destination_bucket_name,
Key=file_key_name
)
logger.info(response)
except ClientError as ex:
logger.error(ex)
exit(1)
try:
response = s3_client.delete_object(
Bucket=source_bucket_name,
Key=file_key_name
)
logger.info(response)
except ClientError as ex:
logger.error(ex)
exit(1)
return {
'statusCode': 200,
'body': json.dumps(copy_source_object)
}

Amazon EventBridge

According to Macie’s documentation, to support integration with other applications, services, and systems, such as monitoring or event management systems, Amazon Macie automatically publishes findings to Amazon EventBridge as finding events. Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale using events generated from your applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

Each EventBridge rule contains an event pattern. The event pattern is used to filter the incoming stream of events for particular patterns. The EventBridge rule that is triggered when a Macie finding is based on any of the custom data identifiers, macie-rule-custom, uses the event pattern shown below. This pattern examines the finding event for the name of one of the three custom data identifier names that triggered it.

Post’s event rules, shown in the Amazon EventBridge console

Each EventBridge rule contains an event pattern. The event pattern is used to filter the incoming stream of events for particular patterns. The EventBridge rule that is triggered when a Macie finding is based on one of the three custom data identifiers, macie-rule-high, uses the event pattern shown below. This pattern examines the finding event for the name of one of the three custom data identifier names that triggered it.

{
"source": [
"aws.macie"
],
"detail-type": [
"Macie Finding"
],
"detail": {
"classificationDetails": {
"result": {
"customDataIdentifiers": {
"detections": {
"name": [
"Patient ID",
"US Passport",
"US Driver License"
]
}
}
}
}
}
}

Six data files, containing High severity findings, will be moved to the isolation bucket by the Lambda, triggered by EventBridge.

Isolation bucket containing data files with High severity findings

Scheduled Sensitive Data Discovery Jobs

Data sources commonly deliver data on a repeated basis, such as nightly data feeds. For these types of data sources, we can schedule sensitive data discovery jobs to run on a scheduled basis. For this demonstration, we will create a scheduled job using the AWS SDK for Python (Boto3). Unlike the AWS CLI-based one-time job, you don’t need to modify the project’s script, scripts/create_macie_job_daily.py. The Python script will retrieve your AWS account ID and three custom data identifier IDs. The Python script then runs the create_classification_job command.

#!/usr/bin/env python3
# Purpose: Create Daily Macie classification job – Synthea patient data
# Author: Gary A. Stafford (March 2021)
import logging
import sys
import boto3
from botocore.exceptions import ClientError
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
ssm_client = boto3.client('ssm')
sts_client = boto3.client('sts')
macie_client = boto3.client('macie2')
def main():
params = get_parameters()
account_id = sts_client.get_caller_identity()['Account']
custom_data_identifiers = list_custom_data_identifiers()
create_classification_job(params['patient_data_bucket'], account_id, custom_data_identifiers)
def list_custom_data_identifiers():
"""Returns a list of all custom data identifier ids"""
custom_data_identifiers = []
try:
response = macie_client.list_custom_data_identifiers()
for item in response['items']:
custom_data_identifiers.append(item['id'])
return custom_data_identifiers
except ClientError as e:
logging.error(e)
sys.exit(e)
def create_classification_job(patient_data_bucket, account_id, custom_data_identifiers):
"""Create Daily Macie classification job"""
try:
response = macie_client.create_classification_job(
customDataIdentifierIds=custom_data_identifiers,
description='Review Synthea patient data (Daily)',
jobType='SCHEDULED',
initialRun=True,
name='SyntheaPatientData_Daily',
s3JobDefinition={
'bucketDefinitions': [
{
'accountId': account_id,
'buckets': [
patient_data_bucket
]
}
],
'scoping': {
'includes': {
'and': [
{
'simpleScopeTerm': {
'comparator': 'EQ',
'key': 'OBJECT_EXTENSION',
'values': [
'csv',
]
}
},
]
}
}
},
samplingPercentage=100,
scheduleFrequency={
'dailySchedule': {}
},
tags={
'Project': 'Amazon Macie Demo'
}
)
logging.debug(f'Response: {response}')
except ClientError as e:
logging.error(e)
sys.exit(e)
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
params = {
'patient_data_bucket': ssm_client.get_parameter(Name='/macie_demo/patient_data_bucket')['Parameter']['Value']
}
return params
if __name__ == '__main__':
main()

To create the scheduled sensitive data discovery job, run the following command.

python3 ./scripts/create_macie_job_daily.py

The scheduleFrequency parameter is set to { 'dailySchedule': {} }. This value specifies a daily recurrence pattern for running the job. The initialRun parameter of the create_classification_job command is set to True. This will cause the new job to analyze all eligible objects immediately after the job is created, in addition to on a daily basis.

Scheduled sensitive data discovery job in an active/idle state

Conclusion

In this post, we learned how we can use Amazon Macie to discover and protect sensitive data in Amazon S3. We learned how to use automation to trigger alerts based on Macie’s findings and to isolate data files based on the types of findings. The post’s data inspection workflow can easily be incorporated into existing data lake ingestion pipelines to ensure the integrity of incoming data.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS

Introduction

In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.

Amazon EMR

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache SparkHiveHBaseFlinkHudi, and ZeppelinJupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.

Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache Airflow’s UI

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.

Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon RedShift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the HooksSecretsSensors, and Operators of Airflow codebase’s contrib section.

Getting Started

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-airflow-demo.git

Preliminary Steps

This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.

Configuring Amazon MWAA

The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Amazon MWAA Environment Creation Process

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA Environment Creation Process

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

Amazon MWAA Environment Creation Process

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

Amazon MWAA Environment Creation Process

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

AWS CloudFormation Create Stack Console

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.

AWS CloudFormation Create Stack Console
Amazon MWAA Environment Creation Process

As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.

You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.

Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Amazon MWAA Environment Creation Process

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Amazon MWAA Environment Creation Process

Airflow Execution Role

As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.

Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:RunJobFlow"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"arn:aws:iam::123412341234:role/EMR_DemoRole",
"arn:aws:iam::123412341234:role/EMR_EC2_DemoRole",
"arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
"arn:aws:iam::123412341234:role/EMR_DefaultRole"
]
}
]
}

The Airflow service role, created by MWAA, is shown below with the new policy attached.

Airflow Execution Service Role with the new Policy Attached

Final Architecture

Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Demonstration’s Amazon MWAA and Amazon EMR Architecture

Amazon MWAA Environment

The new MWAA Environment will include a link to the Airflow UI.

Amazon MWAA Environment Console

Airflow UI

Using the supplied link, you should be able to access the Airflow UI using your web browser.

Apache Airflow UI

Our First DAG

The Amazon MWAA documentation includes an example DAG, which contains one of several sample programs, SparkPi, which comes with Spark. I have created a similar DAG that is included in the GitHub project, dags/emr_steps_demo.py. The DAG will create a minimally-sized single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
}
SPARK_STEPS = [
{
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': 'emr-6.2.0',
'Applications': [
{
'Name': 'Spark'
},
],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
with DAG(
dag_id=DAG_ID,
description='Run built-in Spark app on Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval='@once',
tags=['emr'],
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker

Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.

aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/

The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

Apache Airflow UI’s Trigger DAG Page

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Apache Airflow UI’s DAG Graph View

Switching to the EMR Console, you should see the single-node EMR cluster being created.

Amazon EMR Console’s Summary tab

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to be run.

Amazon EMR Console’s Steps tab

Triggering DAGs Programmatically

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explains how.

Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIROMENT_NAME variable assumes only one MWAA environment is returned by jq.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME=spark_pi_example
aws mwaa create-cli-token –name "${ENVIRONMENT_NAME}" | \
export CLI_TOKEN=$(jq -r .CliToken)
curl –request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
–header "Authorization: Bearer ${CLI_TOKEN}" \
–header "Content-Type: text/plain" \
–data-raw "trigger_dag ${DAG_NAME}"
view raw trigger_dag.sh hosted with ❤ by GitHub

Analytics Job with Airflow

Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.

The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.

Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.

import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ["{{ dag_run.conf['airflow_email'] }}"],
'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}
SPARK_STEPS = [
{
'Name': 'Bakery Sales',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'–deploy-mode',
'cluster',
'–master',
'yarn',
'–conf',
'spark.yarn.submit.waitAppCompletion=true',
's3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py'
]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': '{{ var.value.release_label }}',
'LogUri': 's3n://{{ var.value.logs_bucket }}',
'Applications': [
{
'Name': 'Spark'
},
],
'Instances': {
'InstanceFleets': [
{
'Name': 'MASTER',
'InstanceFleetType': 'MASTER',
'TargetSpotCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm5.xlarge',
},
]
},
{
'Name': 'CORE',
'InstanceFleetType': 'CORE',
'TargetSpotCapacity': 2,
'InstanceTypeConfigs': [
{
'InstanceType': 'r5.xlarge',
},
],
},
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
'Ec2KeyName': '{{ var.value.emr_ec2_key_pair }}',
},
'BootstrapActions': [
{
'Name': 'string',
'ScriptBootstrapAction': {
'Path': 's3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh',
}
},
],
'Configurations': [
{
'Classification': 'spark-hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
}
}
],
'VisibleToAllUsers': True,
'JobFlowRole': '{{ var.value.job_flow_role }}',
'ServiceRole': '{{ var.value.service_role }}',
'EbsRootVolumeSize': 32,
'StepConcurrencyLevel': 1,
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
},
{
'Key': 'Name',
'Value': 'Airflow EMR Demo Project'
},
{
'Key': 'Owner',
'Value': 'Data Analytics Team'
}
]
}
with DAG(
dag_id=DAG_ID,
description='Analyze Bakery Sales with Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval='@once',
tags=['emr'],
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker
view raw bakery_sales.py hosted with ❤ by GitHub

Import Variables into Airflow UI

First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.

{
"bootstrap_bucket": "emr-demo-bootstrap-123412341234-us-east-1",
"emr_ec2_key_pair": "emr-demo-123412341234-us-east-1",
"job_flow_role": "EMR_EC2_DemoRole",
"logs_bucket": "emr-demo-logs-123412341234-us-east-1",
"release_label": "emr-6.2.0",
"service_role": "EMR_DemoRole",
"work_bucket": "emr-demo-work-123412341234-us-east-1",
"ec2_subnet_id": "subnet-012abc456efg78900"
}

Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Apache Airflow UI’s Admin > Variables tab

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/

The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.

{
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": false,
    "email_on_retry": false
}

This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Apache Airflow UI’s Trigger DAG Screen

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Amazon EMR Console’s Steps tab

Multi-Step DAG

In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.

We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.

Below we see a snippet of two of the four Spark submit-job job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.

[
{
"Name": "Movie Ratings",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ var.value.work_bucket }}/analyze/movies_avg_ratings_ssm.py",
"–start-date",
"2016-01-01 00:00:00",
"–end-date",
"2016-12-31 23:59:59"
]
}
},
{
"Name": "Stock Volatility",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"–deploy-mode",
"cluster",
"–master",
"yarn",
"–conf",
"spark.yarn.submit.waitAppCompletion=true",
"s3a://{{ var.value.work_bucket }}/analyze/stock_volatility_ssm.py",
"–start-date",
"2017-01-01",
"–end-date",
"2018-12-31"
]
}
}
]
view raw emr_steps.json hosted with ❤ by GitHub

Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.

{
"Name": "demo-cluster-airflow",
"ReleaseLabel": "{{ var.value.release_label }}",
"LogUri": "s3n://{{ var.value.logs_bucket }}",
"Applications": [
{
"Name": "Spark"
}
],
"Instances": {
"InstanceFleets": [
{
"Name": "MASTER",
"InstanceFleetType": "MASTER",
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
}
]
},
{
"Name": "CORE",
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 2,
"InstanceTypeConfigs": [
{
"InstanceType": "r5.2xlarge"
}
]
}
],
"Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
"KeepJobFlowAliveWhenNoSteps": false,
"TerminationProtected": false,
"Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}"
},
"BootstrapActions": [
{
"Name": "string",
"ScriptBootstrapAction": {
"Path": "s3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh"
}
}
],
"Configurations": [
{
"Classification": "spark-hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
}
],
"VisibleToAllUsers": true,
"JobFlowRole": "{{ var.value.job_flow_role }}",
"ServiceRole": "{{ var.value.service_role }}",
"EbsRootVolumeSize": 32,
"StepConcurrencyLevel": 5,
"Tags": [
{
"Key": "Environment",
"Value": "Development"
},
{
"Key": "Name",
"Value": "Airflow EMR Demo Project"
},
{
"Key": "Owner",
"Value": "Data Analytics Team"
}
]
}

Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note lines 56 and 63 of the DAG below. Instead of referencing a local object variable, the parameters now reference the function, get_objects(key, bucket_name), which loads the JSON.

import json
import os
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.utils.dates import days_ago
# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************
DAG_ID = os.path.basename(__file__).replace('.py', '')
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'email': ["{{ dag_run.conf['airflow_email'] }}"],
'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}
def get_object(key, bucket_name):
"""
Load S3 object as JSON
"""
hook = S3Hook()
content_object = hook.get_key(key=key, bucket_name=bucket_name)
file_content = content_object.get()['Body'].read().decode('utf-8')
return json.loads(file_content)
with DAG(
dag_id=DAG_ID,
description='Run multiple Spark jobs with Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval=None,
tags=['emr', 'spark', 'pyspark']
) as dag:
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=get_object('job_flow_overrides/job_flow_overrides.json', work_bucket)
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=get_object('emr_steps/emr_steps.json', work_bucket)
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default'
)
cluster_creator >> step_adder >> step_checker

This time, we need to upload three files to S3, the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.

aws s3 cp emr_steps/emr_steps.json \
    s3://emr-demo-work-123412341234-us-east-1/emr_steps/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
    s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/

The second DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Apache Airflow UI’s DAGs tab

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.