Posts Tagged AWS

Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS

Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect

Introduction

A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.

One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.

A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.

In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.

Best of all, to maintain data freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream those changes into the data lake. This process is commonly referred to as Change Data Capture (CDC).

High-level architecture for this post’s demonstration

Change Data Capture

According to Gunnar Morling, Principal Software Engineer at Red Hat who works on the Debezium and Hibernate projects and well-known industry speaker, there are two types of Change Data Capture — Query-based and Log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

Joker 2021: Change data capture pipelines with Debezium and Kafka Streams (image: YouTube)

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.

Query-based vs. Log-based CDC

To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.

UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;

Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.

{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}

Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.

{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}

In an upcoming post, we will explore Debezium along with Apache Arvo and a schema registry to build a log-based CDC solution using PostgreSQL’s write-ahead log (WAL). In this post, we will examine query-based CDC using the ‘update timestamp’ technique.

Kafka Connect Connectors

In this post, we will use source and sink connectors from Confluent. Confluent is the undisputed leader in providing enterprise-grade managed Kafka through their Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:

  • Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
  • Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, Parquet, JSON, or Raw Bytes.

Prerequisites

This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:

  1. Amazon RDS for PostgreSQL instance (data source);
  2. Amazon S3 bucket (data sink);
  3. Amazon MSK cluster;
  4. Amazon EKS cluster;
  5. Connectivity between the Amazon RDS instance and Amazon MSK cluster;
  6. Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
  7. Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
  8. IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);

As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.

Source Code

All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.

Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.

Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).

Sample PostgreSQL Database

There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorite, albeit a bit dated, is PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytics costs.

Pagila database schema diagram

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.

Last Updated Trigger

Each table in the Pagila database has a last_update field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC.

As changes are made to records in these tables, an existing database function and a trigger to each table will ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.

CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();

Kubernetes-based Kafka Connect

There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.

If you chose to use the same Kafka Connect Image used in this post, a Helm Chart is included in the post’s GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS.

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-connect-msk
labels:
app: kafka-connect-msk
component: service
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: kafka-connect-msk
component: service
template:
metadata:
labels:
app: kafka-connect-msk
component: service
spec:
serviceAccountName: kafka-connect-msk-iam-serviceaccount
containers:
- image: garystafford/kafka-connect-msk:1.0.0
name: kafka-connect-msk
imagePullPolicy: IfNotPresent

Before deploying the chart, update the value.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on MSK and to S3 from Kafka Connect running on EKS.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kafka-cluster:*",
"Resource": [
"arn:aws:kafka:us-east-1:111222333444:cluster/*/*",
"arn:aws:kafka:us-east-1:111222333444:group/*/*/*",
"arn:aws:kafka:us-east-1:111222333444:transactional-id/*/*/*",
"arn:aws:kafka:us-east-1:111222333444:topic/*/*/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:us-east-1:111222333444:*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:us-east-1:111222333444:<your-bucket-name>"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts",
"s3:ListBucketMultipartUploads"
],
"Resource": "arn:aws:s3:us-east-1:111222333444:<your-bucket-name>/*"
}
]
}

Once the Service Account variable is updated, use the following command to deploy the Helm chart:

helm install kafka-connect-msk ./kafka-connect-msk \
--namespace $NAMESPACE --create-namespace

To get a shell to the running Kafka Connect container, use the following kubectl exec command:

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash
Interacting with Kafka Connect container running on EKS

Configure Bootstrap Brokers

Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect’s distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.

Kafka Connect will run within the pod’s container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:

# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')

Update the config/connect-distributed.properties file.

# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:

export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"

Confirm Access to Amazon MSK from Kafka Connect

To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the exiting Kafka topics:

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties

You can also try listing the existing Kafka consumer groups:

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server $BBROKERS \
  --command-config config/client-iam.properties

If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.

Kafka Connect

I recommend starting Kafka Connect as a background process using either method shown below.

bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &

To confirm Kafka Connect started properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.

tail -f logs/connect.log
Kafka Connect log showing Kafka Connect starting as a background process

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill command along with the PID to stop Kafka Connect if necessary.

Kafka Connect running as a background process

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, the first time it starts up, as defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The Internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-

Kafka Connect Connectors

This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.

Connector Source #1

Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json. Modify lines 3–5, as shown below, to reflect your RDS instance’s JDBC connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.public.",
"poll.interval.ms": 5000,
"mode": "timestamp",
"catalog.pattern": "public",
"table.whitelist": "address, city, country",
"timestamp.column.name": "last_update"
}

This first Kafka Connect source connector uses Confluent’s Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila’s public schema: address, city, and country. We will write that data to a series of topics, arbitrarily prefixed with database name and schema, pagila.public.. The source connector will create the three new topics automatically: pagila.public.address , pagila.public.city , and pagila.public.country.

Note the connector’s mode property value is set to timestamp, and the last_update field is referenced in the timestamp.column.name property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (poll.interval.ms property), looking for changes that are newer than the most recently exported last_modified date. This is accomplished by the source connector, using a parameterized query, such as:

SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC

Connector Sink #1

Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json. Modify line 7, as shown below to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "pagila.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "your-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}

This first Kafka Connect sink connector uses Confluent’s Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector) to export data from Kafka topics to Amazon S3 objects in JSON format.

Deploy Connectors #1

Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST method against the /connectors endpoint. However, this then requires a DELETE and an additional POST to update the connector. Using a PUT against the /config endpoint, you can update the connector without first issuing a DELETE.

curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq

You can confirm the source and sink connectors are deployed and running using the following commands:

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq
Kafka Connect source connector running successfully

Errors preventing the connector from starting correctly will be displayed using the /status endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Kafka Connect sink connector failed to run with errors

Confirming Success of Connectors #1

The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of three new Kafka topics that should have been created: pagila.public.address, pagila.public.city, and pagila.public.country.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.

To confirm the sink connector worked, verify the new S3 objects have been created in the data lake’s S3 bucket. If you use the AWS CLI v2’s s3 API, we can view the contents of our target S3 bucket:

aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text

You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.

topics/pagila.public.address/partition=0/pagila.public.address+0+0000000000.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000200.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000300.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000400.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000500.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000600.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000000.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000100.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000200.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000300.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000400.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000500.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000000.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000100.json

You could also use the AWS Management Console to view the S3 bucket’s contents.

Amazon S3 bucket showing results of Kafka Connect S3 sink connector, organized by topic names

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

For example, the address table’s data will look similar to the following using the ‘Query with S3 Select’ feature via the console or API:

{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000
}

Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!

Connector Source #2

Create a new file or modify config/jdbc_source_connector_postgresql_01.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.public.alt.",
"poll.interval.ms": 5000,
"mode": "timestamp",
"timestamp.column.name": "last_update",
"catalog.pattern": "public",
"table.whitelist": "address",
"numeric.mapping": "best_fit",
"transforms": "createKey,extractInt,InsertTopic,InsertSourceDetails",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "address_id",
"validate.non.null": "false",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field": "message_source",
"transforms.InsertSourceDetails.static.value": "pagila"
}

This second Kafka Connect source connector also uses Confluent’s Kafka Connect JDBC Source connector to export data from the just address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is transforms, known as Single Message Transformations (SMTs). SMTs are applied to messages as they flow through Connect from RDS to Kafka.

In this connector, there are four transforms, which perform the following common functions:

  1. Extract address_id integer field as the Kafka message key, as detailed in this blog post by Confluence (see ‘Setting the Kafka message key’).
  2. Append Kafka topic name into message as a new static field;
  3. Append database name into message as a new static field;

Connector Sink #2

Create a new file or modify config/s3_sink_connector_01.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics": "pagila.public.alt.address",
"s3.region": "us-east-1",
"s3.bucket.name": "you-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}

This second sink connector is nearly identical to the first sink connector, except it only exports data from a single Kafka topic, pagila.public.alt.address, into S3.

Deploy Connectors #2

Deploy the second set of source and sink connectors using the Kafka Connect REST Interface, exactly like the first pair.

curl -s -d @"config/jdbc_source_connector_postgresql_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/config | jq
curl -s -d @"config/s3_sink_connector_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_01/config | jq

Confirming Success of Connectors #2

Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first set, for a total of four connectors.

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_01/status | jq
Kafka Connect source and sink connectors running successfully

To view the results of the first transform, extracting the address_id integer field as the Kafka message key, we can use a Kafka command-line consumer:

bin/kafka-console-consumer.sh \
--topic pagila.public.alt.address \
--offset 102 --partition 0 --max-messages 5 \
--property print.key=true --property print.value=true \
--property print.offset=true --property print.partition=true \
--property print.headers=false --property print.timestamp=false \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties

In the output below, note the beginning of each message, which displays the Kafka message key, identical to the address_id. For example, {"type":"int32","optional":false},"payload":100}.

Output showing messages in the Kafka pagila.public.alt.address topic

Examing the Amazon S3 bucket using the AWS Management Console or the CLI, you should note the fourth set of S3 objects within the /topics/pagila.public.alt.address/ object key prefix.

Amazon S3 bucket showing JSON-format files containing address data

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.alt.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the two new fields that have been appended into each record, a result of the Kafka Connector transforms:

{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}

Congratulations, you have successfully imported more data from a relational database into your data lake, including performing a simple series of transforms using Kafka Connect!

Connector Source #3

Create or modify config/jdbc_source_connector_postgresql_02.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.query",
"poll.interval.ms": 5000,
"mode": "timestamp",
"timestamp.column.name": "last_update",
"query": "SELECT * FROM (SELECT a.address_id, a.address, a.address2, city.city, a.district, a.postal_code, country.country, a.phone, a.last_update FROM address AS a INNER JOIN city ON a.city_id = city.city_id INNER JOIN country ON country.country_id = city.country_id ORDER BY address_id) AS subquery",
"incrementing.column.name": "address_id",
"transforms": "createKey,extractInt,InsertTopic,InsertSourceDetails",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "address_id",
"validate.non.null": "false",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
}

Unlike the first two source connectors that export data from tables, this connector uses a SELECT query to export data from the Pagila database’s address , city, and country tables and import the results of that SQL query data into a new Kafka topic, pagila.public.alt.address. The SQL query in the source connector’s configuration is as follows:

SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses

The final parameterized query, executed by the source connector, which allows it to detect changes based on the last_update field is as follows:

SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
WHERE "last_update" > ?
AND "last_update" < ?
ORDER BY "last_update" ASC

Connector Sink #3

Create or modify config/s3_sink_connector_02.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics": "pagila.query",
"s3.region": "us-east-1",
"s3.bucket.name": "your-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "gzip",
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"partition.field.name": "country",
"schema.compatibility": "NONE",
"transforms": "RenameField, insertStaticField1,insertStaticField2,insertStaticField3",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "district:state_province",
"transforms.insertStaticField1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "message_source",
"transforms.insertStaticField1.static.value": "pagila",
"transforms.insertStaticField2.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField2.static.field": "message_source_engine",
"transforms.insertStaticField2.static.value": "postgresql",
"transforms.insertStaticField3.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField3.static.field": "environment",
"transforms.insertStaticField3.static.value": "development"
}

This sink connector is significantly different than the previous two sink connectors. In addition to leveraging SMTs in the corresponding source connector, we are also using them in this sink connector. The sink connect appends three arbitrary static fields to each record as it is written to Amazon S3 — message_source, message_source_engine, and environment using the InsertField transform. The sink connector also renames the district field to state_province using the ReplaceField transform.

The first two sink connectors wrote uncompressed JSON-format files to Amazon S3. This third sink connector optimizes the data imported into S3 for downstream data analytics. The sink connector writes GZIP-compressed Apache Parquet files to Amazon S3. In addition, the compressed Parquet files are partitioned by the country field. Using a columnar file format, compression, and partitioning, queries against the data should be faster and more efficient.

Deploy Connectors #3

Deploy the final source and sink connectors using the Kafka Connect REST Interface, exactly like the first two pairs.

curl -s -d @"config/jdbc_source_connector_postgresql_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/config | jq
curl -s -d @"config/s3_sink_connector_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_02/config | jq

Confirming Success of Connectors #3

Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first two sets, for a total of six connectors.

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_02/status | jq
Kafka Connect source and sink connectors running successfully

Reviewing the messages within the newpagila.query topic, note the message_topic field has been appended to the message by the source connector but not message_source, message_source_engine, and environment fields. The sink connector appends these fields as it writes the messages to S3. Also, note the district field has yet to be renamed by the sink connector to state_province.

Output showing messages in the Kafka pagila.query topic

Examing the Amazon S3 bucket, again, you should note the fifth set of S3 objects within the /topics/pagila.query/ object key prefix. The Parquet-format files within are partitioned by country.

Amazon S3 bucket showing data partitioned by Country

Within each country partition, there are Parquet files whose records contain addresses within those countries.

Amazon S3 bucket showing GZIP-compressed Apache Parquet-format files

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.query/country=United States/pagila.query+0+0000000003.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the four new fields that have been appended into each record, a result of the source and sink connector SMTs. Also, note the renamed district field:

{
"address_id": 599,
"address": "1895 Zhezqazghan Drive",
"address2": "",
"city": "Garden Grove",
"state_province": "California",
"postal_code": "36693",
"country": "United States",
"phone": "137809746111",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 6,
"address": "1121 Loja Avenue",
"address2": "",
"city": "San Bernardino",
"state_province": "California",
"postal_code": "17886",
"country": "United States",
"phone": "838635286649",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 18,
"address": "770 Bydgoszcz Avenue",
"address2": "",
"city": "Citrus Heights",
"state_province": "California",
"postal_code": "16266",
"country": "United States",
"phone": "517338314235",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}

Record Updates and Query-based CDC

What happens when we change data within the tables that Kafka Connect is polling every 5 seconds? To answer this question, let’s make a few DML changes:

-- update address field
UPDATE public.address
SET address = '123 CDC Test Lane'
WHERE address_id = 100;
-- update address2 field
UPDATE public.address
SET address2 = 'Apartment #2201'
WHERE address_id = 101;
-- second update to same record
UPDATE public.address
SET address2 = 'Apartment #2202'
WHERE address_id = 101;

-- insert new country
INSERT INTO public.country (country)
values ('Wakanda');
-- should be 110
SELECT country_id FROM country WHERE country='Wakanda';
-- insert new city
INSERT INTO public.city (city, country_id)
VALUES ('Birnin Zana', 110);
-- should be 601
SELECT city_id FROM public.city WHERE country_id=110;
-- update city_id to new city_id
UPDATE public.address
SET phone = city_id = 601
WHERE address_id = 102;
-- second update to same record
UPDATE public.address
SET district = 'Lake Turkana'
WHERE address_id = 102;
-- delete an address record
UPDATE public.customer
SET address_id = 200
WHERE customer_id IN (
SELECT customer_id FROM customer WHERE address_id = 104);
DELETE
FROM public.address
WHERE address_id = 104;

To see how these changes propagate, first, examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The three Kafka Connect source connectors detect changes, which are exported from PostgreSQL to Kafka. The three sink connectors then write these changes to new JSON and Parquet files to the target S3 bucket.

Kafka Connect log showing changes to Pagila database being exported/imported

Viewing Data in the Data Lake

A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.

AWS Glue Data Catalog showing five new tables, the result of the AWS Glue Crawler

When writing Parquet into partitions, one shortcoming of the Kafka Connect S3 sink connector is duplicate column names in AWS Glue. As a result, any columns used as partitions are duplicated in the Glue Data Catalog’s database table schema. The issue will result in an error similar to HIVE_INVALID_METADATA: Hive metadata for table pagila_query is invalid: Table descriptor contains duplicate columns when performing queries. To remedy this, predefine the table and the table’s schema. Alternately, edit the Glue Data Catalog table’s schema after crawling and remove the duplicate, non-partition column(s). Below, that would mean removing duplicate country column 7.

AWS Glue Data Catalog table schema showing duplicate column

Performing a typical SQL SELECT query in Athena will return all of the original records as well as the changes we made earlier as duplicate records (same address_id primary key).

Amazon Athena showing the SQL query and the result set
SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM "pagila_kafka_connect"."pagila_query"
WHERE address_id BETWEEN 100 AND 105
ORDER BY address_id;

Note the original records for address_id 100–103 as well as each change we made earlier. The last_update field reflects the date and time the record was created or updated. Also, note the record with address_id 104 in the query results. This is the record we deleted from the Pagila database.

address_id address address2 city state_province postal_code country last_update
100 1308 Arecibo Way Augusta-Richmond County Georgia 30695 United States 2017-02-15 09:45:30.000
100 123 CDC Test Lane Augusta-Richmond County Georgia 30695 United States 2021-08-09 14:10:29.126
101 1599 Plock Drive Tete Tete 71986 Mozambique 2017-02-15 09:45:30.000
101 1599 Plock Drive Apartment #2201 Tete Tete 71986 Mozambique 2021-08-09 14:10:29.467
101 1599 Plock Drive Apartment #2202 Tete Tete 71986 Mozambique 2021-08-09 14:19:03.761
102 669 Firozabad Loop al-Ayn Abu Dhabi 92265 United Arab Emirates 2017-02-15 09:45:30.000
102 669 Firozabad Loop Birnin Zana Abu Dhabi 92265 Wakanda 2021-08-09 14:10:29.789
102 669 Firozabad Loop Birnin Zana Lake Turkana 92265 Wakanda 2021-08-09 15:56:53.323
103 588 Vila Velha Manor Kimchon Kyongsangbuk 51540 South Korea 2017-02-15 09:45:30.000
104 1913 Kamakura Place Jelets Lipetsk 97287 Russian Federation 2017-02-15 09:45:30.000
105 733 Mandaluyong Place Abha Asir 77459 Saudi Arabia 2017-02-15 09:45:30.000
view raw pagila_query_01.csv hosted with ❤ by GitHub

To view only the most current data, we can use Athena’s ROW_NUMBER() function:

SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_UPDATE DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1
AND address_id BETWEEN 100 AND 105
ORDER BY address_id;

Now, we only see the latest records. Unfortunately, the record we deleted with address_id 104 is still present in the query results.

address_id address address2 city state_province postal_code country last_update
100 123 CDC Test Lane Augusta-Richmond County Georgia 30695 United States 2021-08-09 14:10:29.126
101 1599 Plock Drive Apartment #2202 Tete Tete 71986 Mozambique 2021-08-09 14:19:03.761
102 669 Firozabad Loop Birnin Zana Lake Turkana 92265 Wakanda 2021-08-09 15:56:53.323
103 588 Vila Velha Manor Kimchon Kyongsangbuk 51540 South Korea 2017-02-15 09:45:30.000
104 1913 Kamakura Place Jelets Lipetsk 97287 Russian Federation 2017-02-15 09:45:30.000
105 733 Mandaluyong Place Abha Asir 77459 Saudi Arabia 2017-02-15 09:45:30.000
view raw pagila_query_02.csv hosted with ❤ by GitHub

Using log-based CDC with Debezium, as opposed to query-based CDC, we would have received a record in S3 that indicated the delete. The null value message, shown below, is referred to as a tombstone message in Kafka. Note the ‘before’ syntax with the delete record as opposed to the ‘after’ syntax we observed earlier with the update record.

{
"before": {
"address": "",
"address2": null,
"phone": "",
"district": "",
"last_update": "1970-01-01T00:00:00Z",
"address_id": 104,
"postal_code": null,
"city_id": 0
},

"source": {
"schema": "public",
"sequence": "[\"1101256482032\",\"1101256482032\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1101256483936,
"name": "pagila",
"txId": 17137,
"version": "1.6.1.Final",
"ts_ms": 1628864251512,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "d",
"ts_ms": 1628864251671
}

An inefficient solution to duplicates and deletes with query-based CDC would be to bulk ingest the entire query result set from the Pagila database each time instead of only the changes based on the last_update field. Performing an unbounded query repeatedly on a huge dataset would negatively impact database performance. Notwithstanding, you would still end up with duplicates in the data lake unless you first purged the data in S3 before re-importing the new query results.

Data Movement

Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena’s CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores data files created by the CTAS statement in a specified location in Amazon S3 and created a new AWS Glue Data Catalog table to store the result set’s schema and metadata information. CTAS supports several file formats and storage options.

High-level architecture for this post’s demonstration

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by country, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.

CREATE TABLE pagila_kafka_connect.pagila_query_processed
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['country'],
external_location='s3://your-s3-bucket/processed/pagila_query'
) AS
SELECT address_id, last_update, address, address2, city,
state_province, postal_code, country
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1 AND address_id BETWEEN 0 and 100
ORDER BY address_id;

Examing the Amazon S3 bucket, on last time, you should new set of S3 objects within the /processed/pagila_query/ key path. The Parquet-format files, partitioned by country, are the result of the CTAS query.

Amazon S3 bucket showing SNAPPY-compressed Parquet-format files containing CTAS query results

We should now see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS query. We can perform additional queries on the processed data.

Amazon Athena showing query results from the processed data table in AWS Glue Data Catalog

ACID Transactions with a Data Lake

To fully take advantage of CDC and maximize the freshness of data in the data lake, we would also need to adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level updates and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.

Conclusion

This post explored how CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon EKS, Amazon MSK, and Apache Kafka Connect. We learned about query-based CDC for capturing ongoing changes to the source data. In a subsequent post, we will explore log-based CDC using Debezium and see how data lake file formats like Apache Avro, Apache Hudi, Apache Iceberg, and Delta Lake can help us manage the data in our data lake.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , ,

Leave a comment

Securely Decoupling Kubernetes-based Applications on Amazon EKS using Kafka with SASL/SCRAM

Securely decoupling Go-based microservices on Amazon EKS using Amazon MSK with IRSA, SASL/SCRAM, and data encryption

Introduction

This post will explore a simple Go-based application deployed to Kubernetes using Amazon Elastic Kubernetes Service (Amazon EKS). The microservices that comprise the application communicate asynchronously by producing and consuming events from Amazon Managed Streaming for Apache Kafka (Amazon MSK).

High-level application and AWS infrastructure architecture for the post

Authentication and Authorization for Apache Kafka

According to AWS, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

For this post, our Amazon MSK cluster will use SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Mechanism) username and password-based authentication to increase security. Credentials used for SASL/SCRAM authentication will be securely stored in AWS Secrets Manager and encrypted using AWS Key Management Service (KMS).

Data Encryption

Data at rest in the MSK cluster will be encrypted at rest using Amazon MSK’s integration with AWS KMS to provide transparent server-side encryption. Encryption in transit of data moving between the brokers of the MSK cluster will be provided using Transport Layer Security (TLS 1.2).

Resource Management

AWS resources for Amazon MSK will be created and managed using HashiCorp Terraform, a popular open-source infrastructure-as-Code (IaC) software tool. Amazon EKS resources will be created and managed with eksctl, the official CLI for Amazon EKS sponsored by Weaveworks. Lastly, Kubernetes resources will be created and managed with Helm, the open-source Kubernetes package manager.

Demonstration Application

The Go-based microservices, which compose the demonstration application, will use Segment’s popular kafka-go client. Segment is a leading customer data platform (CDP). The microservices will access Amazon MSK using Kafka broker connection information stored in AWS Systems Manager (SSM) Parameter Store.

Source Code

All source code for this post, including the demonstration application, Terraform, and Helm resources, are open-sourced and located on GitHub.garystafford/terraform-msk-demo
Terraform project for using Amazon Managed Streaming for Apache Kafka (Amazon MSK) from Amazon Elastic Kubernetes…github.com

Prerequisites

To follow along with this post’s demonstration, you will need recent versions of terraform, eksctl, and helm installed and accessible from your terminal. Optionally, it will be helpful to have git or gh, kubectl, and the AWS CLI v2 (aws).

Demonstration

To demonstrate the EKS and MSK features described above, we will proceed as follows:

  1. Deploy the EKS cluster and associated resources using eksctl;
  2. Deploy the MSK cluster and associated resources using Terraform;
  3. Update the route tables for both VPCs and associated subnets to route traffic between the peered VPCs;
  4. Create IAM Roles for Service Accounts (IRSA) allowing access to MSK and associated services from EKS, using eksctl;
  5. Deploy the Kafka client container to EKS using Helm;
  6. Create the Kafka topics and ACLs for MSK using the Kafka client;
  7. Deploy the Go-based application to EKS using Helm;
  8. Confirm the application’s functionality;

1. Amazon EKS cluster

To begin, create a new Amazon EKS cluster using Weaveworks’ eksctl. The default cluster.yaml configuration file included in the project will create a small, development-grade EKS cluster based on Kubernetes 1.20 in us-east-1. The cluster will contain a managed node group of three t3.medium Amazon Linux 2 EC2 worker nodes. The EKS cluster will be created in a new VPC.

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
name: eks-kafka-demo
region: us-east-1
version: "1.20"
iam:
withOIDC: true
managedNodeGroups:
name: managed-ng-1
amiFamily: AmazonLinux2
instanceType: t3.medium
desiredCapacity: 3
minSize: 2
maxSize: 5
volumeSize: 120
volumeType: gp2
labels:
nodegroup-type: demo-app-workloads
tags:
nodegroup-name: managed-ng-1
nodegroup-role: worker
ssh:
enableSsm: true # use aws ssm instead of ssh – no need to open port 22
iam:
withAddonPolicies:
albIngress: true
autoScaler: true
cloudWatch: true
# cloudWatch:
# clusterLogging:
# enableTypes: ["*"]
view raw cluster.yaml hosted with ❤ by GitHub

Set the following environment variables and then run the eksctl create cluster command to create the new EKS cluster and associated infrastructure.

export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
eksctl create cluster -f ./eksctl/cluster.yaml

In my experience, it could take up to 25-40 minutes to fully build and configure the new 3-node EKS cluster.

Start of the Amazon EKS cluster creation using eksctl
Successful completion of the Amazon EKS cluster creation using eksctl

As part of creating the EKS cluster, eksctl will automatically deploy three AWS CloudFormation stacks containing the following resources:

  1. Amazon Virtual Private Cloud (VPC), subnets, route tables, NAT Gateways, security policies, and the EKS control plane;
  2. EKS managed node group containing Kubernetes three worker nodes;
  3. IAM Roles for Service Accounts (IRSA) that maps an AWS IAM Role to a Kubernetes Service Account;

Once complete, update your kubeconfig file so that you can connect to the new Amazon EKS cluster using the following AWS CLI command:

aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}

Review the details of the new EKS cluster using the following eksctl command:

eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Review the new EKS cluster in the Amazon Container Services console’s Amazon EKS Clusters tab.

New Amazon EKS cluster as seen from the Amazon Container Services console

Below, note the EKS cluster’s OpenID Connect URL. Support for IAM Roles for Service Accounts (IRSA) on the EKS cluster requires an OpenID Connect issuer URL associated with it. OIDC was configured in the cluster.yaml file; see line 8 (shown above).

New Amazon EKS cluster as seen from the Amazon Container Services console

The OpenID Connect identity provider, referenced in the EKS cluster’s console, created by eksctl, can be observed in the IAM Identity provider console.

EKS cluster’s OpenID Connect identity provider in the IAM Identity provider console

2. Amazon MSK cluster

Next, deploy the Amazon MSK cluster and associated network and security resources using HashiCorp Terraform.

Graphviz open source graph visualization of Terraform’s AWS resources

Before creating the AWS infrastructure with Terraform, update the location of the Terraform state. This project’s code uses Amazon S3 as a backend to store the Terraform’s state. Change the Amazon S3 bucket name to one of your existing buckets, located in the main.tf file.

terraform {
backend "s3" {
bucket = "terrform-us-east-1-your-unique-name"
key = "dev/terraform.tfstate"
region = "us-east-1"
}
}

Also, update the eks_vpc_id variable in the variables.tf file with the VPC ID of the EKS VPC created by eksctl in step 1.

variable "eks_vpc_id" {
default = "vpc-your-id"
}

The quickest way to obtain the ID of the EKS VPC is by using the following AWS CLI v2 command:

aws ec2 describe-vpcs --query 'Vpcs[].VpcId' \
--filters Name=tag:Name,Values=eksctl-eks-kafka-demo-cluster/VPC \
--output text

Next, initialize your Terraform backend in Amazon S3 and initialize the latesthashicorp/aws provider plugin with terraform init.

Use terraform plan to generate an execution plan, showing what actions Terraform would take to apply the current configuration. Terraform will create approximately 25 AWS resources as part of the plan.

Finally, use terraform apply to create the Amazon resources. Terraform will create a small, development-grade MSK cluster based on Kafka 2.8.0 in us-east-1, containing a set of three kafka.m5.large broker nodes. Terraform will create the MSK cluster in a new VPC. The broker nodes are spread across three Availability Zones, each in a private subnet, within the new VPC.

Start of the process to create the Amazon MSK cluster using Terraform
Successful creation of the Amazon MSK cluster using Terraform

It could take 30 minutes or more for Terraform to create the new cluster and associated infrastructure. Once complete, you can view the new MSK cluster in the Amazon MSK management console.

New Amazon MSK cluster as seen from the Amazon MSK console

Below, note the new cluster’s ‘Access control method’ is SASL/SCRAM authentication. The cluster implements encryption of data in transit with TLS and encrypts data at rest using a customer-managed customer master key (CMS) in AWM KSM.

New Amazon MSK cluster as seen from the Amazon MSK console

Below, note the ‘Associated secrets from AWS Secrets Manager.’ The secret, AmazonMSK_credentials, contains the SASL/SCRAM authentication credentials — username and password. These are the credentials the demonstration application, deployed to EKS, will use to securely access MSK.

New Amazon MSK cluster as seen from the Amazon MSK console

The SASL/SCRAM credentials secret shown above can be observed in the AWS Secrets Manager console. Note the customer-managed customer master key (CMK), stored in AWS KMS, which is used to encrypt the secret.

SASL/SCRAM credentials secret shown in the AWS Secrets Manager console

3. Update route tables for VPC Peering

Terraform created a VPC Peering relationship between the new EKS VPC and the MSK VPC. However, we will need to complete the peering configuration by updating the route tables. We want to route all traffic from the EKS cluster destined for MSK, whose VPC CIDR is 10.0.0.0/22, through the VPC Peering Connection resource. There are four route tables associated with the EKS VPC. Add a new route to the route table whose name ends with ‘PublicRouteTable’, for example, rtb-0a14e6250558a4abb / eksctl-eks-kafka-demo-cluster/PublicRouteTable. Manually create the required route in this route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

The EKS route table with a new route to MSK via the VPC Peering Connection

Similarly, we want to route all traffic from the MSK cluster destined for EKS, whose CIDR is 192.168.0.0/16, through the same VPC Peering Connection resource. Update the single MSK VPC’s route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

The MSK route table with a new route to EKS via the VPC Peering Connection

4. Create IAM Roles for Service Accounts (IRSA)

With both the EKS and MSK clusters created and peered, we are ready to start deploying Kubernetes resources. Create a new namespace, kafka, which will hold the demonstration application and Kafka client pods.

export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
export NAMESPACE="kafka"
kubectl create namespace $NAMESPACE

Then using eksctl, create two IAM Roles for Service Accounts (IRSA) associated with Kubernetes Service Accounts. The Kafka client’s pod will use one of the roles, and the demonstration application’s pods will use the other role. According to the eksctl documentation, IRSA works via IAM OpenID Connect Provider (OIDC) that EKS exposes, and IAM roles must be constructed with reference to the IAM OIDC Provider described earlier in the post, and a reference to the Kubernetes Service Account it will be bound to. The two IAM policies referenced in the eksctl commands below were created earlier by Terraform.

# kafka-demo-app role
eksctl create iamserviceaccount \
--name kafka-demo-app-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# kafka-client-msk role
eksctl create iamserviceaccount \
--name kafka-client-msk-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSKafkaClientMSKPolicy" \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# confirm successful creation of accounts
eksctl get iamserviceaccount \
--cluster $CLUSTER_NAME \
--namespace $NAMESPACE
kubectl get serviceaccounts -n $NAMESPACE
Successful creation of the two IAM Roles for Service Accounts (IRSA) using eksctl

Recall eksctl created three CloudFormation stacks initially. With the addition of the two IAM Roles, we now have a total of five CloudFormation stacks deployed.

Amazon EKS-related CloudFormation stacks created by eksctl

5. Kafka client

Next, deploy the Kafka client using the project’s Helm chart, kafka-client-msk. We will use the Kafka client to create Kafka topics and Apache Kafka ACLs. This particular Kafka client is based on a custom Docker Image that I have built myself using an Alpine Linux base image with Java OpenJDK 17, garystafford/kafka-client-msk. The image contains the latest Kafka client along with the AWS CLI v2 and a few other useful tools like jq. If you prefer an alternative, there are multiple Kafka client images available on Docker Hub.h

# purpose: Kafka client for Amazon MSK
# author: Gary A. Stafford
# date: 2021-07-20
FROM openjdk:17-alpine3.14
ENV KAFKA_VERSION="2.8.0"
ENV KAFKA_PACKAGE="kafka_2.13-2.8.0"
ENV AWS_MSK_IAM_AUTH="1.1.0"
ENV GLIBC_VER="2.33-r0"
RUN apk update && apk add –no-cache wget tar bash jq
# install glibc compatibility for alpine (req. for aws cli v2) and aws cli v2
# reference: https://github.com/aws/aws-cli/issues/4685#issuecomment-615872019
RUN apk –no-cache add binutils curl less groff \
&& curl -sL https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub -o /etc/apk/keys/sgerrand.rsa.pub \
&& curl -sLO https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VER}/glibc-${GLIBC_VER}.apk \
&& curl -sLO https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VER}/glibc-bin-${GLIBC_VER}.apk \
&& apk add –no-cache \
glibc-${GLIBC_VER}.apk \
glibc-bin-${GLIBC_VER}.apk \
&& curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip \
&& unzip awscliv2.zip \
&& aws/install \
&& rm -rf awscliv2.zip aws \
&& apk –no-cache del binutils curl \
&& rm glibc-${GLIBC_VER}.apk \
&& rm glibc-bin-${GLIBC_VER}.apk \
&& rm -rf /var/cache/apk/*
# setup java truststore
RUN cp $JAVA_HOME/lib/security/cacerts /tmp/kafka.client.truststore.jks
# install kafka
RUN wget https://downloads.apache.org/kafka/$KAFKA_VERSION/$KAFKA_PACKAGE.tgz \
&& tar -xzf $KAFKA_PACKAGE.tgz \
&& rm -rf $KAFKA_PACKAGE.tgz
WORKDIR /$KAFKA_PACKAGE
# install aws-msk-iam-auth jar
RUN wget https://github.com/aws/aws-msk-iam-auth/releases/download/$AWS_MSK_IAM_AUTH/aws-msk-iam-auth-$AWS_MSK_IAM_AUTH-all.jar \
&& mv aws-msk-iam-auth-$AWS_MSK_IAM_AUTH-all.jar libs/
CMD ["/bin/sh", "-c", "tail -f /dev/null"]
ENTRYPOINT ["/bin/bash"]
view raw Dockerfile hosted with ❤ by GitHub

The Kafka client only requires a single pod. Run the following helm commands to deploy the Kafka client to EKS using the project’s Helm chart, kafka-client-msk:

cd helm/
# perform dry run to validate chart
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE
Successful deployment of the Kafka client’s Helm chart

Confirm the successful creation of the Kafka client pod with either of the following commands:

kubectl get pods -n kafka
kubectl describe pod -n kafka -l app=kafka-client-msk
Describing the Kafka client pod using kubectl

The ability of the Kafka client to interact with Amazon MSK, AWS SSM Parameter Store, and AWS Secrets Manager is based on two IAM policies created by Terraform, EKSKafkaClientMSKPolicy and EKSScramSecretManagerPolicy. These two policies are associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-client-msk-sasl-scram-serviceaccount. This service account is associated with the Kafka client pod as part of the Kubernetes Deployment resource in the Helm chart.

6. Kafka topics and ACLs for Kafka

Use the Kafka client to create Kafka topics and Apache Kafka ACLs. First, use the kubectl exec command to execute commands from within the Kafka client container.

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-client-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Once successfully attached to the Kafka client container, set the following three environment variables: 1) Apache ZooKeeper connection string, 2) Kafka bootstrap brokers, and 3) ‘Distinguished-Name’ of the Bootstrap Brokers (see AWS documentation). The values for these environment variables will be retrieved from AWS Systems Manager (SSM) Parameter Store. The values were stored in the Parameter store by Terraform during the creation of the MSK cluster. Based on the policy attached to the IAM Role associated with this Pod (IRSA), the client has access to these specific parameters in the SSM Parameter store.

export ZOOKPR=$(\
aws ssm get-parameter --name /msk/scram/zookeeper \
--query 'Parameter.Value' --output text)
export BBROKERS=$(\
aws ssm get-parameter --name /msk/scram/brokers \
--query 'Parameter.Value' --output text)
export DISTINGUISHED_NAME=$(\
echo $BBROKERS | awk -F' ' '{print $1}' | sed 's/b-1/*/g')

Use the env and grep commands to verify the environment variables have been retrieved and constructed properly. Your Zookeeper and Kafka bootstrap broker URLs will be uniquely different from the ones shown below.

env | grep 'ZOOKPR\|BBROKERS\|DISTINGUISHED_NAME'
Setting the required environment variables in the Kafka client container

To test the connection between EKS and MSK, list the existing Kafka topics, from the Kafka client container:

bin/kafka-topics.sh --list --zookeeper $ZOOKPR

You should see three default topics, as shown below.

The new MSK cluster’s default Kafka topics

If you did not properly add the new VPC Peering routes to the appropriate route tables in the previous step, establishing peering of the EKS and MSK VPCs, you are likely to see a timeout error while attempting to connect. Go back and confirm that both of the route tables are correctly updated with the new routes.

Connection timeout error due to incorrect configuration of VPC peering-related route tables

Kafka Topics, Partitions, and Replicas

The demonstration application produces and consumes messages from two topics, foo-topic and bar-topic. Each topic will have three partitions, one for each of the three broker nodes, along with three replicas.

Kafka topic’s relationship to partitions, replicas, and brokers

Use the following commands from the client container to create the two new Kafka topics. Once complete, confirm the creation of the topics using the list option again.

bin/kafka-topics.sh --create --topic foo-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --create --topic bar-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --list --zookeeper $ZOOKPR
Creating the two new Kafka topics

Review the details of the topics using the describe option. Note the three partitions per topic and the three replicas per topic.

bin/kafka-topics.sh --describe --topic foo-topic --zookeeper $ZOOKPR
bin/kafka-topics.sh --describe --topic bar-topic --zookeeper $ZOOKPR
Describing each of the two new Kafka topics

Kafka ACLs

According to Kafka’s documentation, Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses Zookeeper to store all the Access Control Lists (ACLs). Kafka ACLs are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H On Resource R.” You can read more about the ACL structure on KIP-11. To add, remove or list ACLs, you can use the Kafka authorizer CLI.

Authorize access by the Kafka brokers and the demonstration application to the two topics. First, allow access to the topics from the brokers using the DISTINGUISHED_NAME environment variable (see AWS documentation).

# read auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-B \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-A \
--topic bar-topic
# write auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic bar-topic

The three instances (replicas/pods) of Service A, part of consumer-group-A, produce messages to the foo-topic and consume messages from the bar-topic. Conversely, the three instances of Service B, part of consumer-group-B, produce messages to the bar-topic and consume messages from the foo-topic.

Message flow from and to microservices to Kafka topics

Allow access to the appropriate topics from the demonstration application’s microservices. First, set the USER environment variable — the MSK cluster’s SASL/SCRAM credential’s username, stored in AWS Secrets Manager by Terraform. We can retrieve the username from Secrets Manager and assign it to the environment variable with the following command.

export USER=$(
aws secretsmanager get-secret-value \
--secret-id AmazonMSK_credentials \
--query SecretString --output text | \
jq .username | sed -e 's/^"//' -e 's/"$//')

Create the appropriate ACLs.

# producers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic bar-topic
# consumers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic foo-topic \
--group consumer-group-B
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic bar-topic \
--group consumer-group-A

To list the ACLs you just created, use the following commands:

# list all ACLs
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list
# list for individual topics, e.g. foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list \
--topic foo-topic
Kafka ACLs associated with the foo-topic Kafka topic

7. Deploy example application

We should finally be ready to deploy our demonstration application to EKS. The application contains two Go-based microservices, Service A and Service B. The origin of the demonstration application’s functionality is based on Soham Kamani’s September 2020 blog post, Implementing a Kafka Producer and Consumer In Golang (With Full Examples) For Production. All source Go code for the demonstration application is included in the project.

.
├── Dockerfile
├── README.md
├── consumer.go
├── dialer.go
├── dialer_scram.go
├── go.mod
├── go.sum
├── main.go
├── param_store.go
├── producer.go
└── tls.go

Both microservices use the same Docker image, garystafford/kafka-demo-service, configured with different environment variables. The configuration makes the two services operate differently. The microservices use Segment’s kafka-go client, as mentioned earlier, to communicate with the MSK cluster’s broker and topics. Below, we see the demonstration application’s consumer functionality (consumer.go).

package main
import (
"context"
"github.com/segmentio/kafka-go"
)
func consume(ctx context.Context) {
dialer := saslScramDialer()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic2,
GroupID: group,
Logger: kafka.LoggerFunc(log.Debugf),
Dialer: dialer,
})
for {
msg, err := r.ReadMessage(ctx)
if err != nil {
log.Panicf("%v could not read message: %v", getHostname(), err.Error())
}
log.Debugf("%v received message: %v", getHostname(), string(msg.Value))
}
}
view raw consumer.go hosted with ❤ by GitHub

The consumer above and the producer both connect to the MSK cluster using SASL/SCRAM. Below, we see the SASL/SCRAM Dialer functionality. This Dialer type mirrors the net.Dialer API but is designed to open Kafka connections instead of raw network connections. Note how the function can access AWS Secrets Manager to retrieve the SASL/SCRAM credentials.

package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/secretsmanager"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
"time"
)
var (
secretId = "AmazonMSK_credentials"
versionStage = "AWSCURRENT"
)
type credentials struct {
username string
password string
}
func getCredentials() credentials {
svc := secretsmanager.New(sess)
input := &secretsmanager.GetSecretValueInput{
SecretId: aws.String(secretId),
VersionStage: aws.String(versionStage),
}
result, err := svc.GetSecretValue(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case secretsmanager.ErrCodeResourceNotFoundException:
log.Error(secretsmanager.ErrCodeResourceNotFoundException, aerr.Error())
case secretsmanager.ErrCodeInvalidParameterException:
log.Error(secretsmanager.ErrCodeInvalidParameterException, aerr.Error())
case secretsmanager.ErrCodeInvalidRequestException:
log.Error(secretsmanager.ErrCodeInvalidRequestException, aerr.Error())
case secretsmanager.ErrCodeDecryptionFailure:
log.Error(secretsmanager.ErrCodeDecryptionFailure, aerr.Error())
case secretsmanager.ErrCodeInternalServiceError:
log.Error(secretsmanager.ErrCodeInternalServiceError, aerr.Error())
default:
log.Error(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
log.Error(err.Error())
}
}
kmsCredentials := map[string]string{}
if err := json.Unmarshal([]byte(*result.SecretString), &kmsCredentials); err != nil {
log.Panic(err.Error())
}
return credentials{
username: kmsCredentials["username"],
password: kmsCredentials["password"],
}
}
func saslScramDialer() *kafka.Dialer {
credentials := getCredentials()
mechanism, err := scram.Mechanism(
scram.SHA512,
credentials.username,
credentials.password,
)
if err != nil {
log.Fatal(err)
}
config := tlsConfig()
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: config,
SASLMechanism: mechanism,
}
return dialer
}
view raw dialer_scram.go hosted with ❤ by GitHub

We will deploy three replicas of each microservice (three pods per microservices) using Helm. Below, we see the Kubernetes Deployment and Service resources for each microservice.

apiVersion: v1
kind: Service
metadata:
name: kafka-demo-service-a
labels:
app: kafka-demo-service-a
component: service
spec:
ports:
name: http
port: 8080
selector:
app: kafka-demo-service-a
component: service
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-demo-service-a
labels:
app: kafka-demo-service-a
component: service
spec:
replicas: {{ .Values.kafkaDemoService.replicaCount }}
strategy:
type: Recreate
selector:
matchLabels:
app: kafka-demo-service-a
component: service
template:
metadata:
labels:
app: kafka-demo-service-a
component: service
spec:
serviceAccountName: {{ .Values.kafkaDemoService.serviceAccountName }}
containers:
image: {{ .Values.kafkaDemoService.image.image }}
name: kafka-demo-service-a
ports:
containerPort: {{ .Values.kafkaDemoService.image.ports.containerPort }}
imagePullPolicy: {{ .Values.kafkaDemoService.image.pullPolicy }}
env:
name: LOG_LEVEL
value: "debug"
name: TOPIC1
value: "foo-topic"
name: TOPIC2
value: "bar-topic"
name: GROUP
value: "consumer-group-A"
name: MSG_FREQ
value: "10"
apiVersion: v1
kind: Service
metadata:
name: kafka-demo-service-b
labels:
app: kafka-demo-service-b
component: service
spec:
ports:
name: http
port: 8080
selector:
app: kafka-demo-service-b
component: service
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-demo-service-b
labels:
app: kafka-demo-service-b
component: service
spec:
replicas: {{ .Values.kafkaDemoService.replicaCount }}
strategy:
type: Recreate
selector:
matchLabels:
app: kafka-demo-service-b
component: service
template:
metadata:
labels:
app: kafka-demo-service-b
component: service
spec:
serviceAccountName: {{ .Values.kafkaDemoService.serviceAccountName }}
containers:
image: {{ .Values.kafkaDemoService.image.image }}
name: kafka-demo-service-b
ports:
containerPort: {{ .Values.kafkaDemoService.image.ports.containerPort }}
imagePullPolicy: {{ .Values.kafkaDemoService.image.pullPolicy }}
env:
name: LOG_LEVEL
value: "debug"
name: TOPIC1
value: "bar-topic"
name: TOPIC2
value: "foo-topic"
name: GROUP
value: "consumer-group-B"
name: MSG_FREQ
value: "10"
view raw Deployment.yaml hosted with ❤ by GitHub

Run the following helm commands to deploy the demonstration application to EKS using the project’s Helm chart, kafka-demo-app:

cd helm/
# perform dry run to validate chart
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE
Successful deployment of the demonstration application’s Helm chart

Confirm the successful creation of the Kafka client pod with either of the following commands:

kubectl get pods -n kafka
kubectl get pods -n kafka -l app=kafka-demo-service-a
kubectl get pods -n kafka -l app=kafka-demo-service-b

You should now have a total of seven pods running in the kafka namespace. In addition to the previously deployed single Kafka client pod, there should be three new Service A pods and three new Service B pods.

The kafka namespace showing seven running pods

The ability of the demonstration application to interact with AWS SSM Parameter Store and AWS Secrets Manager is based on the IAM policy created by Terraform, EKSScramSecretManagerPolicy. This policy is associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-demo-app-sasl-scram-serviceaccount. This service account is associated with the demonstration application’s pods as part of the Kubernetes Deployment resource in the Helm chart.

8. Verify application functionality

Although the pods starting and running successfully is a good sign, to confirm that the demonstration application is operating correctly, examine the logs of Service A and Service B using kubectl. The logs will confirm that the application has successfully retrieved the SASL/SCRAM credentials from Secrets Manager, connected to MSK, and can produce and consume messages from the appropriate topics.

kubectl logs -l app=kafka-demo-service-a -n kafka
kubectl logs -l app=kafka-demo-service-b -n kafka

The MSG_FREQ environment variable controls the frequency at which the microservices produce messages. The frequency is 60 seconds by default but overridden and increased to 10 seconds in the Helm chart.

Below, we see the logs generated by the Service A pods. Note one of the messages indicating the Service A producer was successful: writing 1 messages to foo-topic (partition: 0). And a message indicating the consumer was successful: kafka-demo-service-a-db76c5d56-gmx4v received message: This is message 68 from host kafka-demo-service-b-57556cdc4c-sdhxc. Each message contains the name of the host container that produced and consumed it.

Logs generated by the Service A pods

Likewise, we see logs generated by the two Service B pods. Note one of the messages indicating the Service B producer was successful: writing 1 messages to bar-topic (partition: 2). And a message indicating the consumer was successful: kafka-demo-service-b-57556cdc4c-q8wvz received message: This is message 354 from host kafka-demo-service-a-db76c5d56-r88fk.

Logs generated by the Service B pods

CloudWatch Metrics

We can also examine the available Amazon MSK CloudWatch Metrics to confirm the EKS-based demonstration application is communicating as expected with MSK. There are 132 different metrics available for this cluster. Below, we see the BytesInPerSec and BytesOutPerSecond for each of the two topics, across each of the two topic’s three partitions, which are spread across each of the three Kafka broker nodes. Each metric shows similar volumes of traffic, both inbound and outbound, to each topic. Along with the logs, the metrics appear to show the multiple instances of Service A and Service B are producing and consuming messages.

Amazon CloudWatch Metrics for the MSK cluster

Prometheus

We can also confirm the same results using an open-source observability tool, like Prometheus. The Amazon MSK Developer Guide outlines the steps necessary to monitor Kafka using Prometheus. The Amazon MSK cluster created by eksctl already has open monitoring with Prometheus enabled and ports 11001 and 11002 added to the necessary MSK security group by Terraform.

Amazon MSK broker targets successfully connected to Prometheus

Running Prometheus in a single pod on the EKS cluster, built from an Ubuntu base Docker image or similar, is probably the easiest approach for this particular demonstration.

rate(kafka_server_BrokerTopicMetrics_Count{topic=~"foo-topic|bar-topic", name=~"BytesInPerSec|BytesOutPerSec"}[5m])
Prometheus graph showing the rate of BytesInPerSec and BytesOutPerSecond for the two topics

References


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

Leave a comment

Kubernetes-based Microservice Observability with Istio Service Mesh: Part 2 of 2

In part two of this two-part post, we will continue to explore the set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.

Reference Application Platform

To demonstrate Istio’s observability tools, we deployed a reference application platform to EKS on AWS. I have developed the application platform to demonstrate different Kubernetes platforms, such as EKS, GKE, AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform comprises a backend containing eight Go-based microservices, labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). For example, Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message to a RabbitMQ queue, which Service F consumes message off on the RabbitMQ queue, and writes to MongoDB, and so on. The platform’s distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of Reference Application Platform

Part Two

In part one of the post, we configured and deployed the reference application platform to an Amazon EKS development-grade cluster on AWS. The reference application, running on EKS, communicates with two external systems, Amazon DocumentDB (with MongoDB compatibility) and Amazon MQ.

Deployed Reference Application Platform as seen from Argo CD

In part two of the post, we will explore each of the observability tools we installed in greater detail. We will understand how each tool contributes to the three pillars of observability: logs, metrics, and traces.

Logs, metrics, and traces are often known as the three pillars of observability.
 — Cindy Sridharan

Pillar One: Logs

To paraphrase Jay Kreps on the LinkedIn Engineering Blog, a log is an append-only, totally-ordered sequence of records ordered by time. The ordering of records defines a notion of “time” since entries to the left are defined to be older than entries to the right. Logs are a historical record of events that happened in the past. Logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.

Go-based Microservice Logging

An effective logging strategy starts with what you log, when you log, and how you log. As part of our logging strategy, the eight Go-based microservices use Logrus, a popular structured logger for Go first released in 2014. The microservices also implement Banzai Cloud’s logrus-runtime-formatter. There is an excellent article on the formatter, Golang runtime Logrus Formatter. These two logging packages give us greater control over what you log, when you log, and how you log information about our microservices. The recommended configuration of the packages is minimal.

func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}

Logrus provides several advantages over Go’s simple logging package, log. For example, log entries are not only for Fatal errors, nor should all verbose log entries be output in a Production environment. The post’s microservices are taking advantage of Logrus’ ability to log at seven levels: Trace, Debug, Info, Warning, Error, Fatal, and Panic. I have also variabilized the log level, allowing it to be easily changed in the Kubernetes Deployment resource at deploy-time.

The microservices also take advantage of Banzai Cloud’s logrus-runtime-formatter. The Banzai formatter automatically tags log messages with runtime and stack information, including function name and line number; extremely helpful when troubleshooting. I am also using Logrus’ JSON formatter.

Service A log entries in CloudWatch Insights

In 2020, Logus entered maintenance mode. The author, Simon Eskildsen (Principal Engineer at Shopify), stated they will not be introducing new features. This does not mean Logrus is dead. With over 18,000 GitHub Stars, Logrus will continue to be maintained for security, bug fixes, and performance. The author states that many fantastic alternatives to Logus now exist, such as Zerolog, Zap, and Apex.

Client-side Angular UI Logging

Likewise, I have enhanced the logging of the Angular UI using NGX Logger. NGX Logger is a simple logging module for angular (currently supports Angular 6+). It allows “pretty print” to the console and allows log messages to be POSTed to a URL for server-side logging. For this demo, the UI will only log to the web browser’s console. Similar to Logrus, NGX Logger supports multiple log levels: Trace, Debug, Info, Warning, Error, Fatal, and Off. However, instead of just outputting messages, NGX Logger allows us to output properly formatted log entries to the browser’s console.

The level of logs output is configured to be dependent on the environment, Production or not Production. Below is an example of the log output from the Angular UI in Chrome. Since the UI’s Docker Image was built with the Production configuration, the log level is set to INFO. You would not want to expose potentially sensitive information in verbose log output to our end-users in Production.

Controlling logging levels is accomplished by adding the following ternary operator to the app.module.ts file.

imports: [
BrowserModule,
HttpClientModule,
FormsModule,
LoggerModule.forRoot({
level: !environment.production ?
NgxLoggerLevel.DEBUG : NgxLoggerLevel.INFO,
serverLogLevel: NgxLoggerLevel.INFO
})
],

Platform Logs

Based on the platform built, configured, and deployed in , you now have access logs from multiple sources.

  1. Amazon DocumentDB: Amazon CloudWatch Audit and Profiler logs;
  2. Amazon MQ: Amazon CloudWatch logs;
  3. Amazon EKS: API server, Audit, Authenticator, Controller manager, and Scheduler CloudWatch logs;
  4. Kubernetes Dashboard: Individual EKS Pod and Replica Set logs;
  5. Kiali: Individual EKS Pod and Container logs;
  6. Fluent Bit: EKS performance, host, dataplane, and application CloudWatch logs;

Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

With Fluent Bit, deployed in part one, the EKS cluster’s performance, host, dataplane, and application logs will also be available in Amazon CloudWatch.

Within the application log groups, you have access to the individual log streams for each reference application’s components.

Within each CloudWatch log stream, you can view individual log entries.

CloudWatch Logs Insights enables you to interactively search and analyze your log data in Amazon CloudWatch Logs. You can perform queries to help you more efficiently and effectively respond to operational issues. If an issue occurs, you can use CloudWatch Logs Insights to identify potential causes and validate deployed fixes.

CloudWatch Logs Insights supports CloudWatch Logs Insights query syntax, a query language you can use to perform queries on your log groups. Each query can include one or more query commands separated by Unix-style pipe characters (|). For example:

fields @timestamp, @message
| filter kubernetes.container_name = "service-f"
and @message like "error"
| sort @timestamp desc
| limit 20

Pillar Two: Metrics

For metrics, we will examine CloudWatch Container Insights, Prometheus, and Grafana. Prometheus and Grafana are industry-leading tools you installed as part of the Istio deployment.

Prometheus

Prometheus is an open-source systems monitoring and alerting toolkit originally built at SoundCloud circa 2012. Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second project hosted after Kubernetes.

According to Istio, the Prometheus addon is a Prometheus server that comes preconfigured to scrape Istio endpoints to collect metrics. You can use Prometheus with Istio to record metrics that track the health of Istio and applications within the service mesh. You can visualize metrics using tools like Grafana and Kiali. The Istio Prometheus addon is intended for demonstration only and is not tuned for performance or security.

The istioctl dashboardcommand provides access to all of the Istio web UIs. With the EKS cluster running, Istio installed, and the reference application platform deployed, access Prometheus using the istioctl dashboard prometheus command from your terminal. You must be logged into AWS from your terminal to connect to Prometheus successfully. If you are not logged in to AWS, you will often see the following error: Error: not able to locate <tool_name> pod: Unauthorized. Since we used the non-production demonstration versions of the Istio Addons, there is no authentication and authorization required to access Prometheus.

According to Prometheus, users select and aggregate time-series data in real-time using a functional query language called PromQL (Prometheus Query Language). The result of an expression can either be shown as a graph, viewed as tabular data in Prometheus’s expression browser, or consumed by external systems through Prometheus’ HTTP API. The expression browser includes a drop-down menu with all available metrics as a starting point for building queries. Shown below are a few PromQL examples that were developed as part of writing this post.

istio_agent_go_info{kubernetes_namespace="dev"}
istio_build{kubernetes_namespace="dev"}
up{alpha_eksctl_io_cluster_name="istio-observe-demo", job="kubernetes-nodes"}
sum by (pod) (rate(container_network_transmit_packets_total{stack="reference-app",namespace="dev",pod=~"service-.*"}[5m]))
sum by (instance) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code="200"})
sum by (response_code) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code!~"200|0"})

Prometheus APIs

Prometheus has both an HTTP API and a Management API. There are many useful endpoints in addition to the Prometheus UI, available at http://localhost:9090/graph. For example, the Prometheus HTTP API endpoint that lists all the command-line configuration flags is available at http://localhost:9090/api/v1/status/flags. The endpoint that lists all the available Prometheus metrics is available at http://localhost:9090/api/v1/label/__name__/values; a total of 951 metrics in this demonstration!

The Prometheus endpoint that lists many available metrics with HELP and TYPE to explain their function is found at http://localhost:9090/metrics.

Understanding Metrics

In addition to these endpoints, the standard service level metrics exported by Istio and available via Prometheus are found in the Istio Standard Metrics documentation. An explanation of many of the metrics available via Prometheus are also found in the cAdvisor README on their GitHub site. As mentioned in this AWS Blog Post, the cAdvisor metrics are also available from the command line using the following commands:

export NODE=$(kubectl get nodes | sed -n '2 p') | awk {'print $1'}
kubectl get --raw "/api/v1/nodes/${NODE}/proxy/metrics/cadvisor"

Observing Metrics

Below is an example graph of the backend microservice containers deployed to EKS. The graph PromQL expression returns the amount of working set memory, including recently accessed memory, dirty memory, and kernel memory (container_memory_working_set_bytes), summed by pod, in megabytes (MB). There was no load on the services during the period displayed.

sum by (pod) (container_memory_working_set_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

The container_memory_working_set_bytes metric is the same metric used by the kubectl top command (not container_memory_usage_bytes).

> kubectl top pod -n dev --containers=true --use-protocol-buffer
POD                          NAME          CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm service-a 1m 6Mi
service-a-546fbd558d-2lcsg service-a 1m 6Mi
service-b-545c85df9-dl9h8 service-b 1m 6Mi
service-b-545c85df9-q99xm service-b 1m 5Mi
service-c-58996574-58wd8 service-c 1m 7Mi
service-c-58996574-6q7n4 service-c 1m 7Mi
service-d-867796bb47-87ps5 service-d 1m 6Mi
service-d-867796bb47-fh6wl service-d 1m 6Mi
...

In another Prometheus example, the PromQL query expression returns the per-second rate of CPU resources measured in CPU units (1 CPU = 1 AWS vCPU), as measured over the last 5 minutes, per time series in the range vector, summed by the pod. During this period, the backend services were under a consistent, simulated load of 25 concurrent users using hey. The four Service D pods were consuming the most CPU units during this time period.

sum by (pod) (rate(container_cpu_usage_seconds_total{image=~"registry.hub.docker.com/garystafford/.*"}[5m])) * 1000

The container_cpu_usage_seconds_total metric is the same metric used by the kubectl top command. The above PromQL expression multiplies the query results by 1,000 to match the results from kubectl top, shown below.

> kubectl top pod -n dev --containers=true --use-protocol-buffer
POD                          NAME          CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm service-a 25m 9Mi
service-a-546fbd558d-2lcsg service-a 27m 8Mi
service-b-545c85df9-dl9h8 service-b 29m 11Mi
service-b-545c85df9-q99xm service-b 23m 8Mi
service-c-58996574-c8hkn service-c 62m 9Mi
service-c-58996574-kx895 service-c 55m 8Mi
service-d-867796bb47-87ps5 service-d 285m 12Mi
service-d-867796bb47-9ln7p service-d 226m 11Mi
...

Limits

Prometheus also exposes container resource limits. For example, the memory limits set on the reference platform’s backend services, displayed in megabytes (MB), using the container_spec_memory_limit_bytes metric. When viewed alongside the real-time resources consumed by the services, these metrics are useful to properly configure and monitor Kubernetes management features such as the Horizontal Pod Autoscaler.

sum by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2) / count by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"})

Or, memory limits by Pod:

sum by (pod) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

Cluster Metrics

Prometheus also contains metrics about Istio components, Kubernetes components, and the EKS cluster. For example, the total memory in gigabytes (GB) of each m5.large EC2 worker nodes in the istio-observe-demo EKS cluster’s managed-ng-1 Managed Node Group.

machine_memory_bytes{alpha_eksctl_io_cluster_name="istio-observe-demo", alpha_eksctl_io_nodegroup_name="managed-ng-1"} / (1024^3)

For total physical cores, use the machine_cpu_physical_core metric, and for vCPU cores use the machine_cpu_cores metric.

Grafana

Grafana describes itself as the leading open-source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also allows users to visually define alert rules for their most important metrics. Grafana will continuously evaluate rules and can send notifications.

If you deployed Grafana using the Istio addons process demonstrated in part one of the post, access Grafana similar to the other tools:

istioctl dashboard grafana

According to Istio, Grafana is an open-source monitoring solution used to configure dashboards for Istio. You can use Grafana to monitor the health of Istio and applications within the service mesh. While you can build your own dashboards, Istio offers a set of preconfigured dashboards for all of the most important metrics for the mesh and the control plane. The preconfigured dashboards use Prometheus as the data source.

Below is an example of the Istio Mesh Dashboard, filtered to show the eight backend services workloads running in the dev namespace. During this period, the backend services were under a consistent simulated load of approximately 20 concurrent users using hey. You can observe the p50, p90, and p99 latency of requests to these workloads.

Dashboards are built from Panels, the basic visualization building blocks in Grafana. Each panel has a query editor specific to the data source (Prometheus in this case) selected. The query editor allows you to write your (PromQL) query. Below is the PromQL expression query responsible for the p50 latency Panel displayed in the Istio Mesh Dashboard.

 label_join((histogram_quantile(0.50, sum(rate(istio_request_duration_milliseconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)) / 1000) or histogram_quantile(0.50, sum(rate(istio_request_duration_seconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)), "destination_workload_var", ".", "destination_workload", "destination_workload_namespace")

Below is an example of the Outbound Workloads section of the Istio Workload Dashboard. The complete dashboard contains three sections: General, Inbound Workloads, and Outbound Workloads. Here we have filtered the on reference platform’s backend services in the dev namespace.

Here is a different view of the Istio Workload Dashboard, the dashboard’s Inbound Workloads section filtered to a single workload, Service A, the backend’s edge service. Service A accepts incoming traffic from the Istio Ingress Gateway as shown in the dashboard’s panels.

Grafana provides the ability to Explore a Panel. Explore strips away the dashboard and panel options so that you can focus on the query. It helps you iterate until you have a working query and then think about building a dashboard. Below is an example of the Panel showing the egress TCP traffic, based on the istio_tcp_sent_bytes_total metric, for Service F. Service F consumes messages off on the RabbitMQ queue (Amazon MQ) and writes messages to MongoDB (DocumentDB).

You can monitor the resource usage of Istio with the Performance Dashboard.

Additional Dashboards

Grafana provides a site containing official and community-built dashboards, including the above-mentioned Istio dashboards. Importing dashboards into your Grafana instance is as simple as copying the dashboard URL or the ID provided from the Grafana dashboard site and pasting it into the dashboard import option of your Grafana instance. Be aware that not every Kubernetes dashboard in Grafan’s site is compatible with your specific version of Kubernetes, Istio, or EKS, nor relies on Prometheus as a data source. As a result, you might have to test and tweak imported dashboards to get them working.

Below is an example of an imported community dashboard, Kubernetes cluster monitoring (via Prometheus) by Instrumentisto Team (dashboard ID 315).

Alerting

An effective observability strategy must include more than just the ability to visualize results. An effective strategy must also detect anomalies and notify (alert) the appropriate resources or directly resolve incidents. Grafana, like Prometheus, is capable of alerting and notification. You visually define alert rules for your critical metrics. Grafana will continuously evaluate metrics against the rules and send notifications when pre-defined thresholds are breached.

Prometheus supports multiple popular notification channels, including PagerDuty, HipChat, Email, Kafka, and Slack. Below is an example of a Prometheus notification channel that sends alert notifications to a Slack support channel.

Below is an example of an alert based on an arbitrarily high CPU usage of 300 milliCPUs (m). When the CPU usage of a single pod goes above that value for more than 3 minutes, an alert is sent. The high CPU usage could be caused by the Horizontal Pod Autoscaler not functioning, or the HPA has reached its maxReplicas limit, or there are not enough resources available within the cluster to schedule additional pods.

Triggered by the alert, Prometheus sends detailed notifications to the designated Slack channel.

Amazon CloudWatch Container Insights

Lastly in the category of Metrics, Amazon CloudWatch Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications and microservices. CloudWatch alarms can be set on metrics that Container Insights collects. Container Insights is available for Amazon Elastic Container Service (Amazon ECS) including Fargate, Amazon EKS, and Kubernetes platforms on Amazon EC2.

In Amazon EKS, Container Insights uses a containerized version of the CloudWatch agent to discover all running containers in a cluster. It then collects performance data at every layer of the performance stack. Container Insights collects data as performance log events using the embedded metric format. These performance log events are entries that use a structured JSON schema that enables high-cardinality data to be ingested and stored at scale.

In part one of the post, we also installed CloudWatch Container Insights monitoring for Prometheus, which automates the discovery of Prometheus metrics from containerized systems and workloads.

Below is an example of a basic Performance Monitoring CloudWatch Container Insights Dashboard. The dashboard is filtered to the dev namespace of the EKS cluster, where the reference application platform is running. During this period, the backend services were put under a simulated load using hey. As the load on the application increases, observe the Number of Pods increases from 19 to 34 pods, based on the Deployment resources and HPA configurations. There is also an Alert, shown on the right of the screen. An alarm was triggered for an arbitrarily high level of network transmission activity.

Next is an example of Container Insights’ Container Map view in Memory mode. You see a visual representation of the dev namespace, with each of the backend service’s Service and Deployment resources shown.

There is a warning icon indicating an Alarm on the cluster was triggered.

Lastly, CloudWatch Insights allows you to jump from the CloudWatch Insights to the CloudWatch Log Insights console. CloudWatch Insights will also write the CloudWatch Insights query for you. Below, we went from the Service D container metrics view in the CloudWatch Insights Performance Monitoring console directly to the CloudWatch Log Insights console with a query, ready to run.

Pillar 3: Traces

According to the Open Tracing website, distributed tracing, also called distributed request tracing, is used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.

According to Istio, header propagation may be accomplished through client libraries, such as Zipkin or Jaeger. It may also be accomplished manually, referred to as trace context propagation, documented in the Distributed Tracing Task. Istio proxies can automatically send spans. Applications need to propagate the appropriate HTTP headers so that when the proxies send span information, the spans can be correlated correctly into a single trace. To accomplish this, an application needs to collect and propagate the following headers from the incoming request to any outgoing requests.

  • x-request-id
  • x-b3-traceid
  • x-b3-spanid
  • x-b3-parentspanid
  • x-b3-sampled
  • x-b3-flags
  • x-ot-span-context

The x-b3 headers originated as part of the Zipkin project. The B3 portion of the header is named for the original name of Zipkin, BigBrotherBird. Passing these headers across service calls is known as B3 propagation. According to Zipkin, these attributes are propagated in-process and eventually downstream (often via HTTP headers) to ensure all activity originating from the same root are collected together.

To demonstrate distributed tracing with Jaeger and Zipkin, Service A, Service B, and Service E have been modified to pass the b3 headers. These are the three services that make HTTP requests to other upstream services. The following code has been added to propagate the headers from one service to the next. The Istio sidecar proxy (Envoy) generates the first headers. It is critical to only propagate the headers that are present in the downstream request and have a value, as the code below does. Propagating an empty header will break the distributed tracing.

incomingHeaders := []string{
"x-b3-flags",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-spanid",
"x-b3-traceid",
"x-ot-span-context",
"x-request-id",
}
for _, header := range incomingHeaders {
if r.Header.Get(header) != "" {
req.Header.Add(header, r.Header.Get(header))
}
}

Below, the highlighted section of the response payload from a call to Service A’s /api/request-echo endpoint reveals the b3 headers originating from the Istio proxy and passed to Service A.

Jaeger

According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains a helpful overview of Jaeger’s architecture and general tracing-related terminology.

If you deployed Jaeger using the Istio addons process demonstrated in part one of the post, access Jaeger similar to the other tools:

istioctl dashboard jaeger

Below is an example of the Jaeger UI’s Search view, displaying the results of a search for the Istio Ingress Gateway service over a period of time. We see a timeline of traces across the top with a list of trace results below. As discussed on the Jaeger website, a trace is composed of spans. A span represents a logical unit of work in Jaeger that has an operation name. A trace is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with the concept of DAGs.

Below is a detailed view of a single trace in Jaeger’s Trace Timeline mode. The 14 spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 160 ms. The root span in the trace is the Istio Ingress Gateway. The Angular UI, loaded in the end user’s web browser, calls Service A via the Istio Ingress Gateway. From there, we see the expected flow of our service-to-service IPC. Service A calls Services B and Service C. Service B calls Service E, which calls Service G and Service H.

In this demonstration, traces are not instrumented to span the RabbitMQ message queue nor MongoDB. This means you would not see a trace that includes a call from Service D to Service F via the RabbitMQ.

The visualization of the trace’s timeline demonstrates the synchronous nature of the reference platform’s service-to-service IPC instead of the asynchronous nature of the decoupled communications using the RabbitMQ messaging queue. Note how Service A waits for each service in its call chain to respond before returning its response to the requester.

Within Jaeger’s Trace Timeline view, you have the ability to drill into a single span, which contains additional metadata. The span’s metadata includes the API endpoint URL being called, HTTP method, response status, and several other headers.

Jaeger also has an experimental Trace Graph mode, which displays a graph view of the same trace.

Jaeger also includes a Compare Trace feature and two Dependencies views: Force-Directed Graph and DAG. I find both views rather primitive compared to Kiali. Lacking access to Kiali, the views are marginally useful as a dependency graph.

Zipkin

Zipkin is a distributed tracing system, which helps gather timing data needed to troubleshoot latency problems in service architectures. According to a 2012 post on Twitter’s Engineering Blog, Zipkin started as a project during Twitter’s first Hack Week. During that week, they implemented a basic version of the Google Dapper paper for Thrift.

Zipkin and Jaeger are very similar in terms of capabilities. I have chosen to focus on Jaeger in this post as I prefer it over Zipkin. If you want to try Zipkin instead of Jaeger, you can use the following commands to remove Jaeger and install Zipkin from the Istio addons extras directory. In part one of the post, we did not install Zipkin by default when we deployed the Istio addons. Be aware that running both tools at the same time in the same Kubernetes cluster will cause unpredictable tracing results.

kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/jaeger.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/extras/zipkin.yaml

Access Zipkin similar to the other observability tools:

istioctl dashboard zipkin

Below is an example of a distributed trace visualized in Zipkin’s UI, containing 14 spans. This is very similar to the trace visualized in Jaeger, shown above. The spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 154 ms.

Zipkin can also visualize a dependency graph based on the distributed trace. Below is an example of a traffic simulation over a two-minute period, showing network traffic flowing between the reference platform’s components, illustrated as a dependency graph.

Kiali: Microservice Observability

According to their website, Kiali is a management console for an Istio-based service mesh. It provides dashboards, observability, and lets you operate your mesh with robust configuration and validation capabilities. It shows the structure of a service mesh by inferring traffic topology and displaying the mesh’s health. Kiali provides detailed metrics, powerful validation, Grafana access, and strong integration for distributed tracing with Jaeger.

If you deployed Kaili using the Istio addons process demonstrated in part one of the post, access Kiali similar to the other tools:

istioctl dashboard kaili

For improved security, I optionally chose to install the latest version of Kaili using the customizable install mentioned in Istio’s documentation. Using Kiali’s Install via Kiali Server Helm Chart option adds token-based authentication, similar to the Kubernetes Dashboard.

Logging into Kiali, we see the Overview tab, which provides a global view of all namespaces within the Istio service mesh and the number of applications within each namespace.

The Graph tab in the Kiali UI represents the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we can observe that Kiali has mapped 8 applications (Workloads), 10 services, and 22 edges (a graph term). Specifically, we see the Istio Ingres Proxy at the edge of the service mesh, the Angular UI and eight backend services all with their respective Envoy proxy sidecars that are taking traffic (Service F did not take any direct traffic from another service in this example), the external DocumentDB egress point, and the external Amazon MQ egress point. Finally, note how service-to-service traffic flows, with Istio, from the service to its sidecar proxy, to the other service’s sidecar proxy, and finally to the service.

Below is a similar view of the service mesh, but this time, there are failures between the Istio Ingress Gateway and Service A, shown in red. We can also observe overall metrics for the HTTP traffic, such as the request per second inbound and outbound, total requests, success and error rates, and HTTP status codes.

Kiali allows you to zoom in and focus on a single component in the graph and its individual metrics.

Kiali can also display average request times and other metrics for each edge in the graph (communication between two components). Kaili can even show those metrics over a given period of time, using Kiali’s Replay feature, shown below.

Focusing on the external DocumentDB cluster, Kiali also allows us to view TCP traffic between the four services within the service mesh that connect to the external cluster.

The Applications tab lists all the applications, their namespace, and labels.

You can drill into an individual component on both the Applications and Workloads tabs and view additional details. Details include the overall health, Pods, and Istio Config status. Below is an overview of the Service A workload in the dev Namespace.

The Workloads detailed view also includes inbound and outbound metrics. Below is an example of the outbound request volume, duration, throughput, and size metrics, for Service A in the dev Namespace.

Kiali also gives you access to the individual pod’s container logs. Although log access is not as user-friendly as other log sources discussed previously, having logs available alongside metrics (integration with Grafana), traces (integration with Jaeger), and mesh visualization, all in Kiali, can be very effective as a single source for observability.

Kiali also has an Istio Config tab. The Istio Config tab displays a list of all of the available Istio configuration objects that exist in the user’s environment.

You can use Kiali to configure and manage the Istio service mesh and its installed resources. Using Kiali, you can actually modify the deployed resources, similar to using the kubectl edit command.

Oftentimes, I find Kiali to be my first stop when troubleshooting platform issues. Once I identify the specific components or communication paths having issues, I can query the CloudWatch logs and Prometheus metrics through the Grafana dashboard.

Conclusion

In this two-part post, we explored a set of popular open-source observability tools, easily integrated with the Istio service mesh. These tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and forwarding to Amazon CloudWatch Container Insights. Using these tools, we successfully observed a microservices-based, distributed reference application platform deployed to Amazon EKS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

1 Comment

Kubernetes-based Microservice Observability with Istio Service Mesh: Part 1 of 2

This two-part post explores a set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

Similar to quantum computing, big data, artificial intelligence, machine learning, and 5G, observability is currently a hot buzzword in the IT industry. According to Wikipedia, observability is a measure of how well the internal states of a system can be inferred from its external outputs. The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.

Logs, metrics, and traces are often known as the three pillars of observability.

Cindy Sridharan

Honeycomb is a developer of observability tools for production systems. The honeycomb.io site includes articles, blog posts, whitepapers, and podcasts on observability. According to Honeycomb, “Observability is achieved when a system is understandable — which is difficult with complex systems, where most problems are the convergence of many things failing at once.

As modern distributed systems grow ever more complex, the ability to observe those systems demands equally modern tooling designed with this level of complexity in mind. Traditional logging and monitoring tools struggle with today’s polyglot, distributed, event-driven, ephemeral, containerized and serverless application environments. Tools like the Istio service mesh attempt to solve the observability challenge by offering easy integration with several popular open-source telemetry tools. Istio’s integrations include Jaeger for distributed tracing, Kiali for Istio service mesh-based microservice visualization, and Prometheus and Grafana for metric collection, monitoring, and alerting. Combined with cloud-native monitoring and logging tools such as Fluent Bit and Amazon CloudWatch Container Insights, we have a complete observability platform for modern distributed applications running on Amazon Elastic Kubernetes Service (Amazon EKS).

Traditional logging and monitoring tools struggle with today’s polyglot, distributed, event-driven, ephemeral, containerized and serverless application environments.

Gary Stafford

Reference Application Platform

To demonstrate Istio’s observability tools, we will deploy a reference application platform, written in Go and TypeScript with Angular, to EKS on AWS. The reference application platform was developed to demonstrate different Kubernetes platforms, such as EKS, GKE, and AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform is currently comprised of a backend containing eight Go-based microservices, labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue for event-based communications. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message on a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. Distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of Reference Application Platform

Service Responses

Each Go microservice contains a /greeting, /health, and /metrics endpoint. The service’s /health endpoint is used to configure Kubernetes Liveness, Readiness, and Startup Probes. The /metrics endpoint exposes metrics that Prometheus scraps. Lastly, upstream services respond to requests from downstream services when calling their /greeting endpoint by returning a small informational JSON payload — a greeting.

{
"id": "1f077127-2f9f-4a90-ad88-da52327c2620",
"service": "Service C",
"message": "Konnichiwa (こんにちは), from Service C!",
"created": "2021-06-04T04:34:02.901726709Z",
"hostname": "service-c-6d5cc8fdfd-stsq9"
}

The responses are aggregated across the service call chain, resulting in an array of service responses being returned to the edge service, Service A, and subsequently, the platform’s UI running in the end user’s web browser.

[
{
"id": "a9afab6a-3e2a-41a6-aec7-7257d2904076",
"service": "Service D",
"message": "Shalom (שָׁלוֹם), from Service D!",
"created": "2021-06-04T14:28:32.695151047Z",
"hostname": "service-d-565c775894-vdsjx"
},
{
"id": "6d4cc38a-b069-482c-ace5-65f0c2d82713",
"service": "Service G",
"message": "Ahlan (أهلا), from Service G!",
"created": "2021-06-04T14:28:32.814550521Z",
"hostname": "service-g-5b846ff479-znpcb"
},
{
"id": "988757e3-29d2-4f53-87bf-e4ff6fbbb105",
"service": "Service H",
"message": "Nǐ hǎo (你好), from Service H!",
"created": "2021-06-04T14:28:32.947406463Z",
"hostname": "service-h-76cb7c8d66-lkr26"
},
{
"id": "966b0bfa-0b63-4e21-96a1-22a76e78f9cd",
"service": "Service E",
"message": "Bonjour, from Service E!",
"created": "2021-06-04T14:28:33.007881464Z",
"hostname": "service-e-594d4754fc-pr7tc"
},
{
"id": "c612a228-704f-4562-90c5-33357b12ff8d",
"service": "Service B",
"message": "Namasté (नमस्ते), from Service B!",
"created": "2021-06-04T14:28:33.015985983Z",
"hostname": "service-b-697b78cf54-4lk8s"
},
{
"id": "b621bd8a-02ee-4f9b-ac1a-7d91ddad85f5",
"service": "Service C",
"message": "Konnichiwa (こんにちは), from Service C!",
"created": "2021-06-04T14:28:33.042001406Z",
"hostname": "service-c-7fd4dd5947-5wcgs"
},
{
"id": "52eac1fa-4d0c-42b4-984b-b65e70afd98a",
"service": "Service A",
"message": "Hello, from Service A!",
"created": "2021-06-04T14:28:33.093380628Z",
"hostname": "service-a-6f776d798f-5l5dz"
}
]

CORS

The platform’s backend edge service, Service A, is configured for Cross-Origin Resource Sharing (CORS) using the access-control-allow-origin response header. The CORS configuration allows the Angular UI, running in the end user’s web browser, to call Service A’s /greeting endpoint, which potentially resides in a different host from the UI. Shown below is the Go source code for Service A. Note the use of the ALLOWED_ORIGINS environment variable on lines 32 and 195, which allows you to configure the origins that are allowed from the service’s Deployment resource.

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service A
// date: 2021-06-05
package main
import (
"encoding/json"
"fmt"
runtime "github.com/banzaicloud/logrus-runtime-formatter"
"io"
"io/ioutil"
"net/http"
"net/http/httputil"
"os"
"strconv"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
)
var (
logLevel = getEnv("LOG_LEVEL", "debug")
port = getEnv("PORT", ":8080")
serviceName = getEnv("SERVICE_NAME", "Service A")
message = getEnv("GREETING", "Hello, from Service A!")
allowedOrigins = getEnv("ALLOWED_ORIGINS", "*")
URLServiceB = getEnv("SERVICE_B_URL", "http://service-b&quot;)
URLServiceC = getEnv("SERVICE_C_URL", "http://service-c&quot;)
)
type Greeting struct {
ID string `json:"id,omitempty"`
ServiceName string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
CreatedAt time.Time `json:"created,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
var greetings []Greeting
// *** HANDLERS ***
func GreetingHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
log.Debug(r)
greetings = nil
callNextServiceWithTrace(URLServiceB+"/api/greeting", r)
callNextServiceWithTrace(URLServiceC+"/api/greeting", r)
tmpGreeting := Greeting{
ID: uuid.New().String(),
ServiceName: serviceName,
Message: message,
CreatedAt: time.Now().Local(),
Hostname: getHostname(),
}
greetings = append(greetings, tmpGreeting)
err := json.NewEncoder(w).Encode(greetings)
if err != nil {
log.Error(err)
}
}
func HealthCheckHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"alive\": true}"))
if err != nil {
log.Error(err)
}
}
func ResponseStatusHandler(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
statusCode, err := strconv.Atoi(params["code"])
if err != nil {
log.Error(err)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(statusCode)
}
func RequestEchoHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
requestDump, err := httputil.DumpRequest(r, true)
if err != nil {
log.Error(err)
}
_, err = fmt.Fprintf(w, string(requestDump))
if err != nil {
log.Error(err)
}
}
// *** UTILITY FUNCTIONS ***
func callNextServiceWithTrace(url string, r *http.Request) {
log.Debug(url)
var tmpGreetings []Greeting
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Error(err)
}
// Headers must be passed for Jaeger Distributed Tracing
incomingHeaders := []string{
"x-b3-flags",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-spanid",
"x-b3-traceid",
"x-ot-span-context",
"x-request-id",
}
for _, header := range incomingHeaders {
if r.Header.Get(header) != "" {
req.Header.Add(header, r.Header.Get(header))
}
}
log.Info(req)
client := &http.Client{
Timeout: time.Second * 10,
}
response, err := client.Do(req)
if err != nil {
log.Error(err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
log.Error(err)
}
}(response.Body)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error(err)
}
err = json.Unmarshal(body, &tmpGreetings)
if err != nil {
log.Error(err)
}
for _, r := range tmpGreetings {
greetings = append(greetings, r)
}
}
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Error(err)
}
return hostname
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func run() error {
c := cors.New(cors.Options{
AllowedOrigins: []string{allowedOrigins},
AllowCredentials: true,
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"},
})
router := mux.NewRouter()
api := router.PathPrefix("/api").Subrouter()
api.HandleFunc("/greeting", GreetingHandler).Methods("GET", "OPTIONS")
api.HandleFunc("/health", HealthCheckHandler).Methods("GET", "OPTIONS")
api.HandleFunc("/request-echo", RequestEchoHandler).Methods(
"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD")
api.HandleFunc("/status/{code}", ResponseStatusHandler).Methods("GET", "OPTIONS")
api.Handle("/metrics", promhttp.Handler())
handler := c.Handler(router)
return http.ListenAndServe(port, handler)
}
func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}
func main() {
if err := run(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}
view raw main.go hosted with ❤ by GitHub

MongoDB- and RabbitMQ-as-a-Service

Using external services will help us understand how Istio and its observability tools collect telemetry for communications between the reference application platform on Kubernetes and external systems.

Amazon DocumentDB

For this demonstration, the reference application platform’s MongoDB databases will be hosted, external to EKS, on Amazon DocumentDB (with MongoDB compatibility). According to AWS, Amazon DocumentDB is a purpose-built database service for JSON data management at scale, fully managed and integrated with AWS, and enterprise-ready with high durability.

Amazon MQ

Similarly, the reference application platform’s RabbitMQ queue will be hosted, external to EKS, on Amazon MQ. AWS MQ is a managed message broker service for Apache ActiveMQ and RabbitMQ, making it easy to set up and operate message brokers on AWS. Amazon MQ reduces your operational responsibilities by managing the provisioning, setup, and maintenance of message brokers for you. For RabbitMQ, Amazon MQ provides access to the RabbitMQ web console. The console allows us to monitor and manage RabbitMQ.

RabbitMQ Web Console showing the reference platform’s greeting queue

Shown below is the Go source code for Service F. This service consumes messages from the RabbitMQ queue, placed there by Service D, and writes the messages to MongoDB. Services use Sean Treadway’s Go RabbitMQ Client Library and MongoDB’s MongoDB Go Driver for connectivity.

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service F
// date: 2021-06-05
package main
import (
"bytes"
"context"
"encoding/json"
runtime "github.com/banzaicloud/logrus-runtime-formatter"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
logLevel = getEnv("LOG_LEVEL", "debug")
port = getEnv("PORT", ":8080")
serviceName = getEnv("SERVICE_NAME", "Service F")
message = getEnv("GREETING", "Hola, from Service F!")
queueName = getEnv("QUEUE_NAME", "service-d.greeting")
mongoConn = getEnv("MONGO_CONN", "mongodb://mongodb:27017/admin")
rabbitMQConn = getEnv("RABBITMQ_CONN", "amqp://guest:guest@rabbitmq:5672")
)
type Greeting struct {
ID string `json:"id,omitempty"`
ServiceName string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
CreatedAt time.Time `json:"created,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
var greetings []Greeting
// *** HANDLERS ***
func GreetingHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
greetings = nil
tmpGreeting := Greeting{
ID: uuid.New().String(),
ServiceName: serviceName,
Message: message,
CreatedAt: time.Now().Local(),
Hostname: getHostname(),
}
greetings = append(greetings, tmpGreeting)
callMongoDB(tmpGreeting, mongoConn)
err := json.NewEncoder(w).Encode(greetings)
if err != nil {
log.Error(err)
}
}
func HealthCheckHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"alive\": true}"))
if err != nil {
log.Error(err)
}
}
// *** UTILITY FUNCTIONS ***
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Error(err)
}
return hostname
}
func callMongoDB(greeting Greeting, mongoConn string) {
log.Info(greeting)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoConn))
if err != nil {
log.Error(err)
}
defer func(client *mongo.Client, ctx context.Context) {
err := client.Disconnect(ctx)
if err != nil {
log.Error(err)
}
}(client, nil)
collection := client.Database("service-f").Collection("messages")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = collection.InsertOne(ctx, greeting)
if err != nil {
log.Error(err)
}
}
func getMessages(rabbitMQConn string) {
conn, err := amqp.Dial(rabbitMQConn)
if err != nil {
log.Error(err)
}
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
log.Error(err)
}
}(conn)
ch, err := conn.Channel()
if err != nil {
log.Error(err)
}
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
log.Error(err)
}
}(ch)
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
if err != nil {
log.Error(err)
}
msgs, err := ch.Consume(
q.Name,
"service-f",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Error(err)
}
forever := make(chan bool)
go func() {
for delivery := range msgs {
log.Debug(delivery)
callMongoDB(deserialize(delivery.Body), mongoConn)
}
}()
<-forever
}
func deserialize(b []byte) (t Greeting) {
log.Debug(b)
var tmpGreeting Greeting
buf := bytes.NewBuffer(b)
decoder := json.NewDecoder(buf)
err := decoder.Decode(&tmpGreeting)
if err != nil {
log.Error(err)
}
return tmpGreeting
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func run() error {
go getMessages(rabbitMQConn)
router := mux.NewRouter()
api := router.PathPrefix("/api").Subrouter()
api.HandleFunc("/greeting", GreetingHandler).Methods("GET")
api.HandleFunc("/health", HealthCheckHandler).Methods("GET")
api.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(port, router)
}
func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}
func main() {
if err := run(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}
view raw main.go hosted with ❤ by GitHub

Source Code

All source code for this post is available on GitHub within two projects. Go-based microservices source code and Kubernetes resources are located in the k8s-istio-observe-backend project repository. The Angular UI TypeScript-based source code is located in the k8s-istio-observe-frontend project repository. You do not need to clone the Angular UI project for this demonstration. The demonstration uses the 2021-istio branch for both projects.

git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-backend.git
# optional - not needed for demonstration
git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-frontend.git

Docker images referenced in the Kubernetes Deployment resource files for the Go services and UI are all available on Docker Hub. The Go microservice Docker images were built using the official Golang Alpine image on DockerHub, containing Go version 1.16.4. Using the Alpine image to compile the Go source code ensures the containers will be as small as possible and minimize the container’s potential attack surface.

Prerequisites

This post will assume a basic level of knowledge of AWS EKS, Kubernetes, and Istio. Furthermore, the post assumes you have already installed recent versions of the AWS CLI v2, kubectl, Weaveworks’ eksctl, Docker, and Istio. Meaning that the aws, kubectl, eksctl, istioctl, and docker command tools are all available from the terminal.

CLI for Amazon EKS

Weaveworks’ eksctl is a simple CLI tool for creating and managing clusters on EKS — Amazon’s managed Kubernetes service for EC2. It is written in Go and uses CloudFormation.

CLI for Istio

The Istio configuration command-line utility, istioctl, is designed to help debug and diagnose the Istio mesh.

Set-up and Installation

To deploy the microservices platform to EKS, we will proceed in roughly the following order:

  1. Create a TLS certificate and Route53 hosted zone records for ALB;
  2. Create an Amazon DocumentDB database cluster;
  3. Create an Amazon MQ RabbitMQ message broker;
  4. Create an EKS cluster;
  5. Modify Kubernetes resources for your own environment;
  6. Deploy AWS Application Load Balancer (ALB) and associated resources;
  7. Deploy Istio to the EKS cluster;
  8. Deploy Fluent Bit to the EKS cluster;
  9. Deploy the reference platform to EKS;
  10. Test and troubleshoot the platform;
  11. Observe the results in part two;

Amazon DocumentDB

As previously mentioned, the MongoDB databases will be hosted, external to EKS, on Amazon DocumentDB, with MongoDB compatibility. Create a DocumentDB cluster. For the sake of simplicity and affordability of the demo, I recommend creating a single db.r5.large node cluster. We will connect from the microservices to Amazon DocumentDB using the supplied mongodb:// connection string.

Amazon DocumentDB clusters are deployed within an Amazon Virtual Private Cloud (Amazon VPC). If you are installing DocumentDB in a separate VPC than EKS, you will need to ensure that the EKS VPC can access the DocumentDB VPC. Per the DocumentDB documentation, DocumentDB clusters can be accessed directly by Amazon EC2 instances or other AWS services that are deployed in the same Amazon VPC. Additionally, Amazon DocumentDB can be accessed via EC2 instances or other AWS services from different VPCs in the same AWS Region or other Regions via VPC peering.

Amazon MQ

Similarly, the RabbitMQ queues will be hosted, external to EKS, on Amazon MQ. Create an Amazon MQ RabbitMQ broker. To ensure the simplicity and affordability of the demo, I recommend a single mq.m5.large instance broker. The broker is running the RabbitMQ engine and has TLS disabled. We will connect from the microservices to Amazon MQ using AMQP (Advanced Message Queuing Protocol). Amazon MQ provides an amqps:// endpoint. The amqps URI scheme is used to instruct a client to make a secure connection to the server. You can manage and observe RabbitMQ from the RabbitMQ web console provided by Amazon MQ.

Modify Kubernetes Resources

You will need to change several configuration settings in the GitHub project’s Kubernetes resource files to match your environment.

Istio ServiceEntry for Document DB

Modify the Istio ServiceEntry resource, external-mesh-document-db.yaml, adding your DocumentDB host address. This file allows egress traffic from the microservices on EKS to the DocumentDB cluster.

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: docdb-external-mesh
spec:
hosts:
- {{ your_document_db_hostname }}
ports:
- name: mongo
number: 27017
protocol: MONGO
location: MESH_EXTERNAL
resolution: NONE

Istio ServiceEntry for Amazon MQ

Modify the Istio ServiceEntry resource, external-mesh-amazon-mq.yaml, adding your Amazon MQ host address. This file allows egress traffic from the microservices on EKS to the Amazon MQ RabbitMQ broker.

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: amazon-mq-external-mesh
spec:
hosts:
- {{ your_amazon_mq_hostname }}
ports:
- name: rabbitmq
number: 5671
protocol: TCP
location: MESH_EXTERNAL
resolution: NONE

Istio Gateway

There are numerous strategies you can use to route traffic into the EKS cluster via Istio. For this demonstration, I am using an AWS Application Load Balancer (ALB). I have mapped one hostname, observe-ui.example-api.com, to the Angular UI application running on EKS. The backend microservice-based API, specifically the edge service, Service A, is mapped to a second hostname, observe-api.example-api.com.

According to Istio, the Gateway describes a load balancer operating at the edge of the mesh, receiving incoming or outgoing HTTP/TCP connections. Modify the Istio Ingress Gateway resource, gateway.yaml. Insert your own DNS entries into the hosts section. These are the only hosts that will be allowed into the mesh on port 80.

apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: istio-gateway
spec:
selector:
istio: ingressgateway # use istio default controller
servers:
- port:
number: 80
name: ui
protocol: HTTP
hosts:
- {{ your_ui_hostname }}
- {{ your_api_hostname }}

Istio VirtualService

According to Istio, a VirtualService defines a set of traffic routing rules to apply when a host is addressed. A VirtualService is bound to a Gateway to control the forwarding of traffic arriving at a particular host and port. Modify the project’s two Istio VirtualServices resources, virtualservices.yaml. Insert the corresponding DNS entries from the Istio Gateway.

---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: angular-ui
spec:
hosts:
- {{ your_ui_hostname }}
gateways:
- istio-gateway
http:
- match:
- uri:
prefix: /
route:
- destination:
host: angular-ui.dev.svc.cluster.local
subset: v1
port:
number: 80
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: service-a
spec:
hosts:
- {{ your_api_hostname }}
gateways:
- istio-gateway
http:
- match:
- uri:
prefix: /api
route:
- destination:
host: service-a.dev.svc.cluster.local
subset: v1
port:
number: 8080

Kubernetes Secret

According to the Kubernetes project, Kubernetes Secrets lets you store and manage sensitive information, such as passwords, OAuth tokens, and SSH keys. Storing confidential information in a Secret is safer and more flexible than putting it verbatim in a Pod definition or in a container image.

The project contains a Kubernetes Opaque type Secret resource, go-srv-demo.yaml. The Secret contains several pieces of arbitrary user-defined data we want to secure. Data includes the full DocumentDB mongodb:// connection string and the Amazon MQ amqps:// connection string used by the microservices. We will use the Secret to secure the entire connection string, including the hostname, port, username, and password. The data also includes the DocumentDB host, username, and password, and an arbitrary username and password to login to Mongo Express using Basic Authentication.

You must encode your secret’s values using base64. On Linux and Mac, you can use the base64 program to encode the connection strings.

echo -n '{{ your_secret_to_encode }}' | base64
# e.g., echo -n 'amqps://username:password@hostname.mq.us-east-1.amazonaws.com:5671/' | base64

Add the base64 encoded values to the Secret resource.

apiVersion: v1
kind: Secret
metadata:
name: go-srv-config
namespace: dev
type: Opaque
data:
mongodb.conn: {{ your_base64_encoded_secret }}
rabbitmq.conn: {{ your_base64_encoded_secret }}
---
apiVersion: v1
kind: Secret
metadata:
name: mongo-express-config
namespace: mongo-express
type: Opaque
data:
me.basicauth.username: {{ your_base64_encoded_secret }}
me.basicauth.password: {{ your_base64_encoded_secret }}
mongodb.host: {{ your_base64_encoded_secret }}
mongodb.username: {{ your_base64_encoded_secret }}
mongodb.password: {{ your_base64_encoded_secret }}

AWS Load Balancer Controller

The project contains a Custom Resource Definition (CRD) and associated resources, aws-load-balancer-controller-v220-all.yaml. These resources configure the AWS Application Load Balancer (ALB) using the AWS Load Balancer Controller v2.2.0, aws-load-balancer-controller. The AWS Load Balancer Controller manages AWS Elastic Load Balancers (ELB) for a Kubernetes cluster. The controller provisions an AWS ALB when you create a Kubernetes Ingress.

Modify line 797 to include the name of your own cluster. I am using the cluster name istio-observe-demo throughout the demo.

spec:
containers:
- args:
- --cluster-name=istio-observe-demo
- --ingress-class=alb
image: amazon/aws-alb-ingress-controller:v2.2.0
livenessProbe:
failureThreshold: 2
httpGet:
path: /healthz
port: 61779
scheme: HTTP

EKS Cluster Config

The project contains an eksctl ClusterConfig resource, cluster.yaml. The ClusterConfig defines the configuration of the Amazon EKS cluster along with networking, security, and other associated resources. Instead of a pre-existing Amazon Virtual Private Cloud (Amazon VPC) for this demo, eksctl will create a VPC and associated AWS resources as part of cluster creation. Modify the file to match your AWS Region, desired EKS cluster name, and Kubernetes release. For the demo, I am using the latest Kubernetes 1.20 release.

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
name: istio-observe-demo
region: us-east-1
version: "1.20"
iam:
withOIDC: true

Set Environment Variables

Modify and set the following environment variables in your terminal. I will be using us-east-1 for all the demonstration’s AWS resources that are part of the demonstration. They should match the eksctl ClusterConfig resource above.

export AWS_ACCOUNT=$(aws sts get-caller-identity --output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="istio-observe-demo"

Istio Home

Set your ISTIO_HOME directory. I have the latest Istio 1.10.0 installed and have theISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also set Istio’s bin/ subdirectory in my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME
/Applications/Istio/istio-1.10.0
where istioctl
/Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Create EKS Cluster

With the cluster.yaml file modified previously, deploy the EKS cluster to a new VPC on AWS.

eksctl create cluster -f ./resources/other/cluster.yaml

This step deploys a large number of resources using CloudFormation. The complete EKS provisioning process can take up to 15–20 minutes to complete.

For the complete demonstration, eksctl will deploy a total of four CloudFormation stacks to your AWS environment.

Once complete, configure kubectl so that you can connect to an Amazon EKS cluster.

aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}

Confirm that your cluster creation was successful with the following commands:

kubectl cluster-info
eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Use the EKS Management Console to review the new cluster’s details.

The EKS cluster in this demonstration was created with a single Amazon EKS managed node group, managed-ng-1. The managed node group contains three m5.large EC2 instances. The composition of the EKS cluster can be modified in the eksctl ClusterConfig resource, cluster.yaml.

Deploy AWS Load Balancer Controller

Using the aws-load-balancer-controller-v220-all.yaml file you previously modified, deploy the AWS Load Balancer Controller v2.2.0. Please carefully review the AWS Load Balancer Controller instructions to understand how this resource is configured and integrated with EKS.

curl -o resources/aws/iam-policy.json \
https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.2.0/docs/install/iam_policy.json
aws iam create-policy \
--policy-name AWSLoadBalancerControllerIAMPolicy220 \
--policy-document file://resources/aws/iam-policy.json

eksctl create iamserviceaccount \
--region ${EKS_REGION} \
--cluster ${CLUSTER_NAME} \
--namespace=kube-system \
--name=aws-load-balancer-controller \
--attach-policy-arn=arn:aws:iam::${AWS_ACCOUNT}:policy/AWSLoadBalancerControllerIAMPolicy220 \
--override-existing-serviceaccounts \
--approve
kubectl apply --validate=false \
-f https://github.com/jetstack/cert-manager/releases/download/v1.3.1/cert-manager.yaml

kubectl apply -f resources/other/aws-load-balancer-controller-v220-all.yaml

To confirm the aws-load-balancer-controller is deployed and ready, run the following command:

kubectl get deployment -n kube-system aws-load-balancer-controller
NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
aws-load-balancer-controller
1/1 1 1 55s

AWS Load Balancer Controller Policy

There is an OpenID Connect provider URL associated with the EKS cluster. To use IAM roles for service accounts, an IAM OIDC provider must exist for your cluster. Obtain the URL from the EKS Management Console’s Details tab.

You can also obtain the URL using the following AWS CLI commands:

aws eks describe-cluster --name ${CLUSTER_NAME}
aws iam list-open-id-connect-providers

The project contains a policy document, trust-eks-policy.json. Modify the policy document by adding the OpenID Connect information found above. Instructions are also included in the AWS Create an IAM OIDC provider for your cluster documentation.

{
"Version":"2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Principal":{
"Federated":" {{ your_openid_connect_arn }}"
},
"Action":"sts:AssumeRoleWithWebIdentity",
"Condition":{
"StringEquals":{
"oidc.eks.us-east-1.amazonaws.com/id/{{ your_open_id_connect_id }}:sub":"system:serviceaccount:kube-system:alb-ingress-controller"
}
}
}
]
}

Create and attach the AWS Load Balancer Controller IAM policies and roles.

aws iam create-role \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--assume-role-policy-document file://resources/aws/trust-eks-policy.json

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn="arn:aws:iam::${AWS_ACCOUNT}:policy/AWSLoadBalancerControllerIAMPolicy220"

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy

Create Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. The dev namespace will house the reference application platform for this demonstration — the Angular UI frontend and Go microservices backend. This namespace represents a development environment on EKS for our reference application platform. A second namespace, mongo-express, will be used to deploy Mongo Express later in the post.

kubectl apply -f ./minikube/resources/namespaces.yaml

Enable Automatic Sidecar Injection

To take advantage of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. By setting the istio-injection=enabled label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have an Istio sidecar proxy added to them. Labeling the dev namespace for automatic sidecar injection ensures that our reference application platform — the UI and the microservices — will have Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled

Deploy Secret Resources

Create the DocumentDB and Amazon MQ Secrets in the appropriate dev and mongo-express namespaces.

kubectl apply -f ./resources/secrets/secrets.yaml

Install Istio Configuration Profile

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and the sidecars for the Istio data plane.

istioctl profile list
Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, use the default profile, which installs Istio core, istiod, istio-ingressgateway, and istio-egressgateway.

istioctl install --set profile=demo -y
✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Egress gateways installed
✔ Installation complete

Deploy Istio Gateway, VirtualService, and DestinationRule Resources

An Istio Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. An Istio VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, an Istio DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred. You need to deploy an Istio Gateway and a set of VirtualService. You will also need to deploy a set of DestinationRule resources. Create the Istio Gateway, Virtual Services, and Destination Rules, which you modified earlier.

kubectl apply -f resources/istio/gateway.yaml -n dev
kubectl apply -f resources/istio/virtualservices.yaml -n dev
kubectl apply -f resources/istio/destination-rules.yaml -n dev

Deploy Istio Telemetry Add-ons

The Istio project includes sample deployments of various telemetry add-ons that integrate with Istio. The add-ons include Jaeger, Zipkin, Kiali, Prometheus, and Grafana. While these applications are not a part of Istio, they are essential to making the most of Istio’s observability features. According to the Istio project, the deployments are meant to quickly get up and running and are optimized for this case. As a result, they may not be suitable for production. See the GitHub project for more info on integrating a production-grade version of each add-on.

Install the add-ons using the default configurations and then replace Prometheus with a modified version included in the project. The modified Kubernetes ConfigMap in the prometheus.yaml file has added configuration to scrape our reference platform’s /api/metrics endpoint.

kubectl apply -f $ISTIO_HOME/samples/addons
kubectl apply -f resources/istio/prometheus.yaml -n istio-system

You should see seven workloads in the namespace from the EKS Management Console’s Workloads tab, each with one pod up and running. The workloads include Grafana, Jaeger, Kiali, and Prometheus. Also included is the Istio Configuration demo Profile’s istiod, istio-ingressgateway, and istio-egressgateway, installed previously.

Deploy Kubernetes Web UI (Dashboard)

Kubernetes Web UI (Dashboard) is a web-based Kubernetes user interface. You can use the Dashboard to deploy containerized applications to a Kubernetes cluster, troubleshoot your containerized application, and manage cluster resources. You can use the Dashboard to get an overview of applications running on your cluster, as well as for creating or modifying individual Kubernetes resources.

To deploy the dashboard, follow the steps outlined in the Tutorial: Deploy the Kubernetes Dashboard (web UI).

kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.0.5/aio/deploy/recommended.yaml
kubectl apply -f resources/aws/eks-admin-service-account.yaml

Each Service Account has a Secret with a valid Bearer Token that can be used to log in to the Dashboard. Use the following command to retrieve the token associated with the eks-admin Account.

kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep eks-admin | awk '{print $1}')

Start the kubectl proxy in a separate terminal window.

kubectl proxy

Use the eks-admin Account’s token to log in to the Kubernetes Dashboard at the following URL:

http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#!/login

Deploy Mongo Express

Mongo Express is a web-based MongoDB administrative interface written with Node.js, Express, and Bootstrap3. Install Mongo Express into the mongo-express namespace on the EKS cluster to manage the DocumentDB cluster.

kubectl apply -f ./resources/services/mongo-express.yaml -n mongo-express

Obtain the external IP address of any of the Kubernetes worker nodes and the NodePort of Mongo Express with the following two commands:

kubectl get nodes -o wide |  awk {'print $1" " $2 " " $7'} | column -t
kubectl get service/mongo-express -n mongo-express

To ensure secure access to Mongo Express, create an Inbound Rule in your VPC’s Security Group that allows only your IP address (the ‘My IP’ option) access to Mongo Express running on the NodePort obtained above.

Start the kubectl proxy in a separate terminal window.

kubectl proxy

Use the external IP address of any of the Kubernetes worker nodes and current NodePort to access Mongo Express. Mongo Express will require you to enter the username and password you encoded in the Kubernetes Secret created earlier using basic authentication. Once you have deployed the reference application platform, later in the post, you will observe four databases: service-c, service-f, service-g, and service-h. The typical operational databases you would normally see with your own MongoDB installation are unavailable in the UI since DocumentDB is a managed service.

Mongo Express UI showing the four reference platform’s databases

Modify and Deploy the ALB Ingress

The project contains an ALB Ingress resource, alb-ingress.yaml. The AWS Load Balancer Controller installed earlier is configured to limit the ingresses ALB ingress controller controls. By setting the --ingress-class=alb argument, it constrains the controller’s scope to ingresses with matching kubernetes.io/ingress.class: alb annotation. This is especially helpful when running multiple ingress controllers in the same cluster.

The ALB Ingress resource, alb-ingress.yaml, needs to be modified before deployment. First, update the alb.ingress.kubernetes.io/healthcheck-port annotation. The port value is derived from the status-port of the istio-ingressgateway, which was installed as part of the Istio demo configuration profile. To obtain the status-port from the istio-ingressgateway, run the following command:

kubectl -n istio-system get svc istio-ingressgateway \
-o jsonpath='{.spec.ports[?(@.name=="status-port")].nodePort}'

Next, insert the ARN of your SSL/TLS (Transport Layer Security) certificate that is associated with the domain listed in the external-dns.alpha.kubernetes.io/hostname annotation into the ALB Ingress resource, alb-ingress.yaml. Run the following command to insert the TLS certificate’s ARN into the alb.ingress.kubernetes.io/certificate-arn annotation. This command assumes that your SSL/TLS certificate is registered with AWS Certificate Manager (ACM).

export ALB_CERT=$(aws acm list-certificates --certificate-statuses ISSUED \
| jq -r '.CertificateSummaryList[] | select(.DomainName=="*.example-api.com") | .CertificateArn')
yq e '.metadata.annotations."alb.ingress.kubernetes.io/certificate-arn" = env(ALB_CERT)' -i resources/other/alb-ingress.yaml

The alb.ingress.kubernetes.io/actions.ssl-redirect annotation will redirect all HTTP traffic to HTTPS. The TLS certificate is used for HTTPS traffic. The ALB then terminates the HTTPS traffic at the ALB and forwards the unencrypted traffic to the EKS cluster on port 80.

Finally, update external-dns.alpha.kubernetes.io/hostname annotation with a common-delimited list of your platform’s UI and API hostnames. Below is the complete ALB Ingress resource, alb-ingress.yaml.

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: demo-ingress
namespace: istio-system
annotations:
kubernetes.io/ingress.class: alb
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/tags: Environment=dev
alb.ingress.kubernetes.io/healthcheck-port: '{{ your_status_port }}'
alb.ingress.kubernetes.io/healthcheck-path: /healthz/ready
alb.ingress.kubernetes.io/healthcheck-protocol: HTTP
alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
alb.ingress.kubernetes.io/actions.ssl-redirect: '{"Type": "redirect", "RedirectConfig": { "Protocol": "HTTPS", "Port": "443", "StatusCode": "HTTP_301"}}'
external-dns.alpha.kubernetes.io/hostname: "{{ your_ui_hostname, your_api_hostname }}"
alb.ingress.kubernetes.io/certificate-arn: "{{ your_ssl_tls_cert_arn }}"
alb.ingress.kubernetes.io/load-balancer-attributes: routing.http2.enabled=true,idle_timeout.timeout_seconds=30
labels:
app: reference-app
spec:
rules:
- http:
paths:
- pathType: Prefix
path: /
backend:
service:
name: ssl-redirect
port:
name: use-annotation
- pathType: Prefix
path: /
backend:
service:
name: istio-ingressgateway
port:
number: 80
- pathType: Prefix
path: /api
backend:
service:
name: istio-ingressgateway
port:
number: 80

To deploy the ALB Ingress resource, alb-ingress.yaml, run the following command:

kubectl apply -f resources/other/alb-ingress.yaml

To confirm the configuration of the AWS Load Balancer Controller and the ingresses ALB ingress controller controls, run the following command:

kubectl describe ingress.networking.k8s.io --all-namespaces

Any misconfigurations should show up as errors in the Events section.

Running the following command should display the public DNS address of the ALB associated with port 80.

kubectl -n istio-system get ingress
NAME           CLASS    HOSTS   ADDRESS                                                                   PORTS   AGE
demo-ingress <none> * k8s-istiosys-demoingr-
...us-east-1.elb.amazonaws.com 80 23m

Use the EC2 Load Balancer Management Console to review the new ALB’s details.

Deploy Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

We will use Fluent Bit to send the reference platform’s logs to Amazon CloudWatch Container Insights. To install Fluent Bit, I have used the procedure outlined in the AWS documentation: Quick Start Setup for Container Insights on Amazon EKS and Kubernetes. I recommend reviewing this documentation for detailed installation instructions.

kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml

ClusterName=${CLUSTER_NAME}
RegionName=${EKS_REGION}
FluentBitHttpPort='2020'
FluentBitReadFromHead='Off'
[[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off'|| FluentBitReadFromTail='On'
[[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'
kubectl create configmap fluent-bit-cluster-info \
--from-literal=cluster.name=${ClusterName} \
--from-literal=http.server=${FluentBitHttpServer} \
--from-literal=http.port=${FluentBitHttpPort} \
--from-literal=read.head=${FluentBitReadFromHead} \
--from-literal=read.tail=${FluentBitReadFromTail} \
--from-literal=logs.region=${RegionName} -n amazon-cloudwatch

kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluent-bit/fluent-bit.yaml

kubectl get pods -n amazon-cloudwatch

DASHBOARD_NAME=istio_observe_demo
REGION_NAME=${EKS_REGION}
CLUSTER_NAME=${CLUSTER_NAME}

curl https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/service/cwagent-prometheus/sample_cloudwatch_dashboards/fluent-bit/cw_dashboard_fluent_bit.json \
| sed "s/{{YOUR_AWS_REGION}}/${REGION_NAME}/g" \
| sed "s/{{YOUR_CLUSTER_NAME}}/${CLUSTER_NAME}/g" \
| xargs -0 aws cloudwatch put-dashboard --dashboard-name ${DASHBOARD_NAME} --dashboard-body

curl https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluentd/fluentd.yaml | kubectl delete -f -
kubectl delete configmap cluster-info -n amazon-cloudwatch

From the EKS Management Console’s Workloads tab, you should see three fluent-bit pods up and running in the amazon-cloudwatch namespace. There is one fluent-bit pod per EKS worker node.

Once the reference application platform is deployed and running, you should be able to visualize the application in the Amazon CloudWatch Container Insights console’s Map view.

Amazon CloudWatch Container Insights UI

The reference platform’s cluster logs will also be available in Amazon CloudWatch. You should have access to individual Log groups for each application’s components.

Lastly, individual pod logs can also be viewed through the Kubernetes Dashboard. The microservice’s log verbosity level is set to info by default. This level can be changed using the LOG_LEVEL environment variable in the service’s Kubernetes Deployment resource.

Deploy ServiceEntry Resources

Using Istio ServiceEntry configurations, you can reach any publicly accessible service from within your Istio cluster. The Istio proxy can be configured to block any host without an HTTP service or service entry defined within the mesh. We will not go to this extreme in the demonstration. However, we will configure ServiceEntry configurations to monitor egress traffic to the reference platform’s two external services, DocumentDB and Amazon MQ.

Confirm the istio-egressgateway is running, then deploy the two ServiceEntry resources you modified earlier.

kubectl get pod -l istio=egressgateway -n istio-system
NAME                                   READY   STATUS    RESTARTS   AGE
istio-egressgateway-585f7668fc-74qtf 1/1 Running 0 14h
kubectl apply -f resources/istio/external-mesh-document-db-internal.yaml
kubectl apply -f resources/istio/external-mesh-amazon-mq-internal.yaml

Deploy the Reference Application Platform

Each of the platform’s components has a file in the project containing both the Kubernetes Service and corresponding Deployment resources.

apiVersion: v1
kind: Service
metadata:
name: service-h
labels:
app: service-h
component: service
spec:
ports:
name: http
port: 8080
selector:
app: service-h
component: service
apiVersion: apps/v1
kind: Deployment
metadata:
name: service-h
labels:
app: service-h
component: service
version: v1
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 2
maxUnavailable: 1
selector:
matchLabels:
app: service-h
component: service
version: v1
template:
metadata:
labels:
app: service-h
component: service
version: v1
spec:
containers:
name: service-h
image: registry.hub.docker.com/garystafford/go-srv-h:1.6.8
livenessProbe:
httpGet:
path: /api/health
port: 8080
initialDelaySeconds: 3
periodSeconds: 3
env:
name: LOG_LEVEL
value: info
name: MONGO_CONN
valueFrom:
secretKeyRef:
name: go-srv-config
key: mongodb.conn
name: GREETING
value: "Nǐ hǎo (你好), from Service H!"
ports:
containerPort: 8080
imagePullPolicy: Always
view raw service-h.yaml hosted with ❤ by GitHub

Deploy the reference application platform’s frontend UI and eight backend microservices to the EKS cluster using the following commands:

kubectl apply -f ./resources/services/angular-ui.yaml -n dev

for service in a b c d e f g h; do
kubectl apply -f "./resources/services/service-$service.yaml" -n dev
done

From the EKS Management Console’s Workloads tab, you should observe that the three pods for each reference application platform component are up and running in the dev namespace.

You can also use the Kubernetes Dashboard to confirm that the deployments were successful to the dev namespace.

Test the Platform

You want to ensure the platform’s web-based UI is reachable via the AWS Application Load Balancer to EKS through Istio and to the UI’s FQDN (fully qualified domain name) of angular-ui.dev.svc.cluster.local. You want to ensure the platform’s eight microservices are communicating with each other and communicating with the external DocumentDB cluster and Amazon MQ RabbitMQ broker. The easiest way to test the cluster is by viewing the Angular UI in a web browser. For example, in my case, https://observe-ui.example-api.com.

Reference Application Platform’s Angular-based UI

The UI requires you to input the hostname of the backend, which is the edge service, Service A. For example, in my case, https://observe-api.example-api.com. Since you want to use your own hostname and the UI’s JavaScript code is running locally in your web browser, this option allows you to provide your own hostname. This is the same hostname you inserted into the Istio VirtualService for Service A. This hostname routes the API calls to the FQDN of Service A running in the dev namespace, service-a.dev.svc.cluster.local. You should observe seven greeting responses displayed in the UI, all but Service F.

You can also use tools like Postman to test the backend directly, using the same hostname of the backend, as above.

Using Postman to make requests against the /greeting endpoint of Service A
Using Postman to make requests against the /request-echo endpoint of Service A

Load Testing with Hey

You can also use performance testing tools to load-test the platform. Many issues will not show up until the platform is placed under elevated load. I recently tried hey, a modern go-based load generator tool as a replacement for Apache Bench (ab), Unlike ab, hey supports HTTP/2 endpoints, which is required to test the platform on EKS with Istio. You can install hey with Homebrew.

brew install hey

Using hey, you can test the reference application platform by hitting the API hostname and /api/greeting endpoint. The command below generates 1,000 requests, simulates 25 concurrent users, and uses HTTP/2. Traffic will be generated across all the services, the RabbitMQ broker, and the DocumentDB databases.

hey -n 1000 -c 25 -h2 {{ your_api_hostname }}/api/greeting

The results show 1,000 successful HTTP 200 responses from the reference platform’s API in about 43 seconds with an average response time of 1.0430 seconds.

To generate a consistent level of traffic over a longer period of time, try this variation of the command:

hey -n 25000 -c 25 -q 1 -h2 {{ your_api_hostname }}/api/greeting

This command generates a steady stream of traffic for about 18 minutes, making it more convenient when exploring and troubleshooting your observability tools.

Part Two

In part two of this post, we will explore each observability tool and see how they can help us manage the reference application platform running on the EKS cluster.

Jaeger UI showing a distributed trace of Service A

To tear down the EKS cluster, DocumentDB cluster, and Amazon MQ broker, use the commands below.

# EKS cluster
eksctl delete cluster --name $CLUSTER_NAME

# Amazon MQ
aws mq list-brokers | jq -r '.BrokerSummaries[] | .BrokerId'
aws mq delete-broker --broker-id {{ your_broker_id }}
# DocumentDB
aws docdb describe-db-clusters \
| jq -r '.DBClusters[] | .DbClusterResourceId'
aws docdb delete-db-cluster \
--db-cluster-identifier {{ your_cluster_id }}

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Cross-Account Amazon Elastic Container Registry (ECR) Access for ECS

Deploying containerized applications on Amazon ECS using cross-account elastic container registries

This is an updated version of a post, originally published in October 2019. This post uses AWS CLI version 2 and contains updated versions of all Docker images.

Introduction

There are two scenarios I frequently encounter that require sharing Amazon Elastic Container Registry (ECR)-based Docker images across multiple AWS Accounts. In the first scenario, a vendor wants to share a Docker image with their customer, stored in the vendor’s private container registry. Many popular container security and observability solutions function in this manner.

Below, we see an example of an application consisting of three containers. Two of the container images originated from the customer’s own ECR repositories (right side). The third image originated from their vendor’s ECR repository (left side).

In the second scenario, an enterprise operates multiple AWS accounts to create logical security boundaries between environments and responsibilities. The first AWS account contains the enterprise’s deployable assets, including their ECR image repositories. The enterprise has additional accounts, such as Development, Test, Staging, and Production, for each Software Development Life Cycle (SDLC) phase. The ECR images in the repository account need to be accessed from multiple AWS accounts and often across different AWS Regions for deployment.

Below, we see an example of a deployed application also consisting of three containers. All the container images originated from the ECR repositories account (left side). The images were pulled into the Production account during deployment to ECS (right side).

This post will explore the first scenario — a vendor who wants to share a private Docker image with their customer securely. The post will demonstrate how to share images across AWS accounts for use with Docker Swarm and Amazon Elastic Container Service (ECS) with AWS Fargate, both using ECR Repository Policies.

For the demonstration, we will use an existing application I have created, a RESTful, HTTP-based NLP (Natural Language Processing) API, consisting of four Golang microservices. The edge service, nlp-client, communicates with the rake-app, lang-app, and prose-app services. There is a fifth service, dyanmo-app, which is not discussed in this post, but easily added to the API.

A customer has developed the nlp-client, lang-app, and prose-app container-based microservices as part of their NLP application in the post’s scenario. Instead of developing their own implementation of the RAKE (Rapid Automatic Keyword Extraction) algorithm, they have licensed a version from a vendor. The vendor’s rake-app service is delivered in the form of a licensed Docker image. The acronym ISV (Independent Software Vendor) is used to represent the vendor throughout the code.

The NPL API exposes several endpoints accessible through the nlp-client service. The endpoints perform common NLP operations on text input, such as extracting keywords, tokens, entities, and sentences, and determining the language. All the endpoints can be listed using the /routes endpoint.

[
{
"method": "GET",
"path": "/error",
"name": "main.getError"
},
{
"method": "POST",
"path": "/keywords",
"name": "main.getKeywords"
},
{
"method": "POST",
"path": "/language",
"name": "main.getLanguage"
},
{
"method": "GET",
"path": "/health",
"name": "main.getHealth"
},
{
"method": "GET",
"path": "/health/:app",
"name": "main.getHealthUpstream"
},
{
"method": "GET",
"path": "/routes",
"name": "main.getRoutes"
},
{
"method": "POST",
"path": "/tokens",
"name": "main.getTokens"
},
{
"method": "POST",
"path": "/entities",
"name": "main.getEntities"
},
{
"method": "POST",
"path": "/sentences",
"name": "main.getSentences"
}
]

Requirements

To follow along with the post’s demonstration, you will need two AWS accounts, one representing the vendor and one representing one of their customers. It is relatively simple to create additional AWS accounts — all you need is a unique email address (easy with Gmail) and a credit card. Using AWS Organizations can make the task of creating and managing multiple accounts even easier.

I have intentionally used different AWS Regions to demonstrate how you can share ECR images across both AWS accounts and regions. You will need a current version of the AWS CLI version 2 and of Docker. Lastly, you will need adequate access to each AWS account to create resources.

Source Code

The demonstration’s source code is contained in five public GitHub repositories.

git clone --branch v2.0.0 \
    --single-branch --depth 1 \
    https://github.com/garystafford/ecr-cross-account-demo.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/nlp-client.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/prose-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/rake-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/lang-app.git

The v2.0.0 branch of the ecr-cross-account-demo GitHub repository contains all the CloudFormation templates and the Docker Compose Stack file.

.
├── LICENSE
├── README.md
├── cfn-templates
│ ├── development-user-group-customer.yml
│ ├── development-user-group-isv.yml
│ ├── ecr-repo-not-shared.yml
│ ├── ecr-repo-shared.yml
│ ├── public-subnet-public-loadbalancer.yml
│ └── public-vpc.yml
└── docker
└── stack.yml

Each of the other four GitHub repositories, such as the nlp-client repository, contains a Golang-based microservice, which together comprises the NLP API. Each repository also contains a Dockerfile.

.
├── Dockerfile
├── LICENSE
├── README.md
├── buildspec.yml
├── go.mod
├── go.sum
└── main.go

We will use AWS CloudFormation to create the necessary resources within both AWS accounts. We will also use CloudFormation to create an ECS Cluster and an Amazon ECS Task Definition for the customer account. Task Definition defines how ECS will deploy the application, consisting of four Docker containers, using AWS Fargate. In addition to ECS, we will create an Amazon Virtual Private Cloud (VPC) to house the ECS cluster and a public-facing, Layer 7 Application Load Balancer (ALB) to load-balance our ECS-based application.

Creating ECR Repositories

In the first AWS account, representing the vendor, we will execute two CloudFormation templates. The first template, development-user-group-isv.yml, creates the Development group and VendorDev user. The VendorDev user will be given explicit access to the vendor’s rake-app ECR repository. Change the DevUserPassword parameter’s value to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-isv \
--template-body file://cfn-templates/development-user-group-isv.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new IAM Group and User.

Next, we will execute the second CloudFormation template, ecr-repo-shared.yml, which creates the vendor’s rake-app ECR image repository. The rake-app repository will house a copy of the vendor’s rake-app Docker Image. But first, let’s look at the CloudFormation template used to create the repository, specifically the RepositoryPolicyText section. Here we define two repository policies:

  • The AllowPushPull policy explicitly allows the VendorDev user to push and pull versions of the image to the ECR repository. We import the exported Amazon Resource Name (ARN) of the VendorDev user from the previous CloudFormation Stack Outputs. We have also allowed AWS CodeBuild service access to the ECR repository. This is known as a Service-Linked Role. We will not use CodeBuild in this brief post.
  • The AllowPull policy allows anyone in the customer’s AWS account (root) to pull any version of the image. They cannot push, only pull. Cross-account access can be restricted to a finer-grained set of the specific customer’s IAM Entities and source IP addresses.

Note the "ecr:GetAuthorizationToken" policy Action. This action will allow the customer’s user to log into the vendor’s ECR repository and receive an Authorization Token. The customer retrieves a token that is valid for a specified container registry for 12 hours.

RepositoryPolicyText:
Version: '2012-10-17'
Statement:
- Sid: AllowPushPull
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
AWS:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'DevUserArn']]
Action:
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:BatchGetImage'
- 'ecr:CompleteLayerUpload'
- 'ecr:DescribeImages'
- 'ecr:DescribeRepositories'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:GetRepositoryPolicy'
- 'ecr:InitiateLayerUpload'
- 'ecr:ListImages'
- 'ecr:PutImage'
- 'ecr:UploadLayerPart'
- Sid: AllowPull
Effect: Allow
Principal:
AWS: !Join [':', ['arn:aws:iam:', !Ref 'CustomerAccount', 'root']]
Action:
- 'ecr:GetAuthorizationToken'
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:BatchGetImage'
- 'ecr:DescribeRepositories' # optional permission
- 'ecr:DescribeImages' # optional permission

Before executing the following command to deploy the CloudFormation Stack, ecr-repo-shared.yml, replace the CUSTOMER_ACCOUNT value with your pseudo customer’s AWS account ID.

# change me
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
# NLP Rake Microservice
REPO_NAME=rake-app
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-shared.yml \
--parameters \
ParameterKey=CustomerAccount,ParameterValue=${CUSTOMER_ACCOUNT} \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new ECR repository.

Below, we see the ECR repository policies applied correctly in the Permissions tab of the rake-app repository. The first policy covers both the VendorDev user, referred to as an IAM Entity, as well as AWS CodeBuild, referred to as a Service Principal.

The second policy covers the customer’s AWS account ID.

Repeat this process in the customer’s AWS account. First, the CloudFormation template, development-user-group-customer.yml, containing the Development group and CustomerDev user.

# change me
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-customer \
--template-body file://cfn-templates/development-user-group-customer.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Next, we will execute the second CloudFormation template, ecr-repo-not-shared.yml, three times, once for each of the customer’s three ECR repositories, nlp-client, lang-app, and prose-app. Note that in the RepositoryPolicyText section of the template we only define a single policy. Identical to the vendor’s policy, the AllowPushPull policy explicitly allows the previously-created CustomerDev user to push and pull versions of the image to the ECR repository. There is no cross-account access required to the customer’s two ECR repositories.

RepositoryPolicyText:
Version: '2012-10-17'
Statement:
- Sid: AllowPushPull
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
AWS:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'DevUserArn']]

Action:
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:BatchGetImage'
- 'ecr:CompleteLayerUpload'
- 'ecr:DescribeImages'
- 'ecr:DescribeRepositories'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:GetRepositoryPolicy'
- 'ecr:InitiateLayerUpload'
- 'ecr:ListImages'
- 'ecr:PutImage'
- 'ecr:UploadLayerPart'

Execute the following commands to create the three CloudFormation Stacks. The Stacks use the same template, ecr-repo-not-shared.yml, with a different Stack name and RepoName parameter values.

# NLP Client microservice
REPO_NAME=nlp-client
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

# NLP Prose microservice
REPO_NAME=prose-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM
# NLP Language microservice
REPO_NAME=lang-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting three ECR repositories.

At this point, we have our four ECR repositories across the two AWS accounts, with the proper ECR Repository Policies applied to each.

Building and Pushing Images to ECR

Next, we will build and push the three NLP application images to their corresponding ECR repositories. To confirm that the ECR policies are working correctly, log in as the VendorDev user and perform the below command.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Logged in as the vendor’s VendorDev user, build and push the Docker image to the rake-app repository. The Dockerfile and Golang source code are located in each GitHub repository. With Golang and Docker multi-stage builds, we will create very small Docker images, based on Scratch, containing just the compiled Go executable binary. At a mere 7–15 MBs in size, pushing and pulling these Docker images across accounts is very fast.

docker build -t ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0 . --no-cache
docker push ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Below, we see the output from the vendor’s VendorDev user logging into the rake-app repository.

We see the vendor’s VendorDev user building results and pushing the Docker image to the rake-app repository.

Next, after logging in as the customer’s CustomerDev user, build and push the Docker images to the ECR nlp-client, lang-app, and prose-app repositories. Again, make sure you substitute the variable values below with your pseudo customer’s AWS account and preferred AWS region.

aws ecr get-login-password --region ${CUSTOMER_ECR_REGION} \
| docker login --username AWS --password-stdin ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com
# nlp-client
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
# prose-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
# lang-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0

At this point, each of the four customer ECR repositories has a Docker image pushed to them.

Deploying Locally to Docker Swarm

As a simple demonstration of cross-account ECS access, we will start with Docker Swarm. Logged in as the customer’s CustomerDev user and using the Docker Swarm Stack file included in the project, we can create and run a local copy of our NLP application in our customer’s account. First, we need to log into the vendor’s ECR repository in order to pull the image from the vendor’s ECR registry.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Once logged in to the vendor’s ECR repository, we will pull the image. Using the docker describe-repositories and docker describe-images, we can list cross-account repositories and images your IAM user has access to if you are unsure.

aws ecr describe-repositories \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
aws ecr describe-images \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
docker pull ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Using the following command, you should see each of our four applications Docker images.

docker image ls --filter=reference='*amazonaws.com/*'

Below, we see an example of the expected terminal output from pulling the image and listing the images.

Build Docker Stack Locally

Next, build the Docker Swarm Stack. The Docker Compose file, stack.yml, is shown below. Note the location of the Docker images.

version: '3.9'
services:
nlp-client:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
networks:
nlp-demo
ports:
target: 8080
published: 8080
protocol: tcp
mode: host
environment:
NLP_CLIENT_PORT
RAKE_ENDPOINT
PROSE_ENDPOINT
LANG_ENDPOINT
API_KEY
rake-app:
image: ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0
networks:
nlp-demo
environment:
RAKE_PORT
API_KEY
prose-app:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
networks:
nlp-demo
environment:
PROSE_PORT
API_KEY
lang-app:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0
networks:
nlp-demo
environment:
LANG_PORT
API_KEY
networks:
nlp-demo:
volumes:
data: {}
view raw stack.yaml hosted with ❤ by GitHub

Execute the following commands to deploy the Docker Stack to Docker Swarm. Again, make sure you substitute the variable values below with your pseudo vendor and customer AWS accounts and regions. Additionally, the NLP API uses an API Key to protect all exposed endpoints, except the /health endpoint, across all four services. Change the default CloudFormation template’s API Key parameter to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
export API_KEY=SuP3r5eCRetAutHK3y

# don't change me
export NLP_CLIENT_PORT=8080
export RAKE_PORT=8080
export PROSE_PORT=8080
export LANG_PORT=8080
export RAKE_ENDPOINT=http://rake-app:${RAKE_PORT}
export PROSE_ENDPOINT=http://prose-app:${PROSE_PORT}
export LANG_ENDPOINT=http://lang-app:${LANG_PORT}
export TEXT="The Nobel Prize is regarded as the most prestigious award in the World. Notable winners have included Marie Curie, Theodore Roosevelt, Albert Einstein, George Bernard Shaw, and Winston Churchill."
 
docker swarm init 
docker stack deploy --compose-file docker/stack.yml nlp

You can check the success of the deployment with either of the following commands:

docker stack ps nlp --no-trunc
docker container ls

Below, we see an example of the expected terminal output.

With the Docker Stack, you can hit the nlp-client service directly on localhost:8080. Unlike Fargate, which requires unique static ports for each container in the task, with Docker, we can choose to run all the containers on the same port without conflict since only the nlp-client service is exposing port :8080. Unlike with ECS, there is no load balancer in front of the Stack, since we only have a single node in our Swarm and thus a single container instance of each microservice for testing.

To test that the images were pulled successfully and the Docker Stack is running, we can execute a curl command against any of the API endpoints, such as /keywords. Below, I am using jq to pretty-print the JSON response payload.

curl -s -X POST \
"http://localhost:${NLP_CLIENT_PORT}/keywords" \
-H 'Content-Type: application/json' \
-H "X-API-Key: ${API_KEY}" \
-d '{"text": "The Internet is the global system of interconnected computer networks that use the Internet protocol suite to link devices worldwide."}' | jq

The resulting JSON response payload indicates that the nlp-client service was reached successfully and that it was then subsequently able to communicate with the rake-app service, whose container image originated from the vendor’s ECR repository.

[
{
"candidate": "interconnected computer networks",
"score": 9
},
{
"candidate": "link devices worldwide",
"score": 9
},
{
"candidate": "internet protocol suite",
"score": 8
},
{
"candidate": "global system",
"score": 4
},
{
"candidate": "internet",
"score": 2
}
]

Creating Amazon ECS Environment

Although using Docker Swarm locally is a great way to understand how cross-account ECR access works, it is not a typical use case for deploying containerized applications on the AWS Platform. More often, you could use Amazon ECS, Amazon Elastic Kubernetes Service (EKS), or enterprise versions of third-party orchestrators such as RedHat OpenShift or Rancher.

Using CloudFormation and some very convenient CloudFormation templates supplied by Amazon as a starting point, we will create a complete ECS environment for our application. First, we will create a VPC to house the ECS cluster and a public-facing ALB to front our ECS-based application, using the public-vpc.yml template.

aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name public-vpc \
--template-body file://cfn-templates/public-vpc.yml \
--capabilities CAPABILITY_NAMED_IAM

Next, we will create the ECS cluster and an Amazon ECS Task Definition using the public-subnet-public-loadbalancer.yml template. Again, the Task Definition defines how ECS will deploy our application using AWS Fargate. Amazon Fargate allows you to run containers without having to manage servers or clusters. No EC2 instances to manage! Woot! Below, in the CloudFormation template, we see the ContainerDefinitions section of the TaskDefinition resource that contains three container definitions. Note the three images and their ECR locations.

TaskDefinition:
Type: AWS::ECS::TaskDefinition
DependsOn: CloudWatchLogsGroup
Properties:
Family: !Ref 'ServiceNameClient'
Cpu: !Ref 'ContainerCpu'
Memory: !Ref 'ContainerMemory'
NetworkMode: awsvpc
RequiresCompatibilities:
FARGATE
EC2
ExecutionRoleArn:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'ECSTaskExecutionRole']]
TaskRoleArn:
Fn::If:
'HasCustomRole'
!Ref 'Role'
!Ref 'AWS::NoValue'
ContainerDefinitions:
Name: nlp-client
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/nlp-client:1.1.0']]
PortMappings:
ContainerPort: !Ref ContainerPortClient
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: NLP_CLIENT_PORT
Value: !Ref ContainerPortClient
Name: RAKE_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortRake]]
Name: PROSE_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortProse]]
Name: LANG_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortLang]]
Name: API_KEY
Value: !Ref ApiKey
Name: rake-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref VendorAccountId, 'dkr.ecr', !Ref VendorEcrRegion, 'amazonaws.com/rake-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: RAKE_PORT
Value: !Ref ContainerPortRake
Name: API_KEY
Value: !Ref ApiKey
Name: prose-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/prose-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: PROSE_PORT
Value: !Ref ContainerPortProse
Name: API_KEY
Value: !Ref ApiKey
Name: lang-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/lang-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: LANG_PORT
Value: !Ref ContainerPortLang
Name: API_KEY
Value: !Ref ApiKey

Execute the following command to create the ECS cluster and an ECS Task Definition using the CloudFormation template.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export API_KEY=SuP3r5eCRetAutHK3y
aws cloudformation create-stack \
--stack-name public-subnet-public-loadbalancer \
--template-body file://cfn-templates/public-subnet-public-loadbalancer.yml \
--parameters \
ParameterKey=VendorAccountId,ParameterValue=${ISV_ACCOUNT} \
ParameterKey=VendorEcrRegion,ParameterValue=${ISV_ECR_REGION} \
ParameterKey=ApiKey,ParameterValue=${API_KEY} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the expected output from the CloudFormation Management Console.

If you want to update the ECS Task Definition, simply run the aws cloudformation update-stack command.

CloudWatch Container Insights

The CloudFormation template does not enable CloudWatch Container Insights by default. Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications. To enable Insights, execute the following command:

aws ecs put-account-setting \
--name "containerInsights" --value "enabled"

Confirming the Cross-account Policy

If everything went right in the previous steps, we should now have an ECS cluster running our containerized application, including the container built from the vendor’s Docker image. Below, we see an example of the ECS cluster displayed in the management console.

Within the ECR cluster, we should observe a single running ECS Service. According to AWS, Amazon ECS allows you to run and maintain a specified number of instances of a task definition simultaneously in an Amazon ECS cluster; this is referred to as a Service.

We are running two instances of each container on ECS, thus two copies of the task within a single service. Each task runs its containers in a different Availability Zone for high availability.

Drilling into the service, note the new ALB associated with the new VPC, two public subnets, and the corresponding security group.

Switching to the Task Definitions tab, note that the ECS Task contains four container definitions that comprise the NLP API. Three images originated from the customer’s ECR repositories, and one from the vendor’s ECR repository.

Drilling in further, we will see the details of each container definition, including environment variables, passed from ECR to the container and on to the Golang binary running in the container.

Accessing the NLP API on ECS

With our earlier Docker Swarm example, the curl command was issued against localhost. We now have a public-facing Application Load Balancer (ALB) in front of our ECS-based application. We will use the DNS name of the ALB to hit our application on ECS. The DNS address (A Record) can be obtained from the Load Balancer Management Console, as shown below, or from the Output tab of the public-vpc CloudFormation Stack.

Another difference between the earlier Docker Swarm example and ECS is the port. Although the edge service, nlp-client, runs on port :8080, the ALB acts as a reverse proxy, passing requests from port :80 on the ALB to port :8080 of the nlp-client container instances (actually, the shared ENI of the running task).

Although I did set up a custom domain name for the ALB using Route53 and enabled HTTPS (port 443 on the ELB), https://nlp-ecs.example-api.com, for the sake of brevity, I will not go into the details in this post.

To test our deployed ECS, we can use a tool like curl or Postman to test the API’s endpoints. Don’t forget to you will need to add the API Key for authentication using the X-API-Key header key/value pair. Below we see a successful GET against the /routes endpoint, using Postman.

Here we see a successful POST against the /keywords endpoint, using Postman.

Cleaning Up

To clean up the demonstration’s AWS resources and Docker Stack, run the following scripts in the appropriate AWS accounts. Importantly, similar to S3, you must delete all the Docker images in the ECR repositories first, before deleting the repository, or else you will receive a CloudFormation error. This includes untagged images.

# local docker stack
docker stack rm nlp
# customer account
aws ecr batch-delete-image \
--repository-name nlp-client \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name prose-app \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name lang-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-nlp-client
aws cloudformation delete-stack \
--stack-name ecr-repo-prose-app
aws cloudformation delete-stack \
--stack-name ecr-repo-lang-app
aws cloudformation delete-stack \
--stack-name public-subnet-public-loadbalancer
aws cloudformation delete-stack \
--stack-name public-vpc
aws cloudformation delete-stack \
--stack-name development-user-group-customer
# vendor account
aws ecr batch-delete-image \
--repository-name rake-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-rake-app
aws cloudformation delete-stack \
--stack-name development-user-group-isv

Conclusion

In the preceding post, we saw how multiple AWS accounts could share private ECR-based Docker images. There are variations and restrictions to the configuration of the ECR Repository Policies, depending on the deployment tools you are using, such as AWS CodeBuild, AWS CodeDeploy, or AWS Elastic Beanstalk. AWS does a good job of providing some examples in their documentation, including Amazon ECR Repository Policy Examples and Amazon Elastic Container Registry Identity-Based Policy Examples.

In late 2020, AWS released Amazon Elastic Container Registry Public (ECR Public). Although this post was about private images, for public images, ECR Public allows you to store, manage, share, and deploy container images for anyone to discover and download globally.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

Leave a comment

Employing Amazon Macie to Discover and Protect Sensitive Data in your Amazon S3-based Data Lake

Introduction

Working with Analytics customers, it’s not uncommon to see data lakes with a dozen or more discrete data sources. Data typically originates from sources both internal and external to the customer. Internal data may come from multiple teams, departments, divisions, and enterprise systems. External data comes from vendors, partners, public sources, and subscriptions to licensed data sources. The volume, velocity, variety, veracity, and method of delivery vary across the data sources. All this data is being fed into data lakes for purposes such as analytics, business intelligence, and machine learning.

Given the growing volumes of incoming data and variations amongst data sources, it is increasingly complex, expensive, and time-consuming for organizations to ensure compliance with relevant laws, policies, and regulations. Regulations that impact how data is handled in a data lake include the Organizations Health Insurance Portability and Accountability Act (HIPAA), General Data Privacy Regulation (GDPR), Payment Card Industry Data Security Standard (PCI DSS), California Consumer Privacy Act (CCPA), and the Federal Information Security Management Act (FISMA).

Data Lake

AWS defines a data lake as a centralized repository that allows you to store all your structured and unstructured data at any scale. Once in the data lake, you run different types of analytics — from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data in a data lake is regularly organized or separated by its stage in the analytics process. Incoming data is often referred to as raw data. Data is then processed — cleansed, filtered, enriched, and tokenized if necessary. Lastly, the data is analyzed and aggregated, and the results are written back to the data lake. The analyzed and aggregated data is used to build business intelligence dashboards and reports, machine learning models, and is delivered to downstream or external systems. The different categories of data — raw, processed, and aggregated, are frequently referred to as bronze, silver, and gold, a reference to their overall data quality or value.

Protecting the Data Lake

Imagine you’ve received a large volume of data from an external data source. The incoming data is cleansed, filtered, and enriched. The data is re-formatted, partitioned, compressed for analytical efficiency, and written back to the data lake. Your analytics pipelines run complex and time-consuming queries against the data. Unfortunately, while building reports for a set of stakeholders, you realize that the original data accidentally included credit card information and other sensitive information about your customers. In addition to being out of compliance, you have the wasted time and expense of the initial data processing, as well as the extra time and expense to replace and re-process the data. The solution — Amazon Macie.

Amazon Macie

According to AWS, Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). Macie’s alerts, or findings, can be searched, filtered, and sent to Amazon EventBridge, formerly called Amazon CloudWatch Events, for easy integration with existing workflow or event management systems, or to be used in combination with AWS services, such as AWS Step Functions or Amazon Managed Workflows for Apache Airflow (MWAA) to take automated remediation actions.

Amazon Macie’s Summary view

Data Discovery and Protection

In this post, we will deploy an automated data inspection workflow to examine sample data in an S3-based data lake. Amazon Macie will examine data files uploaded to an encrypted S3 bucket. If sensitive data is discovered within the files, the files will be moved to an encrypted isolation bucket for further investigation. Email and SMS text alerts will be sent. This workflow will leverage Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), AWS Lambda, and AWS Systems Manager Parameter Store.

Macie data inspection workflow architecture

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/macie-demo.git

AWS resources for this post can be deployed using AWS Cloud​Formation. To follow along, you will need recent versions of Python 3, Boto3, and the AWS CLI version 2, installed.

Sample Data

We will use synthetic patient data, freely available from the MITRE Corporation. The data was generated by Synthea, MITRE’s open-source, synthetic patient generator that models the medical history of synthetic patients. Synthea data is exported in a variety of data standards, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Download and unzip the CSV files from the Synthea website.

REMOTE_FILE="synthea_sample_data_csv_apr2020.zip"
wget "https://storage.googleapis.com/synthea-public/${REMOTE_FILE}"
unzip -j "${REMOTE_FILE}" -d synthea_data/

The sixteen CSV data files contain a total of 471,852 rows of data, including column headers.

> wc -l *.csv

      598 allergies.csv
    3,484 careplans.csv
    8,377 conditions.csv
       79 devices.csv
   53,347 encounters.csv
      856 imaging_studies.csv
   15,479 immunizations.csv
   42,990 medications.csv
  299,698 observations.csv
    1,120 organizations.csv
    1,172 patients.csv
    3,802 payer_transitions.csv
       11 payers.csv
   34,982 procedures.csv
    5,856 providers.csv
        1 supplies.csv
  ------------------------------
  471,852 total

Amazon Macie Custom Data Identifier

To demonstrate some of the advanced features of Amazon Macie, we will use three Custom Data Identifiers. According to Macie’s documentation, a custom data identifier is a set of criteria that you define that reflects your organization’s particular proprietary data — for example, employee IDs, customer account numbers, or internal data classifications. We will create three custom data identifiers to detect the specific Synthea-format Patient ID, US driver number, and US passport number columns.

Post’s three custom data identifiers

The custom data identifiers in this post use a combination of regular expressions (regex) and keywords. The identifiers are designed to work with structured data, such as CSV files. Macie reports text that matches the regex pattern if any of these keywords are in the name of the column or field that stores the text, or if the text is within the maximum match distance of one of these words in a field value. Macie supports a subset of the regex pattern syntax provided by the Perl Compatible Regular Expressions (PCRE) library.

Patient ID custom data identifier console

Enable Macie

Before creating a CloudFormation stack with this demonstration’s resources, you will need to enable Amazon Macie from the AWS Management Console, or use the macie2 API and the AWS CLI with the enable-macie command.

aws macie2 enable-macie

Macie can also be enabled for your multi-account AWS Organization. The enable-organization-admin-account command designates an account as the delegated Amazon Macie administrator account for an AWS organization. For more information, see Managing multiple accounts in Amazon Macie.

AWS_ACCOUNT=111222333444
aws macie2 enable-organization-admin-account \
--admin-account-id ${AWS_ACCOUNT}

CloudFormation Stack

To create the CloudFormation stack with the supplied template, cloudformation/macie_demo.yml, run the following AWS CLI command. You will need to include an email address and phone number as input parameters. These parameter values will be used to send email and text alerts when Macie produces a sensitive data finding.

Please make sure you understand all the potential cost and security implications of creating the CloudFormation stack before continuing.

SNS_PHONE="+12223334444"
SNS_EMAIL="your-email-address@email.com"

aws cloudformation create-stack \
--stack-name macie-demo \
--template-body file://cloudformation/macie_demo.yml \
--parameters ParameterKey=SNSTopicEndpointSms,ParameterValue=${SNS_PHONE} \
ParameterKey=SNSTopicEndpointEmail,ParameterValue=${SNS_EMAIL} \
--capabilities CAPABILITY_NAMED_IAM

As shown in the AWS CloudFormation console, the new macie-demo stack will contain twenty-one AWS resources.

CloudFormation stack successfully created

Upload Data

Next, with the stack deployed, upload the CSV format data files to the encrypted S3 bucket, representing your data lake. The target S3 bucket has the following naming convention, synthea-data-<aws_account_id>-<region>. You can retrieve the two new bucket names from AWS Systems Manager Parameter Store, which were written there by CloudFormation, using the ssm API.

aws ssm get-parameters-by-path \
--path /macie_demo/ \
--query 'Parameters[*].Value'

Use the following ssm and s3 API commands to upload the data files.

DATA_BUCKET=$(aws ssm get-parameter \
--name /macie_demo/patient_data_bucket \
--query 'Parameter.Value')
aws s3 cp synthea_data/ \
    "s3://$(eval echo ${DATA_BUCKET})/patient_data/" --recursive

You should end up with sixteen CSV files in the S3 bucket, totaling approximately 82.3 MB.

Synthea patient data files uploaded to in S3

Sensitive Data Discovery Jobs

With the CloudFormation stack created and the patient data files uploaded, we will create two sensitive data discovery jobs. These jobs will scan the contents of the encrypted S3 bucket for sensitive data and report the findings. According to the documentation, you can configure a sensitive data discovery job to run only once for on-demand analysis and assessment, or on a recurring basis for periodic analysis, assessment, and monitoring. For this demonstration, we will create a one-time sensitive data discovery job using the AWS CLI. We will also create a recurring sensitive data discovery job using the AWS SDK for Python (Boto3). Both jobs can also be created from within Macie’s Jobs console.

Creating a new job Macie’s Jobs console

For both sensitive data discovery jobs, we will include the three custom data identifiers. Each of the custom data identifiers has a unique ID. We will need all three IDs to create the two sensitive data discovery jobs. You can use the AWS CLI and the macie2 API to retrieve the values.

aws macie2 list-custom-data-identifiers --query 'items[*].id'

Next, modify the job_specs/macie_job_specs_1x.json file, adding the three custom data identifier IDs. Also, update your AWS account ID and S3 bucket name (lines 3–5, 12, and 14). Note that since all the patient data files are in CSV format, we will limit our inspection to only files with a csv file extension (lines 18–33).

{
"customDataIdentifierIds": [
"custom-data-identifier-id-1",
"custom-data-identifier-id-2",
"custom-data-identifier-id-3"
],
"description": "Review Synthea patient data (1x)",
"jobType": "ONE_TIME",
"s3JobDefinition": {
"bucketDefinitions": [
{
"accountId": "111222333444",
"buckets": [
"synthea-data-111222333444-us-east-1"
]
}
],
"scoping": {
"includes": {
"and": [
{
"simpleScopeTerm": {
"comparator": "EQ",
"key": "OBJECT_EXTENSION",
"values": [
"csv"
]
}
}
]
}
}
},
"tags": {
"KeyName": "Project",
"KeyValue": "Amazon Macie Demo"
}
}

The above JSON template was generated using the standard AWS CLI generate-cli-skeleton command.

aws macie2 create-classification-job --generate-cli-skeleton

To create a one-time sensitive data discovery job using the above JSON template, run the following AWS CLI command. The unique job name will be dynamically generated based on the current time.

aws macie2 create-classification-job \
--name $(echo "SyntheaPatientData_${EPOCHSECONDS}") \
--cli-input-json file://job_specs/macie_job_specs_1x.json

In the Amazon Macie Jobs console, we can see a one-time sensitive data discovery job running. With a sampling depth of 100, the job will take several minutes to run. The samplingPercentage job property can be adjusted to scan any percentage of the data. If this value is less than 100, Macie selects the objects to analyze at random, up to the specified percentage and analyzes all the data in those objects.

One-time sensitive data discovery job running

Once the job is completed, the findings will be available in Macie’s Findings console. Using the three custom data identifiers in addition to Macie’s managed data identifiers, there should be a total of fifteen findings from the Synthea patient data files in S3. There should be six High severity findings and nine Medium severity findings. Of those, three are of a Personal finding type, seven of a Custom Identifier finding type, and five of a Multiple finding type, having both Personal and Custom Identifier finding types.

Macie’s Findings console displaying the results of the one-time job

Isolating High Severity Findings

The data inspection workflow we have deployed uses an AWS Lambda function, macie-object-mover, to isolate all data files with High severity findings to a second S3 bucket. The offending files are copied to the isolation bucket and deleted from the source bucket.

#!/usr/bin/env python3
# Purpose: Lambda function that moves S3 objects flagged by Macie
# Author: Gary A. Stafford (March 2021)
import json
import logging
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')
def lambda_handler(event, context):
logging.info(f'event: {json.dumps(event)}')
destination_bucket_name = 'macie-isolation-111222333444-us-east-1'
source_bucket_name = event['detail']['resourcesAffected']['s3Bucket']['name']
file_key_name = event['detail']['resourcesAffected']['s3Object']['key']
copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}
logging.debug(f'destination_bucket_name: {destination_bucket_name}')
logging.debug(f'source_bucket_name: {source_bucket_name}')
logging.debug(f'file_key_name: {file_key_name}')
try:
response = s3_client.copy_object(
CopySource=copy_source_object,
Bucket=destination_bucket_name,
Key=file_key_name
)
logger.info(response)
except ClientError as ex:
logger.error(ex)
exit(1)
try:
response = s3_client.delete_object(
Bucket=source_bucket_name,
Key=file_key_name
)
logger.info(response)
except ClientError as ex:
logger.error(ex)
exit(1)
return {
'statusCode': 200,
'body': json.dumps(copy_source_object)
}
view raw lambda_function.py hosted with ❤ by GitHub

Amazon EventBridge

According to Macie’s documentation, to support integration with other applications, services, and systems, such as monitoring or event management systems, Amazon Macie automatically publishes findings to Amazon EventBridge as finding events. Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale using events generated from your applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

Each EventBridge rule contains an event pattern. The event pattern is used to filter the incoming stream of events for particular patterns. The EventBridge rule that is triggered when a Macie finding is based on any of the custom data identifiers, macie-rule-custom, uses the event pattern shown below. This pattern examines the finding event for the name of one of the three custom data identifier names that triggered it.

Post’s event rules, shown in the Amazon EventBridge console

Each EventBridge rule contains an event pattern. The event pattern is used to filter the incoming stream of events for particular patterns. The EventBridge rule that is triggered when a Macie finding is based on one of the three custom data identifiers, macie-rule-high, uses the event pattern shown below. This pattern examines the finding event for the name of one of the three custom data identifier names that triggered it.

{
"source": [
"aws.macie"
],
"detail-type": [
"Macie Finding"
],
"detail": {
"classificationDetails": {
"result": {
"customDataIdentifiers": {
"detections": {
"name": [
"Patient ID",
"US Passport",
"US Driver License"
]
}
}
}
}
}
}

Six data files, containing High severity findings, will be moved to the isolation bucket by the Lambda, triggered by EventBridge.

Isolation bucket containing data files with High severity findings

Scheduled Sensitive Data Discovery Jobs

Data sources commonly deliver data on a repeated basis, such as nightly data feeds. For these types of data sources, we can schedule sensitive data discovery jobs to run on a scheduled basis. For this demonstration, we will create a scheduled job using the AWS SDK for Python (Boto3). Unlike the AWS CLI-based one-time job, you don’t need to modify the project’s script, scripts/create_macie_job_daily.py. The Python script will retrieve your AWS account ID and three custom data identifier IDs. The Python script then runs the create_classification_job command.

#!/usr/bin/env python3
# Purpose: Create Daily Macie classification job – Synthea patient data
# Author: Gary A. Stafford (March 2021)
import logging
import sys
import boto3
from botocore.exceptions import ClientError
logging.basicConfig(format='[%(asctime)s] %(levelname)s – %(message)s', level=logging.INFO)
ssm_client = boto3.client('ssm')
sts_client = boto3.client('sts')
macie_client = boto3.client('macie2')
def main():
params = get_parameters()
account_id = sts_client.get_caller_identity()['Account']
custom_data_identifiers = list_custom_data_identifiers()
create_classification_job(params['patient_data_bucket'], account_id, custom_data_identifiers)
def list_custom_data_identifiers():
"""Returns a list of all custom data identifier ids"""
custom_data_identifiers = []
try:
response = macie_client.list_custom_data_identifiers()
for item in response['items']:
custom_data_identifiers.append(item['id'])
return custom_data_identifiers
except ClientError as e:
logging.error(e)
sys.exit(e)
def create_classification_job(patient_data_bucket, account_id, custom_data_identifiers):
"""Create Daily Macie classification job"""
try:
response = macie_client.create_classification_job(
customDataIdentifierIds=custom_data_identifiers,
description='Review Synthea patient data (Daily)',
jobType='SCHEDULED',
initialRun=True,
name='SyntheaPatientData_Daily',
s3JobDefinition={
'bucketDefinitions': [
{
'accountId': account_id,
'buckets': [
patient_data_bucket
]
}
],
'scoping': {
'includes': {
'and': [
{
'simpleScopeTerm': {
'comparator': 'EQ',
'key': 'OBJECT_EXTENSION',
'values': [
'csv',
]
}
},
]
}
}
},
samplingPercentage=100,
scheduleFrequency={
'dailySchedule': {}
},
tags={
'Project': 'Amazon Macie Demo'
}
)
logging.debug(f'Response: {response}')
except ClientError as e:
logging.error(e)
sys.exit(e)
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
params = {
'patient_data_bucket': ssm_client.get_parameter(Name='/macie_demo/patient_data_bucket')['Parameter']['Value']
}
return params
if __name__ == '__main__':
main()

To create the scheduled sensitive data discovery job, run the following command.

python3 ./scripts/create_macie_job_daily.py

The scheduleFrequency parameter is set to { 'dailySchedule': {} }. This value specifies a daily recurrence pattern for running the job. The initialRun parameter of the create_classification_job command is set to True. This will cause the new job to analyze all eligible objects immediately after the job is created, in addition to on a daily basis.

Scheduled sensitive data discovery job in an active/idle state

Conclusion

In this post, we learned how we can use Amazon Macie to discover and protect sensitive data in Amazon S3. We learned how to use automation to trigger alerts based on Macie’s findings and to isolate data files based on the types of findings. The post’s data inspection workflow can easily be incorporated into existing data lake ingestion pipelines to ensure the integrity of incoming data.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options

Introduction

For anyone new to Amazon Managed Workflows for Apache Airflow (Amazon MWAA), especially those used to managing their own Apache Airflow platform, Amazon MWAA’s configuration might appear to be a bit of a black box at first. This brief post will explore Amazon MWAA’s configuration — how to inspect it and how to modify it. We will use Airflow DAGs to review an MWAA environment’s airflow.cfg file, environment variables, and Python packages.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project in 2019.

With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. For more information on Amazon MWAA, read my last post, Running Spark Jobs on Amazon EMR with Apache Airflow.

Image for post
Apache Airflow UI

Source Code

The DAGs referenced in this post are available on GitHub. Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git

Accessing Configuration

Environment Variables

Environment variables are an essential part of an MWAA environment’s configuration. There are various ways to examine the environment variables. You could use Airflow’s BashOperator to simply call the command, env, or the PythonOperator to call a Python iterator function, as shown below. A sample DAG, dags/get_env_vars.py, is included in the project.

def print_env_vars():
keys = str(os.environ.keys().replace("', '", "'|'").split("|")
keys.sort()
for key in keys:
print(key)
get_env_vars_operator = PythonOperator(
task_id='get_env_vars_task',
python_callable=print_env_vars
)
view raw get_env_vars.py hosted with ❤ by GitHub

The DAG’s PythonOperator will iterate over the MWAA environment’s environment variables and output them to the task’s log. Below is a snippet of an example task’s log.

[2020-12-25 23:59:07,170] {{standard_task_runner.py:78}} INFO – Job 272: Subtask get_env_vars_task
[2020-12-25 23:59:08,423] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-25 23:59:08,516] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOGS_ENABLED': 'false'
[2020-12-25 23:59:08,689] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:08,777] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_EMAIL': 'airflow@example.com'
[2020-12-25 23:59:08,877] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_ID': 'get_env_vars'
[2020-12-25 23:59:08,970] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_OWNER': 'airflow'
[2020-12-25 23:59:09,269] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_TASK_ID': 'get_env_vars_task'
[2020-12-25 23:59:09,357] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOGS_ENABLED': 'false'
[2020-12-25 23:59:09,552] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:09,647] {{logging_mixin.py:112}} INFO – 'AIRFLOW_ENV_NAME': 'MyAirflowEnvironment'
[2020-12-25 23:59:09,729] {{logging_mixin.py:112}} INFO – 'AIRFLOW_HOME': '/usr/local/airflow'
[2020-12-25 23:59:09,827] {{logging_mixin.py:112}} INFO – 'AIRFLOW_SCHEDULER_LOGS_ENABLED': 'false'
[2020-12-25 23:59:12,915] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-25 23:59:12,986] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
[2020-12-25 23:59:13,136] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__LOAD_EXAMPLES': 'False'
[2020-12-25 23:59:13,217] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__PARALLELISM': '10000'
[2020-12-25 23:59:14,531] {{logging_mixin.py:112}} INFO – 'AWS_DEFAULT_REGION': 'us-east-1'
[2020-12-25 23:59:14,565] {{logging_mixin.py:112}} INFO – 'AWS_EXECUTION_ENV': 'AWS_ECS_FARGATE'
[2020-12-25 23:59:14,616] {{logging_mixin.py:112}} INFO – 'AWS_REGION': 'us-east-1'
[2020-12-25 23:59:14,647] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_FILE': ''
[2020-12-25 23:59:14,679] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_LEVEL': '20'
[2020-12-25 23:59:14,711] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT': '1'
[2020-12-25 23:59:14,747] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT_LEVEL': 'WARNING'
view raw airflow_env_vars.txt hosted with ❤ by GitHub

Airflow Configuration File

According to Airflow, the airflow.cfg file contains Airflow’s configuration. You can edit it to change any of the settings. The first time you run Apache Airflow, it creates an airflow.cfg configuration file in your AIRFLOW_HOME directory and attaches the configurations to your environment as environment variables.

Amazon MWAA doesn’t expose the airflow.cfg in the Apache Airflow UI of an environment. Although you can’t access it directly, you can view the airflow.cfg file. The configuration file is located in your AIRFLOW_HOME directory, /usr/local/airflow (~/airflow by default).

There are multiple ways to examine your MWAA environment’s airflow.cfg file. You could use Airflow’s PythonOperator to call a Python function that reads the contents of the file, as shown below. The function uses the AIRFLOW_HOME environment variable to locate and read the airflow.cfg. A sample DAG, dags/get_airflow_cfg.py, is included in the project.

def print_airflow_cfg():
with open(f"{os.getenv('AIRFLOW_HOME')}/airflow.cfg", 'r') as airflow_cfg:
file_contents = airflow_cfg.read()
print(f'\n{file_contents}')
get_airflow_cfg_operator = PythonOperator(
task_id='get_airflow_cfg_task',
python_callable=print_airflow_cfg
)
view raw get_airflow_cfg.py hosted with ❤ by GitHub

The DAG’s task will read the MWAA environment’s airflow.cfg file and output it to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 00:02:57,163] {{standard_task_runner.py:78}} INFO – Job 274: Subtask get_airflow_cfg_task
[2020-12-26 00:02:57,583] {{logging_mixin.py:112}} INFO –
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Set this to True if you want to enable remote logging.
remote_logging = True
# Users must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id = aws_default
remote_base_log_folder = cloudwatch://arn:aws:logs:::log-group:airflow-logs:*
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging level for Flask-appbuilder UI
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
[aws_mwaa]
redirect_url = https://console.aws.amazon.com/
session_duration_minutes = 720
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
# Default timezone to display all dates in the RBAC UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = UTC
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080
view raw airflow_cfg_log.txt hosted with ❤ by GitHub

Customizing Airflow Configurations

While AWS doesn’t expose the airflow.cfg in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.

To customize the Apache Airflow configuration, change the default options directly on the Amazon MWAA console. Select Edit, add or modify configuration options and values in the Airflow configuration options menu, then select Save. For example, we can change Airflow’s default timezone (core.default_ui_timezone) to America/New_York.

Image for post
Amazon MWAA’s Airflow configuration options

Once the MWAA environment is updated, which may take several minutes, view your changes by re-running the DAG,dags/get_env_vars.py. Note the new configuration item on both lines 2 and 6 of the log snippet shown below. The configuration item appears on its own (AIRFLOW__CORE_DEFAULT__UI_TIMEZONE), as well as part of the AIRFLOW_CONFIG_SECRETS dictionary environment variable.

[2020-12-26 05:00:57,756] {{standard_task_runner.py:78}} INFO – Job 293: Subtask get_env_vars_task
[2020-12-26 05:00:58,158] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONFIG_SECRETS': '{"AIRFLOW__CORE__DEFAULT_UI_TIMEZONE":"America/New_York"}'
[2020-12-26 05:00:58,190] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-26 05:01:00,537] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-26 05:01:00,578] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DEFAULT_UI_TIMEZONE': 'America/New_York'
[2020-12-26 05:01:00,630] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'

Using the MWAA API

We can also make configuration changes using the MWAA API. For example, to change the default Airflow UI timezone, call the MWAA API’s update-environment command using the AWS CLI. Include the --airflow-configuration-option parameter, passing the core.default_ui_timezone key/value pair as a JSON blob.

aws mwaa update-environment \
–name <your_environment_name> \
–airflow-configuration-options """{
\"core.default_ui_timezone\": \"America/Los_Angeles\"
}"""

To review an environment’s configuration, use the get-environment command in combination with jq.

aws mwaa get-environment \
–name <your_environment_name> | \
jq -r '.Environment.AirflowConfigurationOptions'

Below, we see an example of the output.

{
"core.default_ui_timezone": "America/Los_Angeles"
}

Python Packages

Airflow is written in Python, and workflows are created via Python scripts. Python packages are a crucial part of an MWAA environment’s configuration. According to the documentation, an ‘extra package’, is a Python subpackage that is not included in the Apache Airflow base, installed on your MWAA environment. As part of setting up an MWAA environment, you can specify the location of the requirements.txt file in the Airflow S3 bucket. Extra packages are installed using the requirements.txt file.

Image for post
Amazon MWAA environment’s configuration

There are several ways to check your MWAA environment’s installed Python packages and versions. You could use Airflow’s BashOperator to call the command, python3 -m pip list. A sample DAG, dags/get_py_pkgs.py, is included in the project.

list_python_packages_operator = BashOperator(
task_id='list_python_packages',
bash_command='python3 -m pip list'
)
view raw get_py_pkgs.py hosted with ❤ by GitHub

The DAG’s task will output a list of all Python packages and package versions to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 21:53:06,310] {{bash_operator.py:136}} INFO – Temporary script location: /tmp/airflowtmp2whgp_p8/list_python_packagesxo8slhc6
[2020-12-26 21:53:06,350] {{bash_operator.py:146}} INFO – Running command: python3 -m pip list
[2020-12-26 21:53:06,395] {{bash_operator.py:153}} INFO – Output:
[2020-12-26 21:53:06,750] {{bash_operator.py:157}} INFO – Package Version
[2020-12-26 21:53:06,786] {{bash_operator.py:157}} INFO – ———————- ———
[2020-12-26 21:53:06,815] {{bash_operator.py:157}} INFO – alembic 1.4.2
[2020-12-26 21:53:06,856] {{bash_operator.py:157}} INFO – amqp 2.6.1
[2020-12-26 21:53:06,898] {{bash_operator.py:157}} INFO – apache-airflow 1.10.12
[2020-12-26 21:53:06,929] {{bash_operator.py:157}} INFO – apispec 1.3.3
[2020-12-26 21:53:06,960] {{bash_operator.py:157}} INFO – argcomplete 1.12.0
[2020-12-26 21:53:07,002] {{bash_operator.py:157}} INFO – attrs 19.3.0
[2020-12-26 21:53:07,036] {{bash_operator.py:157}} INFO – Babel 2.8.0
[2020-12-26 21:53:07,071] {{bash_operator.py:157}} INFO – billiard 3.6.3.0
[2020-12-26 21:53:07,960] {{bash_operator.py:157}} INFO – boto3 1.16.10
[2020-12-26 21:53:07,993] {{bash_operator.py:157}} INFO – botocore 1.19.10
[2020-12-26 21:53:08,028] {{bash_operator.py:157}} INFO – cached-property 1.5.1
[2020-12-26 21:53:08,061] {{bash_operator.py:157}} INFO – cattrs 1.0.0
[2020-12-26 21:53:08,096] {{bash_operator.py:157}} INFO – celery 4.4.7
[2020-12-26 21:53:08,130] {{bash_operator.py:157}} INFO – certifi 2020.6.20
[2020-12-26 21:53:12,260] {{bash_operator.py:157}} INFO – pandas 1.1.0
[2020-12-26 21:53:12,289] {{bash_operator.py:157}} INFO – pendulum 1.4.4
[2020-12-26 21:53:12,490] {{bash_operator.py:157}} INFO – pip 9.0.3
[2020-12-26 21:53:12,522] {{bash_operator.py:157}} INFO – prison 0.1.3
[2020-12-26 21:53:12,550] {{bash_operator.py:157}} INFO – prometheus-client 0.8.0
[2020-12-26 21:53:12,580] {{bash_operator.py:157}} INFO – psutil 5.7.2
[2020-12-26 21:53:12,613] {{bash_operator.py:157}} INFO – pycparser 2.20
[2020-12-26 21:53:12,641] {{bash_operator.py:157}} INFO – pycurl 7.43.0.5
[2020-12-26 21:53:12,676] {{bash_operator.py:157}} INFO – Pygments 2.6.1
[2020-12-26 21:53:12,710] {{bash_operator.py:157}} INFO – PyGreSQL 5.2.1
[2020-12-26 21:53:12,746] {{bash_operator.py:157}} INFO – PyJWT 1.7.1

Conclusion

Understanding your Amazon MWAA environment’s airflow.cfg file, environment variables, and Python packages are all important for proper Airflow platform management. This brief post learned more about Amazon MWAA’s configuration — how to inspect it using DAGs and how to modify it through the Amazon MWAA console.

, , , ,

Leave a comment

Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS

Introduction

According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grain control, you could deploy and manage Presto, yourself, on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern Enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, and Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources, without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources, a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
SELECT
(year (now()) c_birth_year) AS age,
cd_credit_rating AS credit_rating,
cd_gender AS gender,
count(cd_gender) AS gender_count
FROM
postgresql.public.customer
LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
WHERE
c_birth_year IS NOT NULL
AND cd_credit_rating IS NOT NULL
AND lower(cd_credit_rating) != 'unknown'
AND cd_gender IS NOT NULL
GROUP BY
cd_credit_rating,
c_birth_year,
cd_gender
)
SELECT
age,
credit_rating,
gender,
gender_count
FROM
credit_demographics
WHERE
age BETWEEN 21 AND 65
ORDER BY
age,
credit_rating,
gender;

Ahana

The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2, AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes with an Apache Hive Metastore backed by PostgreSQL bundled in. In addition, the following catalogs are bundled in to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.

Getting Started

To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Architecture of the demonstration’s AWS environment and resources

Subscribe to Ahana’s PrestoDB Sandbox

To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The AWS current pricing for the default, Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. For the demonstration, since performance is not an issue, you could try a smaller EC2 instance, such as r5.large instance costs USD 0.126 per hour.

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you chose to do so, I strongly recommend limiting the required ports 22 and 8080 in the instance’s Security Group to just your IP address (a /32 CIDR block).

Limiting access to ports 22 and 8080 from only my current IP address

Lastly, we need to assign an IAM Role to the EC2 instance, which has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Attaching the AmazonS3FullAccess AWS managed policy to the Role

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto and my key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.

You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.

AWS CloudFormation

We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.

AWSTemplateFormatVersion: "2010-09-09"
Description: "This template deploys a RDS PostgreSQL database and an Amazon S3 bucket"
Parameters:
DBInstanceIdentifier:
Type: String
Default: "ahana-prestodb-demo"
DBEngine:
Type: String
Default: "postgres"
DBEngineVersion:
Type: String
Default: "12.3"
DBAvailabilityZone:
Type: String
Default: "us-east-1f"
DBInstanceClass:
Type: String
Default: "db.t3.medium"
DBStorageType:
Type: String
Default: "gp2"
DBAllocatedStorage:
Type: Number
Default: 20
DBName:
Type: String
Default: "shipping"
DBUser:
Type: String
Default: "presto"
DBPassword:
Type: String
Default: "5up3r53cr3tPa55w0rd"
# NoEcho: True
Resources:
MasterDatabase:
Type: AWS::RDS::DBInstance
Properties:
DBInstanceIdentifier:
Ref: DBInstanceIdentifier
DBName:
Ref: DBName
AllocatedStorage:
Ref: DBAllocatedStorage
DBInstanceClass:
Ref: DBInstanceClass
StorageType:
Ref: DBStorageType
Engine:
Ref: DBEngine
EngineVersion:
Ref: DBEngineVersion
MasterUsername:
Ref: DBUser
MasterUserPassword:
Ref: DBPassword
AvailabilityZone: !Ref DBAvailabilityZone
PubliclyAccessible: true
Tags:
Key: Project
Value: "Demo of RDS PostgreSQL"
DataBucket:
DeletionPolicy: Retain
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
Outputs:
Endpoint:
Description: "Endpoint of RDS PostgreSQL database"
Value: !GetAtt MasterDatabase.Endpoint.Address
Port:
Description: "Port of RDS PostgreSQL database"
Value: !GetAtt MasterDatabase.Endpoint.Port
JdbcConnString:
Description: "JDBC connection string of RDS PostgreSQL database"
Value: !Join
""
– "jdbc:postgresql://"
!GetAtt MasterDatabase.Endpoint.Address
":"
!GetAtt MasterDatabase.Endpoint.Port
"/"
!Ref DBName
"?user="
!Ref DBUser
"&password="
!Ref DBPassword
Bucket:
Description: "Name of Amazon S3 data bucket"
Value: !Ref DataBucket

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
–branch master –single-branch –depth 1 –no-tags \
https://github.com/garystafford/presto-aws-federated-queries.git

To create the AWS CloudFormation stack from the template, cloudformation/rds_s3.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value (shown in bold) to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created. In my case, us-east-1f.

aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f

To ensure the RDS for PostgreSQL database instance can be accessed by Presto running on the Ahana PrestoDB Sandbox EC2, manually add the PrestoDB Sandbox EC2’s Security Group to port 5432 within the database instance’s VPC Security Group’s Inbound rules. I have also added my own IP to port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString, and the Amazon S3 bucket’s name, Bucket. All these values will be required during the demonstration.

Preparing the PrestoDB Sandbox

There are a few steps we need to take to properly prepare the PrestoDB Sandbox EC2 for our demonstration. First, use your PrestoDB Sandbox EC2 SSH key to scp the properties and sql directories to the Presto EC2 instance. First, you will need to set the EC2_ENDPOINT value (shown in bold) to your EC2’s public IPv4 address or public IPv4 DNS value. You can hardcode the value or use the aws ec2 API command is shown below to retrieve the value programmatically.

# on local workstation
EC2_ENDPOINT=$(aws ec2 describe-instances \
--filters "Name=product-code,Values=ejee5zzmv4tc5o3tr1uul6kg2" \
"Name=product-code.type,Values=marketplace" \
--query "Reservations[*].Instances[*].{Instance:PublicDnsName}" \
--output text)
scp -i "~/.ssh/ahana-presto.pem" \
-r properties/ sql/ \
ec2-user@${EC2_ENDPOINT}:~/
ssh -i "~/.ssh/ahana-presto.pem" ec2-user@${EC2_ENDPOINT}

Environment Variables

Next, we need to set several environment variables. First, replace the DATA_BUCKET and POSTGRES_HOST values below (shown in bold) to match your environment. The PGPASSWORD value should be correct unless you changed it in the CloudFormation template. Then, execute the command to add the variables to your .bash_profile file.

echo """
export DATA_BUCKET=prestodb-demo-databucket-CHANGE_ME
export POSTGRES_HOST=presto-demo.CHANGE_ME.us-east-1.rds.amazonaws.com
export PGPASSWORD=5up3r53cr3tPa55w0rd
export JAVA_HOME=/usr
export HADOOP_HOME=/home/ec2-user/hadoop
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/tools/lib/*
export HIVE_HOME=/home/ec2-user/hive
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$PATH
""" >>~/.bash_profile

Optionally, I suggest updating the EC2 instance with available updates and install your favorite tools, likehtop, to monitor the EC2 performance.

yes | sudo yum update
yes | sudo yum install htop
View of htop running on an r5.xlarge EC2 instance

Before further configuration for the demonstration, let’s review a few aspects of the Ahana PrestoDB EC2 instance. There are several applications pre-installed on the instance, including Java, Presto, Hadoop, PostgreSQL, and Hive. Versions shown are current as of early September 2020.

java -version
# openjdk version "1.8.0_252"
# OpenJDK Runtime Environment (build 1.8.0_252-b09)
# OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
hadoop version
# Hadoop 2.9.2
postgres --version
# postgres (PostgreSQL) 9.2.24
psql --version
# psql (PostgreSQL) 9.2.24
hive --version
# Hive 2.3.7
presto-cli --version
# Presto CLI 0.235-cb21100

The Presto configuration files are in the /etc/presto/ directory. The Hive configuration files are in the ~/hive/conf/ directory. Here are a few commands you can use to gain a better understanding of their configurations.

ls /etc/presto/
cat /etc/presto/jvm.config
cat /etc/presto/config.properties
cat /etc/presto/node.properties
# installed and configured catalogs
ls /etc/presto/catalog/
cat ~/hive/conf/hive-site.xml

Configure Presto

To configure Presto, we need to create and copy a new Presto postgresql catalog properties file for the newly created RDS for PostgreSQL instance. Modify the properties/rds_postgresql.properties file, replacing the value, connection-url (shown in bold), with your own JDBC connection string, shown in the CloudFormation Outputs tab.

connector.name=postgresql
connection-url=jdbc:postgresql://presto-demo.abcdefg12345.us-east-1.rds.amazonaws.com:5432/shipping
connection-user=presto
connection-password=5up3r53cr3tPa55w0rd

Move the rds_postgresql.properties file to its correct location using sudo.

sudo mv properties/rds_postgresql.properties /etc/presto/catalog/

We also need to modify the existing Hive catalog properties file, which will allow us to write to non-managed Hive tables from Presto.

connector.name=hive-hadoop2
hive.metastore.uri=thrift://localhost:9083
hive.non-managed-table-writes-enabled=true

The following command will overwrite the existing hive.properties file with the modified version containing the new property.

sudo mv properties/hive.properties |
/etc/presto/catalog/hive.properties

To finalize the configuration of the catalog properties files, we need to restart Presto. The easiest way is to reboot the EC2 instance, then SSH back into the instance. Since our environment variables are in the .bash_profile file, they will survive a restart and logging back into the EC2 instance.

sudo reboot

Add Tables to Apache Hive Metastore

We will use RDS for PostgreSQL and Apache Hive Metastore/Amazon S3 as additional data sources for our federated queries. The Ahana PrestoDB Sandbox instance comes pre-configured with Apache Hive and an Apache Hive Metastore, backed by PostgreSQL (a separate PostgreSQL 9.x instance pre-installed on the EC2).

The Sandbox’s instance of Presto comes pre-configured with schemas for the TPC Benchmark DS (TPC-DS). We will create identical tables in our Apache Hive Metastore, which correspond to three external tables in the TPC-DS data source’s sf1 schema: tpcds.sf1.customer, tpcds.sf1.customer_address, and tpcds.sf1.customer_demographics. A Hive external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. As an example, here is the SQL statement that creates the external customer table in the Hive Metastore and whose data will be stored in the S3 bucket.

CREATE EXTERNAL TABLE IF NOT EXISTS `customer`(
`c_customer_sk` bigint,
`c_customer_id` char(16),
`c_current_cdemo_sk` bigint,
`c_current_hdemo_sk` bigint,
`c_current_addr_sk` bigint,
`c_first_shipto_date_sk` bigint,
`c_first_sales_date_sk` bigint,
`c_salutation` char(10),
`c_first_name` char(20),
`c_last_name` char(30),
`c_preferred_cust_flag` char(1),
`c_birth_day` integer,
`c_birth_month` integer,
`c_birth_year` integer,
`c_birth_country` char(20),
`c_login` char(13),
`c_email_address` char(50),
`c_last_review_date_sk` bigint)
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

The threeCREATE EXTERNAL TABLE SQL statements are included in the sql/ directory: sql/hive_customer.sql, sql/hive_customer_address.sql, and sql/hive_customer_demographics.sql. The bucket name (shown in bold above), needs to be manually updated to your own bucket name in all three files before continuing.

Next, run the following hive commands to create the external tables in the Hive Metastore within the existing default schema/database.

hive --database default -f sql/hive_customer.sql
hive --database default -f sql/hive_customer_address.sql
hive --database default -f sql/hive_customer_demographics.sql

To confirm the tables were created successfully, we could use a variety of hive commands.

hive --database default -e "SHOW TABLES;"
hive --database default -e "DESCRIBE FORMATTED customer;"
hive --database default -e "SELECT * FROM customer LIMIT 5;"
Using the ‘DESCRIBE FORMATTED customer_address;’ Hive command

Alternatively, you can also create the external table interactively from within Hive, using the hive command to access the CLI. Copy and paste the contents of the SQL files to the hive CLI. To exit hive use quit;.

Interactively querying within Apache Hive

Amazon S3 Data Source Setup

With the external tables created, we will now select all the data from each of the three tables in the TPC-DS data source and insert that data into the equivalent Hive tables. The physical data will be written to Amazon S3 as highly-efficient, columnar storage format, SNAPPY-compressed Apache Parquet files. Execute the following commands. I will explain why the customer_address table statements are a bit different, next.

# inserts 100,000 rows
presto-cli --execute """
INSERT INTO hive.default.customer
SELECT * FROM tpcds.sf1.customer;
"""
# inserts 50,000 rows across 52 partitions
presto-cli --execute """
INSERT INTO hive.default.customer_address
SELECT ca_address_sk, ca_address_id, ca_street_number,
ca_street_name, ca_street_type, ca_suite_number,
ca_city, ca_county, ca_zip, ca_country, ca_gmt_offset,
ca_location_type, ca_state
FROM tpcds.sf1.customer_address
ORDER BY ca_address_sk;
"""
# add new partitions in metastore
hive -e "MSCK REPAIR TABLE default.customer_address;"
# inserts 1,920,800 rows
presto-cli --execute """
INSERT INTO hive.default.customer_demographics
SELECT * FROM tpcds.sf1.customer_demographics;
"""

Confirm the data has been loaded into the correct S3 bucket locations and is in Parquet-format using the AWS Management Console or AWS CLI. Rest assured, the Parquet-format data is SNAPPY-compressed even though the S3 console incorrectly displays Compression as None. You can easily confirm the compression codec with a utility like parquet-tools.

Data organized by key prefixes in Amazon S3
Using S3’s ‘Select from’ feature to preview the SNAPPY-compressed Parquet format data

Partitioned Tables

The customer_address table is unique in that it has been partitioned by the ca_state column. Partitioned tables are created using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE `customer_address`(
`ca_address_sk` bigint,
`ca_address_id` char(16),
`ca_street_number` char(10),
`ca_street_name` char(60),
`ca_street_type` char(15),
`ca_suite_number` char(10),
`ca_city` varchar(60),
`ca_county` varchar(30),
`ca_zip` char(10),
`ca_country` char(20),
`ca_gmt_offset` double precision,
`ca_location_type` char(20)
)
PARTITIONED BY (`ca_state` char(2))
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

According to Apache Hive, a table can have one or more partition columns and a separate data directory is created for each distinct value combination in the partition columns. Since the data for the Hive tables are stored in Amazon S3, that means that when the data is written to the customer_address table, it is automatically separated into different S3 key prefixes based on the state. The data is physically “partitioned”.

customer_address data, partitioned by the state, in Amazon S3

Whenever add new partitions in S3, we need to run the MSCK REPAIR TABLE command to add that table’s new partitions to the Hive Metastore.

hive -e "MSCK REPAIR TABLE default.customer_address;"

In SQL, a predicate is a condition expression that evaluates to a Boolean value, either true or false. Defining the partitions aligned with the attributes that are frequently used in the conditions/filters (predicates) of the queries can significantly increase query efficiency. When we execute a query that uses an equality comparison condition, such as ca_state = 'TN', partitioning means the query will only work with a slice of the data in the corresponding ca_state=TN prefix key. There are 50,000 rows of data in the customer_address table, but only 1,418 rows (2.8% of the total data) in the ca_state=TN partition. With the additional advantage of Parquet format with SNAPPY compression, partitioning can significantly reduce query execution time.

Adding Data to RDS for PostgreSQL Instance

For the demonstration, we will also replicate the schema and data of the tpcds.sf1.customer_address table to the new RDS for PostgreSQL instance’s shipping database.

CREATE TABLE customer_address (
ca_address_sk bigint,
ca_address_id char(16),
ca_street_number char(10),
ca_street_name char(60),
ca_street_type char(15),
ca_suite_number char(10),
ca_city varchar(60),
ca_county varchar(30),
ca_state char(2),
ca_zip char(10),
ca_country char(20),
ca_gmt_offset double precision,
ca_location_type char(20)
);

Like Hive and Presto, we can create the table programmatically from the command line or interactively; I prefer the programmatic approach. Use the following psql command, we can create the customer_address table in the public schema of the shipping database.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-f sql/postgres_customer_address.sql

Now, to insert the data into the new PostgreSQL table, run the following presto-cli command.

# inserts 50,000 rows
presto-cli --execute """
INSERT INTO rds_postgresql.public.customer_address
SELECT * FROM tpcds.sf1.customer_address;
"""

To confirm that the data was imported properly, we can use a variety of commands.

-- Should be 50000 rows in table
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT COUNT(*) FROM customer_address;"
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT * FROM customer_address LIMIT 5;"

Alternatively, you could use the PostgreSQL client interactively by copying and pasting the contents of the sql/postgres_customer_address.sql file to the psql command prompt. To interact with PostgreSQL from the psql command prompt, use the following command.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto

Use the \dt command to list the PostgreSQL tables and the \q command to exit the PostgreSQL client. We now have all the new data sources created and configured for Presto!

Interacting with Presto

Presto provides a web interface for monitoring and managing queries. The interface provides dashboard-like insights into the Presto Cluster and queries running on the cluster. The Presto UI is available on port 8080 using the public IPv4 address or the public IPv4 DNS.

There are several ways to interact with Presto, via the PrestoDB Sandbox. The post will demonstrate how to execute ad-hoc queries against Presto from an IDE using a JDBC connection and the Presto CLI. Other options include running queries against Presto from Java and Python applications, Tableau, or Apache Spark/PySpark.

Below, we see a query being run against Presto from JetBrains PyCharm, using a Java Database Connectivity (JDBC) connection. The advantage of using an IDE like JetBrains is having a single visual interface, including all the project files, multiple JDBC configurations, output results, and the ability to run multiple ad hoc queries.

Below, we see an example of configuring the Presto Data Source using the JDBC connection string, supplied in the CloudFormation stack Outputs tab.

Make sure to download and use the latest Presto JDBC driver JAR.

With JetBrains’ IDEs, we can even limit the databases/schemas displayed by the Data Source. This is helpful when we have multiple Presto catalogs configured, but we are only interested in certain data sources.

We can also run queries using the Presto CLI, three different ways. We can pass a SQL statement to the Presto CLI, pass a file containing a SQL statement to the Presto CLI, or work interactively from the Presto CLI. Below, we see a query being run, interactively from the Presto CLI.

As the query is running, we can observe the live Presto query statistics (not very user friendly in my terminal).

And finally, the view the query results.

Federated Queries

The example queries used in the demonstration and included in the project were mainly extracted from the scholarly article, Why You Should Run TPC-DS: A Workload Analysis, available as a PDF on the tpc.org website. I have modified the SQL queries to work with Presto.

In the first example, we will run the three versions of the same basic query statement. Version 1 of the query is not a federated query; it only queries a single data source. Version 2 of the query queries two different data sources. Finally, version 3 of the query queries three different data sources. Each of the three versions of the SQL statement should return the same results — 93 rows of data.

Version 1: Single Data Source

The first version of the query statement, sql/presto_query2.sql, is not a federated query. Each of the query’s four tables (catalog_returns, date_dim, customer, and customer_address) reference the TPC-DS data source, which came pre-installed with the PrestoDB Sandbox. Note table references on lines 11–13 and 41–42 are all associated with the tpcds.sf1 schema.

Modified version of
Figure 7: Reporting Query (Query 40)
http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
catalog_returns,
date_dim,
customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
customer_address,
customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;
view raw presto_query2.sql hosted with ❤ by GitHub

We will run each query non-interactively using the presto-cli. We will choose the sf1 (scale factor of 1) tpcds schema. According to Presto, every unit in the scale factor (sf1, sf10, sf100) corresponds to a gigabyte of data.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2.sql \
--output-format ALIGNED \
--client-tags "presto_query2"

Below, we see the query results in the presto-cli.

Below, we see the first query running in Presto’s web interface.

Below, we see the first query’s results detailed in Presto’s web interface.

Version 2: Two Data Sources

In the second version of the query statement, sql/presto_query2_federated_v1.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. The other two tables (customer and customer_address) now reference the Apache Hive Metastore for their schema and underlying data in Amazon S3. Note table references on lines 11 and 12, as opposed to lines 13, 41, and 42.

Modified version of
Figure 7: Reporting Query (Query 40)
http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
hive.default.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
hive.default.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v1.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v1"

Below, we see the second query’s results detailed in Presto’s web interface.

Even though the data is in two separate and physically different data sources, we can easily query it as though it were all in the same place.

Version 3: Three Data Sources

In the third version of the query statement, sql/presto_query2_federated_v2.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. One of the tables (hive.default.customer) references the Apache Hive Metastore. The fourth table (rds_postgresql.public.customer_address) references the new RDS for PostgreSQL database instance. The underlying data is in Amazon S3. Note table references on lines 11 and 12, and on lines 13 and 41, as opposed to line 42.

Modified version of
Figure 7: Reporting Query (Query 40)
http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
rds_postgresql.public.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
rds_postgresql.public.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'