Posts Tagged Confluent
Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS
Posted by Gary A. Stafford in AWS, Cloud, Enterprise Software Development, Kubernetes, Serverless on August 11, 2021
Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect
Introduction
A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.
One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.
A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.
In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.
Best of all, to maintain data freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream those changes into the data lake. This process is commonly referred to as Change Data Capture (CDC).

Change Data Capture
According to Gunnar Morling, Principal Software Engineer at Red Hat, who works on the Debezium and Hibernate projects and is a well-known industry speaker, there are two types of Change Data Capture: query-based and log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.
Query-based vs. Log-based CDC
To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.
UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;
Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.
{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}
Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.
{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}
In an upcoming post, we will explore Debezium along with Apache Avro and a schema registry to build a log-based CDC solution using PostgreSQL's write-ahead log (WAL). In this post, we will examine query-based CDC using the 'update timestamp' technique.
Kafka Connect Connectors
In this post, we will use source and sink connectors from Confluent. Confluent is the undisputed leader in providing enterprise-grade managed Kafka through their Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:
- Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
- Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, Parquet, JSON, or Raw Bytes.
Prerequisites
This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:
- Amazon RDS for PostgreSQL instance (data source);
- Amazon S3 bucket (data sink);
- Amazon MSK cluster;
- Amazon EKS cluster;
- Connectivity between the Amazon RDS instance and Amazon MSK cluster;
- Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
- Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
- IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);
As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.
Source Code
All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.
Authentication and Authorization
Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.
Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).
Sample PostgreSQL Database
There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorites, albeit a bit dated, is PostgreSQL's Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for 'big data' use cases but small enough to quickly install and minimize data storage and analytics costs.

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.
Last Updated Trigger
Each table in the Pagila database has a last_update field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC.
As changes are made to records in these tables, an existing database function and a trigger on each table will ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.
CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
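The Pagila scripts referenced above may already include equivalent triggers on the other tables; if yours does not, statements similar to the following (a sketch, assuming the same update_last_update_column() function) would attach the trigger to the city and country tables used later in this post.
-- same trigger pattern, applied to the city and country tables
CREATE TRIGGER update_last_update_column_city
BEFORE UPDATE
ON city
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
CREATE TRIGGER update_last_update_column_country
BEFORE UPDATE
ON country
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();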
Kubernetes-based Kafka Connect
There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.
If you choose to use the same Kafka Connect Image used in this post, a Helm Chart is included in the post's GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.0.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent
Before deploying the chart, update the value.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName). The IAM Policy attached to the IAM Role associated with the pod's Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka cluster running on MSK, and to S3, from Kafka Connect running on EKS.
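The policy used in the original post was included as a gist and is not reproduced exactly here; the following is only a minimal sketch of such an overly broad policy. For anything beyond a demo, scope the actions and resources down to specific MSK cluster, topic, group, and S3 bucket ARNs.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAllMSK",
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": "*"
    },
    {
      "Sid": "AllowAllS3",
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": "*"
    }
  ]
}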
Once the Service Account variable is updated, use the following command to deploy the Helm chart:
helm install kafka-connect-msk ./kafka-connect-msk \
--namespace $NAMESPACE --create-namespace
To get a shell to the running Kafka Connect container, use the following kubectl exec command:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Configure Bootstrap Brokers
Before starting Kafka Connect, you will need to modify Kafka Connect's configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect's distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.
Kafka Connect will run within the pod's container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:
# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')
Update the config/connect-distributed.properties file.
# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:
export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
Confirm Access to Amazon MSK from Kafka Connect
To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
You can also try listing the existing Kafka consumer groups:
bin/kafka-consumer-groups.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.
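Both commands pass a config/client-iam.properties file to the --command-config flag; that file is not shown in this post. A minimal sketch, assuming the same MSK IAM authentication settings and truststore path used in connect-distributed.properties, would contain the following properties.
# admin client auth using iam (sketch)
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler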
Kafka Connect
I recommend starting Kafka Connect as a background process using either method shown below.
bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &
To confirm Kafka Connect started properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.
tail -f logs/connect.log

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill command along with the PID to stop Kafka Connect if necessary.

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, the first time it starts up, as defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-
Kafka Connect Connectors
This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.
Connector Source #1
Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json. Modify lines 3–5, as shown below, to reflect your RDS instance's JDBC connection details.
This first Kafka Connect source connector uses Confluent's Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila's public schema: address, city, and country. We will write that data to a series of topics, arbitrarily prefixed with the database name and schema, pagila.public. The source connector will create the three new topics automatically: pagila.public.address, pagila.public.city, and pagila.public.country.
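The exact configuration file is in the post's GitHub repository; the following is only a rough sketch of the key properties (the connection details are placeholders), shown here to make the discussion of the mode and timestamp.column.name properties easier to follow.
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://your-rds-instance.us-east-1.rds.amazonaws.com:5432/pagila",
  "connection.user": "your-db-username",
  "connection.password": "your-db-password",
  "table.whitelist": "address, city, country",
  "topic.prefix": "pagila.public.",
  "mode": "timestamp",
  "timestamp.column.name": "last_update",
  "poll.interval.ms": "5000",
  "tasks.max": "1"
}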
Note the connector's mode property value is set to timestamp, and the last_update field is referenced in the timestamp.column.name property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (the poll.interval.ms property), looking for changes that are newer than the most recently exported last_update value. This is accomplished by the source connector, using a parameterized query, such as:
SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC
Connector Sink #1
Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json. Modify line 7, as shown below, to reflect your Amazon S3 bucket name.
This first Kafka Connect sink connector uses Confluent's Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector) to export data from Kafka topics to Amazon S3 objects in JSON format.
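Again, the actual file is in the repository; a rough sketch of the important properties (the bucket name and region are placeholders) might resemble the following. The flush.size and rotate.schedule.interval.ms values shown correspond to the flush behavior, every 100 records or 60 seconds, described later in the post.
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "pagila.public.address,pagila.public.city,pagila.public.country",
  "s3.bucket.name": "your-s3-bucket",
  "s3.region": "us-east-1",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "flush.size": "100",
  "rotate.schedule.interval.ms": "60000",
  "timezone": "UTC",
  "tasks.max": "1"
}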
Deploy Connectors #1
Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST method against the /connectors endpoint. However, this then requires a DELETE and an additional POST to update the connector. Using a PUT against the /config endpoint, you can update the connector without first issuing a DELETE.
curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq

You can confirm the source and sink connectors are deployed and running using the following commands:
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq

Errors preventing the connector from starting correctly will be displayed using the /status endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Confirming Success of Connectors #1
The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of the three new Kafka topics that should have been created: pagila.public.address, pagila.public.city, and pagila.public.country.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.
To confirm the sink connector worked, verify that new S3 objects have been created in the data lake's S3 bucket. Using the AWS CLI v2's s3 API, we can view the contents of our target S3 bucket:
aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text
You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.
You could also use the AWS Management Console to view the S3 bucket’s contents.

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
For example, the address table's data will look similar to the following using the 'Query with S3 Select' feature via the console or API:
{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000
}
Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!
Connector Source #2
Create a new file or modify config/jdbc_source_connector_postgresql_01.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.
This second Kafka Connect source connector also uses Confluent's Kafka Connect JDBC Source connector to export data from just the address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is the use of transforms, known as Single Message Transformations (SMTs). SMTs are applied to messages as they flow through Kafka Connect from RDS to Kafka.
In this connector, there are four transforms, which perform the following common functions (a configuration sketch follows this list):
- Extract the address_id integer field as the Kafka message key, as detailed in this blog post by Confluent (see 'Setting the Kafka message key');
- Append the Kafka topic name into the message as a new static field;
- Append the database name into the message as a new static field;
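The sketch below shows how these transforms might be expressed in the source connector's configuration, assuming the standard ValueToKey, ExtractField, and InsertField SMTs that ship with Kafka Connect; the transform aliases and exact values are illustrative, not the repository's exact file.
"transforms": "createKey,extractInt,insertTopic,insertSource",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "address_id",
"transforms.insertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTopic.topic.field": "message_topic",
"transforms.insertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertSource.static.field": "message_source",
"transforms.insertSource.static.value": "pagila"
The message_topic and message_source fields observed in the S3 output later in this section are consistent with the two InsertField transforms.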
Connector Sink #2
Create a new file or modify config/s3_sink_connector_01.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.
This second sink connector is nearly identical to the first sink connector, except it only exports data from a single Kafka topic, pagila.public.alt.address, into S3.
Deploy Connectors #2
Deploy the second set of source and sink connectors using the Kafka Connect REST Interface, exactly like the first pair.
curl -s -d @"config/jdbc_source_connector_postgresql_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/config | jq
curl -s -d @"config/s3_sink_connector_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_01/config | jq
Confirming Success of Connectors #2
Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first set, for a total of four connectors.
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_01/status | jq

To view the results of the first transform, extracting the address_id integer field as the Kafka message key, we can use a Kafka command-line consumer:
bin/kafka-console-consumer.sh \
--topic pagila.public.alt.address \
--offset 102 --partition 0 --max-messages 5 \
--property print.key=true --property print.value=true \
--property print.offset=true --property print.partition=true \
--property print.headers=false --property print.timestamp=false \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties
In the output below, note the beginning of each message, which displays the Kafka message key, identical to the address_id. For example, {"type":"int32","optional":false},"payload":100}.

Examining the Amazon S3 bucket using the AWS Management Console or the CLI, you should note the fourth set of S3 objects within the /topics/pagila.public.alt.address/ object key prefix.

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.alt.address/partition=0/pagila.public.alt.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the two new fields that have been appended into each record, a result of the Kafka Connect transforms:
{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
Congratulations, you have successfully imported more data from a relational database into your data lake, including performing a simple series of transforms using Kafka Connect!
Connector Source #3
Create or modify config/jdbc_source_connector_postgresql_02.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.
Unlike the first two source connectors that export data from tables, this connector uses a SELECT query to export data from the Pagila database's address, city, and country tables and imports the results of that SQL query into a new Kafka topic, pagila.query. The SQL query in the source connector's configuration is as follows:
SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
The final parameterized query, executed by the source connector, which allows it to detect changes based on the last_update field, is as follows:
SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
WHERE "last_update" > ?
AND "last_update" < ?
ORDER BY "last_update" ASC
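A rough sketch of the source connector properties that produce this behavior, an approximation of the repository's configuration file rather than the file itself (connection details are placeholders):
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://your-rds-instance.us-east-1.rds.amazonaws.com:5432/pagila",
  "mode": "timestamp",
  "timestamp.column.name": "last_update",
  "poll.interval.ms": "5000",
  "topic.prefix": "pagila.query",
  "query": "SELECT * FROM (SELECT a.address_id, a.address, a.address2, city.city, a.district, a.postal_code, country.country, a.phone, a.last_update FROM address AS a INNER JOIN city ON a.city_id = city.city_id INNER JOIN country ON country.country_id = city.country_id ORDER BY address_id) AS addresses"
}
Note that when the query property is used, topic.prefix becomes the full name of the single output topic, pagila.query.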
Connector Sink #3
Create or modify config/s3_sink_connector_02.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.
This sink connector is significantly different than the previous two sink connectors. In addition to leveraging SMTs in the corresponding source connector, we are also using them in this sink connector. The sink connector appends three arbitrary static fields to each record as it is written to Amazon S3 (message_source, message_source_engine, and environment) using the InsertField transform. The sink connector also renames the district field to state_province using the ReplaceField transform.
The first two sink connectors wrote uncompressed JSON-format files to Amazon S3. This third sink connector optimizes the data imported into S3 for downstream data analytics. The sink connector writes GZIP-compressed Apache Parquet files to Amazon S3. In addition, the compressed Parquet files are partitioned by the country field. Using a columnar file format, compression, and partitioning, queries against the data should be faster and more efficient.
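A sketch of the properties that distinguish this sink connector, again an approximation of the repository's file (only one of the three InsertField transforms is shown for brevity, and the SMT aliases are illustrative):
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "pagila.query",
  "s3.bucket.name": "your-s3-bucket",
  "s3.region": "us-east-1",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "gzip",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "country",
  "flush.size": "100",
  "transforms": "insertEnv,renameDistrict",
  "transforms.insertEnv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertEnv.static.field": "environment",
  "transforms.insertEnv.static.value": "development",
  "transforms.renameDistrict.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameDistrict.renames": "district:state_province"
}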
Deploy Connectors #3
Deploy the final source and sink connectors using the Kafka Connect REST Interface, exactly like the first two pairs.
curl -s -d @"config/jdbc_source_connector_postgresql_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/config | jq
curl -s -d @"config/s3_sink_connector_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_02/config | jq
Confirming Success of Connectors #3
Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first two sets, for a total of six connectors.
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_02/status | jq

Reviewing the messages within the new pagila.query topic, note that the message_topic field has been appended to the message by the source connector, but not the message_source, message_source_engine, or environment fields. The sink connector appends these fields as it writes the messages to S3. Also, note the district field has yet to be renamed by the sink connector to state_province.

pagila.query topic
Examining the Amazon S3 bucket again, you should note the fifth set of S3 objects within the /topics/pagila.query/ object key prefix. The Parquet-format files within are partitioned by country.

Within each country partition, there are Parquet files whose records contain addresses within those countries.

Use the Amazon S3 console's 'Query with S3 Select' again to view the data contained in the Parquet-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.query/country=United States/pagila.query+0+0000000003.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the four new fields that have been appended into each record, a result of the source and sink connector SMTs. Also, note the renamed district field:
{
"address_id": 599,
"address": "1895 Zhezqazghan Drive",
"address2": "",
"city": "Garden Grove",
"state_province": "California",
"postal_code": "36693",
"country": "United States",
"phone": "137809746111",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 6,
"address": "1121 Loja Avenue",
"address2": "",
"city": "San Bernardino",
"state_province": "California",
"postal_code": "17886",
"country": "United States",
"phone": "838635286649",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 18,
"address": "770 Bydgoszcz Avenue",
"address2": "",
"city": "Citrus Heights",
"state_province": "California",
"postal_code": "16266",
"country": "United States",
"phone": "517338314235",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
Record Updates and Query-based CDC
What happens when we change data within the tables that Kafka Connect is polling every 5 seconds? To answer this question, let’s make a few DML changes:
-- update address field
UPDATE public.address
SET address = '123 CDC Test Lane'
WHERE address_id = 100;
-- update address2 field
UPDATE public.address
SET address2 = 'Apartment #2201'
WHERE address_id = 101;
-- second update to same record
UPDATE public.address
SET address2 = 'Apartment #2202'
WHERE address_id = 101;
-- insert new country
INSERT INTO public.country (country)
values ('Wakanda');
-- should be 110
SELECT country_id FROM country WHERE country='Wakanda';
-- insert new city
INSERT INTO public.city (city, country_id)
VALUES ('Birnin Zana', 110);
-- should be 601
SELECT city_id FROM public.city WHERE country_id=110;
-- update city_id to new city_id
UPDATE public.address
SET city_id = 601
WHERE address_id = 102;
-- second update to same record
UPDATE public.address
SET district = 'Lake Turkana'
WHERE address_id = 102;
-- delete an address record
UPDATE public.customer
SET address_id = 200
WHERE customer_id IN (
SELECT customer_id FROM customer WHERE address_id = 104);
DELETE
FROM public.address
WHERE address_id = 104;
To see how these changes propagate, first, examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The three Kafka Connect source connectors detect changes, which are exported from PostgreSQL to Kafka. The three sink connectors then write these changes to new JSON and Parquet files to the target S3 bucket.

Viewing Data in the Data Lake
A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.

When writing Parquet into partitions, one shortcoming of the Kafka Connect S3 sink connector is duplicate column names in AWS Glue. As a result, any columns used as partitions are duplicated in the Glue Data Catalog's database table schema. The issue will result in an error similar to HIVE_INVALID_METADATA: Hive metadata for table pagila_query is invalid: Table descriptor contains duplicate columns when performing queries. To remedy this, predefine the table and the table's schema. Alternately, edit the Glue Data Catalog table's schema after crawling and remove the duplicate, non-partition column(s). Below, that would mean removing the duplicate country column (column 7).

Performing a typical SQL SELECT query in Athena will return all of the original records as well as the changes we made earlier as duplicate records (same address_id primary key).

SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM "pagila_kafka_connect"."pagila_query"
WHERE address_id BETWEEN 100 AND 105
ORDER BY address_id;
Note the original records for address_id 100–103 as well as each change we made earlier. The last_update field reflects the date and time the record was created or updated. Also, note the record with address_id 104 in the query results. This is the record we deleted from the Pagila database.
To view only the most current data, we can use Athena's ROW_NUMBER() function:
SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1
AND address_id BETWEEN 100 AND 105
ORDER BY address_id;
Now, we only see the latest records. Unfortunately, the record we deleted with address_id 104 is still present in the query results.
Using log-based CDC with Debezium, as opposed to query-based CDC, we would have received a record in S3 that indicated the delete. The null value message, shown below, is referred to as a tombstone message in Kafka. Note the ‘before’ syntax with the delete record as opposed to the ‘after’ syntax we observed earlier with the update record.
{
"before": {
"address": "",
"address2": null,
"phone": "",
"district": "",
"last_update": "1970-01-01T00:00:00Z",
"address_id": 104,
"postal_code": null,
"city_id": 0
},
"source": {
"schema": "public",
"sequence": "[\"1101256482032\",\"1101256482032\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1101256483936,
"name": "pagila",
"txId": 17137,
"version": "1.6.1.Final",
"ts_ms": 1628864251512,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "d",
"ts_ms": 1628864251671
}
An inefficient solution to duplicates and deletes with query-based CDC would be to bulk ingest the entire query result set from the Pagila database each time, instead of only the changes based on the last_update field. Performing an unbounded query repeatedly on a huge dataset would negatively impact database performance. Notwithstanding, you would still end up with duplicates in the data lake unless you first purged the data in S3 before re-importing the new query results.
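For example, purging the previously ingested objects for a single topic before a full re-import could be done with a recursive delete of that topic's key prefix (the bucket name is a placeholder):
# remove all previously ingested objects for the pagila.query topic
aws s3 rm s3://your-s3-bucket/topics/pagila.query/ --recursive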
Data Movement
Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena's CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in the AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores the data files created by the CTAS statement in a specified location in Amazon S3 and creates a new AWS Glue Data Catalog table to store the result set's schema and metadata information. CTAS supports several file formats and storage options.

Wrapping the last query in Athena's CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by country, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.
CREATE TABLE pagila_kafka_connect.pagila_query_processed
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['country'],
external_location='s3://your-s3-bucket/processed/pagila_query'
) AS
SELECT address_id, last_update, address, address2, city,
state_province, postal_code, country
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1 AND address_id BETWEEN 0 and 100
ORDER BY address_id;
Examining the Amazon S3 bucket one last time, you should see a new set of S3 objects within the /processed/pagila_query/ key path. The Parquet-format files, partitioned by country, are the result of the CTAS query.

We should now see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS query. We can perform additional queries on the processed data.

ACID Transactions with a Data Lake
To fully take advantage of CDC and maximize the freshness of data in the data lake, we would also need to adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level updates and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.
Conclusion
This post explored how CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon EKS, Amazon MSK, and Apache Kafka Connect. We learned about query-based CDC for capturing ongoing changes to the source data. In a subsequent post, we will explore log-based CDC using Debezium and see how data lake file formats like Apache Avro, Apache Hudi, Apache Iceberg, and Delta Lake can help us manage the data in our data lake.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine
Posted by Gary A. Stafford in Bash Scripting, Cloud, DevOps, Enterprise Software Development, GCP, Java Development, Python, Software Development on December 28, 2018
Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, including MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-managed PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.
In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).
Background
In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.
Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders service. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.
The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.
Developing the services, not operationalizing the platform, was the primary objective of the previous post.
Featured Technologies
The following technologies are featured prominently in this post.
Confluent Cloud
In May 2018, Google announced a partnership with Confluent to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for their commercial, Kafka-based streaming platform for the Enterprise.
Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.
Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.
MongoDB Atlas
Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.
MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.
MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.
Kubernetes Engine
According to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.
A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.
GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.
Demonstration
In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.
Kubernetes and Istio 1.0 will replace Netflix’s Zuul and Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform, than previously demonstrated on Docker Swarm.
For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor, will we integrate a full lifecycle API management tool, like Google Apigee.
Source Code
The source code for this demonstration is contained in four separate GitHub repositories, storefront-kafka-docker, storefront-demo-accounts, storefront-demo-orders, and, storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).
git clone --branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-kafka-docker.git
# optional repositories
git clone --branch gke --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch gke --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch gke --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-demo-fulfillment.git
Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.
Setup Process
The setup of the Storefront API platform is divided into a few logical steps:
- Create the MongoDB Atlas cluster;
- Create the Confluent Cloud Kafka cluster;
- Create Kafka topics;
- Modify the Kubernetes resources;
- Modify the microservices to support Confluent Cloud configuration;
- Create the GKE cluster with Istio on GCP;
- Apply the Kubernetes resources to the GKE cluster;
- Test the Storefront API, Kafka, and MongoDB are functioning properly;
MongoDB Atlas Cluster
This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.
For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.
For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized 3-data node cluster, with shared RAM and 512 MB of storage, and currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0-cluster on GCP. An M0-sized Atlas cluster may take between 7-10 minutes to provision.
MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.
Once the cluster is ready, you can review details about the cluster and each individual cluster node.
In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.
Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open the access to your specific IP address using whatsmyip.com, or much less securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.
The Java Spring Boot storefront services use a Spring Profile, gke. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke Spring Profile's configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap, and Secret resources, shown later.
The first two Spring configuration values we will need are the MongoDB Atlas cluster's connection string and the demo_user account password. Note both of these for later use.
Confluent Cloud Kafka Cluster
Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Account creation does require the use of a Credit Card.
The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and persistent history of cluster activities, which survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based SaaS management tool.
As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.
Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.
Once the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.
As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.
SASL and JAAS
Some users may not be familiar with the terms, SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).
There are numerous SASL mechanisms. The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.
According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.
Cluster Authentication
Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.
Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.
Kafka Topics
With the cluster created, create the storefront service's three Kafka topics manually using Confluent Cloud's ccloud CLI tool. First, configure the Confluent Cloud CLI using the ccloud init command with your new cluster's Bootstrap Servers address, API Key, and API Secret. The instructions are shown on the cluster's Client Config tab of the Confluent Cloud web-based management interface. Create the storefront service's three Kafka topics using the ccloud topic create command, then use the ccloud topic list command to confirm they were created.
# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill

# list kafka topics
ccloud topic list
accounts.customer.change
fulfillment.order.change
orders.order.fulfill
Another useful ccloud command, topic describe, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12. Adding the --verbose flag to the command, ccloud --verbose topic describe, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.
Kubernetes Resources
The deployment of the three storefront microservices to the dev Namespace will minimally require the following Kubernetes configuration resources.
- (1) Kubernetes Namespace;
- (3) Kubernetes Deployments;
- (3) Kubernetes Services;
- (1) Kubernetes ConfigMap;
- (2) Kubernetes Secrets;
- (1) Istio 1.0 Gateway;
- (1) Istio 1.0 VirtualService;
- (2) Istio 1.0 ServiceEntry;
The Istio networking.istio.io v1alpha3 API introduced the last three configuration resources in the list, to control traffic routing into, within, and out of the mesh. There are a total of four new networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.
Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.
To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.
Istio Gateway & VirtualService
Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com, along with the sub-domain, api.dev, to host the Storefront API. The domain's primary A record ('@') and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. If you remove the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.dev.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - api.dev.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local
Istio ServiceEntry
There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services, and both of their ServiceEntry location items are set to MESH_EXTERNAL. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: mongdb-atlas-external-mesh
spec:
  hosts:
  - <your_atlas_url.gcp.mongodb.net>
  ports:
  - name: mongo
    number: 27017
    protocol: MONGO
  location: MESH_EXTERNAL
  resolution: NONE
The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: confluent-cloud-external-mesh
spec:
  hosts:
  - <your_cluster_url.us-central1.gcp.confluent.cloud>
  ports:
  - name: kafka
    number: 9092
    protocol: TLS
  location: MESH_EXTERNAL
  resolution: NONE
Both ServiceEntry resources need to have their hosts items replaced with your own Atlas and Confluent Cloud cluster URLs.
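If you would rather script the substitution than edit the files by hand, a simple sed one-liner works. A rough sketch, where the replacement hostnames are placeholders for your own cluster addresses:

# placeholder hostnames; substitute your actual Atlas and Confluent Cloud cluster addresses
sed -i 's|<your_atlas_url.gcp.mongodb.net>|mycluster-abc12.gcp.mongodb.net|' mongodb-atlas-external-mesh.yaml
sed -i 's|<your_cluster_url.us-central1.gcp.confluent.cloud>|pkc-xxxxx.us-central1.gcp.confluent.cloud|' confluent-cloud-external-mesh.yaml

Note that on macOS, BSD sed requires sed -i '' rather than sed -i.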
Inspecting Istio Resources
The easiest way to view Istio resources is from the command line using the istioctl and kubectl CLI tools.
istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry
kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry
Multiple Namespaces
In this demo, we are only deploying to a single Kubernetes Namespace, dev. However, Istio will also support routing traffic to multiple Namespaces. For example, a typical non-prod Kubernetes cluster might support dev, test, and uat Namespaces, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In each VirtualService, each service's host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which changes for each VirtualService (gist).
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.dev.storefront-demo.com
    - api.test.storefront-demo.com
    - api.uat.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - api.dev.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-test
spec:
  hosts:
  - api.test.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.test.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.test.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.test.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-uat
spec:
  hosts:
  - api.uat.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.uat.svc.cluster.local
MongoDB Atlas Secret
There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.
The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection strings, each with the demo_user username and password, one for each of the storefront service's databases (gist).
apiVersion: v1
kind: Secret
metadata:
  name: mongodb-atlas
  namespace: dev
type: Opaque
data:
  mongodb.uri.accounts: your_base64_encoded_value
  mongodb.uri.fulfillment: your_base64_encoded_value
  mongodb.uri.orders: your_base64_encoded_value
Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64 program. The base64 program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64 program using echo -n.
MONGODB_URI=mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true
echo -n $MONGODB_URI | base64
bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl
Repeat this process for the three MongoDB connection strings.
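Alternatively, if you prefer not to Base64 encode the values by hand, kubectl can create the Secret directly from literal values and handle the encoding for you. A minimal sketch, assuming the dev Namespace already exists and using placeholder connection strings in place of your own:

# hypothetical connection strings; substitute your own Atlas cluster address and password
kubectl create secret generic mongodb-atlas \
  --namespace dev \
  --from-literal=mongodb.uri.accounts="mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true" \
  --from-literal=mongodb.uri.fulfillment="mongodb+srv://demo_user:your_password@your_cluster_address/fulfillment?retryWrites=true" \
  --from-literal=mongodb.uri.orders="mongodb+srv://demo_user:your_password@your_cluster_address/orders?retryWrites=true"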
Confluent Cloud Secret
The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret's data map, bootstrap.servers and sasl.jaas.config. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the Base64 encoding process for these two data fields (gist).
apiVersion: v1
kind: Secret
metadata:
  name: confluent-cloud-kafka
  namespace: dev
type: Opaque
data:
  bootstrap.servers: your_base64_encoded_value
  sasl.jaas.config: your_base64_encoded_value
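As a rough sketch of the encoding step, assuming a bootstrap server address and an API Key and Secret of your own (the values shown are placeholders), the two fields can be prepared as follows. The JAAS configuration string uses Kafka's PlainLoginModule, consistent with the SASL/PLAIN mechanism described earlier.

# placeholder values; substitute your Confluent Cloud bootstrap server, API Key, and API Secret
BOOTSTRAP_SERVERS="pkc-xxxxx.us-central1.gcp.confluent.cloud:9092"
SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="YOUR_API_KEY" password="YOUR_API_SECRET";'

# Base64 encode each value before placing it in the Secret's data map
echo -n "$BOOTSTRAP_SERVERS" | base64
echo -n "$SASL_JAAS_CONFIG" | base64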
Confluent Cloud ConfigMap
The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive, and therefore, may be placed in a Kubernetes ConfigMap, confluent-cloud-kafka-configmap.yaml (gist).
apiVersion: v1
kind: ConfigMap
metadata:
  name: confluent-cloud-kafka
data:
  ssl.endpoint.identification.algorithm: "https"
  sasl.mechanism: "PLAIN"
  request.timeout.ms: "20000"
  retry.backoff.ms: "500"
  security.protocol: "SASL_SSL"
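As with the Secret, the ConfigMap could equally be created imperatively with kubectl; a brief sketch, using the same five non-sensitive values:

# equivalent imperative creation of the ConfigMap in the dev Namespace
kubectl create configmap confluent-cloud-kafka \
  --namespace dev \
  --from-literal=ssl.endpoint.identification.algorithm="https" \
  --from-literal=sasl.mechanism="PLAIN" \
  --from-literal=request.timeout.ms="20000" \
  --from-literal=retry.backoff.ms="500" \
  --from-literal=security.protocol="SASL_SSL"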
Accounts Deployment Resource
To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note that the environment variables section, on lines 44–90, is a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).
apiVersion: v1
kind: Service
metadata:
  name: accounts
  labels:
    app: accounts
spec:
  ports:
  - name: http
    port: 8080
  selector:
    app: accounts
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: accounts
  labels:
    app: accounts
spec:
  replicas: 2
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: accounts
  template:
    metadata:
      labels:
        app: accounts
      annotations:
        sidecar.istio.io/inject: "true"
    spec:
      containers:
      - name: accounts
        image: garystafford/storefront-accounts:gke-2.2.0
        resources:
          requests:
            memory: "250M"
            cpu: "100m"
          limits:
            memory: "400M"
            cpu: "250m"
        env:
        - name: SPRING_PROFILES_ACTIVE
          value: "gke"
        - name: SERVER_SERVLET_CONTEXT-PATH
          value: "/accounts"
        - name: LOGGING_LEVEL_ROOT
          value: "INFO"
        - name: SPRING_DATA_MONGODB_URI
          valueFrom:
            secretKeyRef:
              name: mongodb-atlas
              key: mongodb.uri.accounts
        - name: SPRING_KAFKA_BOOTSTRAP-SERVERS
          valueFrom:
            secretKeyRef:
              name: confluent-cloud-kafka
              key: bootstrap.servers
        - name: SPRING_KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: ssl.endpoint.identification.algorithm
        - name: SPRING_KAFKA_PROPERTIES_SASL_MECHANISM
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: sasl.mechanism
        - name: SPRING_KAFKA_PROPERTIES_REQUEST_TIMEOUT_MS
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: request.timeout.ms
        - name: SPRING_KAFKA_PROPERTIES_RETRY_BACKOFF_MS
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: retry.backoff.ms
        - name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
          valueFrom:
            secretKeyRef:
              name: confluent-cloud-kafka
              key: sasl.jaas.config
        - name: SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: security.protocol
        ports:
        - containerPort: 8080
        imagePullPolicy: IfNotPresent
Modify Microservices for Confluent Cloud
As explained earlier, Confluent Cloud's Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service's code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note the following changes have already been made to the source code in the gke git branch and are not necessary to repeat for this demo.
The previous Kafka SenderConfig and ReceiverConfig Java classes have been converted to Java interfaces. There are four new classes, SenderConfigConfluent, SenderConfigNonConfluent, ReceiverConfigConfluent, and ReceiverConfigNonConfluent, each of which implements one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes is assigned the @Profile("gke") annotation, and the other set, the @Profile("!gke") annotation. When the services start, one of the two class implementations is loaded, depending on whether the Active Spring Profile is gke or not. To understand the changes better, examine the Account service's SenderConfigConfluent.java file (gist).
Line 20: Designates this class as belonging to the gke Spring Profile.
Line 23: The class now implements an interface.
Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.
Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.
package com.storefront.config;

import com.storefront.kafka.Sender;
import com.storefront.model.CustomerChangeEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Profile("gke")
@Configuration
@EnableKafka
public class SenderConfigConfluent implements SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm}")
    private String sslEndpointIdentificationAlgorithm;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.properties.request.timeout.ms}")
    private String requestTimeoutMs;

    @Value("${spring.kafka.properties.retry.backoff.ms}")
    private String retryBackoffMs;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    @Override
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
        props.put("sasl.mechanism", saslMechanism);
        props.put("request.timeout.ms", requestTimeoutMs);
        props.put("retry.backoff.ms", retryBackoffMs);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.jaas.config", saslJaasConfig);
        return props;
    }

    @Override
    @Bean
    public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Override
    @Bean
    public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Override
    @Bean
    public Sender sender() {
        return new Sender();
    }
}
Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine).
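For reference, rebuilding and publishing an image follows the standard Docker workflow. A rough sketch for the Accounts service, assuming you push to your own Docker Hub repository rather than reusing mine:

# rebuild and publish the Accounts service image (repository name is illustrative)
docker build -t your-dockerhub-user/storefront-accounts:gke-2.2.0 .
docker push your-dockerhub-user/storefront-accounts:gke-2.2.0

Repeat for the Fulfillment and Orders services, then reference the resulting image names in the Kubernetes Deployment resources.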
Google Kubernetes Engine (GKE) with Istio
Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.
For the sake of brevity, we will manually create the cluster and deploy the resources using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available releases at the time of this post were Kubernetes 1.11.5-gke.5 on GKE and Istio 1.0.3 (Istio on GKE is still considered beta). Note that Kubernetes and Istio are evolving rapidly; thus, the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create command format (gist).
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE

# Constants - CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'

# Create GKE cluster (time in foreground)
time \
  gcloud beta container \
    --project $PROJECT clusters create $CLUSTER \
    --zone $ZONE \
    --username "admin" \
    --cluster-version "1.11.5-gke.5" \
    --machine-type "n1-standard-2" \
    --image-type "COS" \
    --disk-type "pd-standard" \
    --disk-size "100" \
    --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
    --num-nodes "2" \
    --enable-stackdriver-kubernetes \
    --enable-ip-alias \
    --network "projects/$PROJECT/global/networks/default" \
    --subnetwork "projects/$PROJECT/regions/$REGION/subnetworks/default" \
    --default-max-pods-per-node "110" \
    --addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
    --istio-config auth=MTLS_PERMISSIVE \
    --issue-client-certificate \
    --metadata disable-legacy-endpoints=true \
    --enable-autoupgrade \
    --enable-autorepair

# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
  --zone $ZONE --project $PROJECT

kubectl config current-context

# Create dev Namespace
kubectl apply -f ./resources/other/namespaces.yaml

# Enable Istio automatic sidecar injection in Dev Namespace
kubectl label namespace $NAMESPACE istio-injection=enabled
Executing these commands successfully will build the cluster and the dev Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.
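Once creation completes, a quick, optional sanity check from the command line confirms the nodes, the Istio control plane add-on, and the dev Namespace are all in place; a brief sketch, assuming the script above was used:

# confirm the two nodes are Ready
kubectl get nodes

# confirm the Istio control plane components are running (installed by the GKE Istio add-on)
kubectl get pods -n istio-system

# confirm the dev Namespace exists and is labeled for sidecar injection
kubectl get namespace dev --show-labels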
We can also observe the new GKE cluster from the GKE Clusters Details tab.
Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the two GKE cluster’s VM instances, and one IP address associated with the frontend of the load balancer.
While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.
A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.
Below, we see the details of the load balancer’s target pool, containing the two GKE cluster’s VMs.
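The same network resources can also be listed from the command line with the Google Cloud SDK; a brief sketch:

# node VMs and their external IP addresses
gcloud compute instances list

# load balancer frontend (forwarding rule) and backend (target pool)
gcloud compute forwarding-rules list
gcloud compute target-pools list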
As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.
Apply Kubernetes Resources
Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev Namespace since their role is to control ingress and route traffic to the dev Namespace and the services within it (gist).
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources

# Constants - CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'

kubectl apply -f ./resources/other/istio-gateway.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/mongodb-atlas-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/confluent-cloud-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/config/confluent-cloud-kafka-configmap.yaml
kubectl apply -f ./resources/config/mongodb-atlas-secret.yaml
kubectl apply -f ./resources/config/confluent-cloud-kafka-secret.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/accounts.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/fulfillment.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/orders.yaml
Once these commands complete successfully, on the Workloads tab, we should observe two Pods for each of the three storefront service Kubernetes Deployments deployed to the dev Namespace, all six Pods with a Status of 'OK'. A Deployment controller provides declarative updates for Pods and ReplicaSets.
On the Services tab, we should observe the three storefront service’s Kubernetes Services. A Service in Kubernetes is a REST object.
On the Configuration tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Namespace.
Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.
Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.
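The same resources can be confirmed from the command line; a quick sketch, including decoding one Secret value to verify it round-tripped correctly:

# list the deployed workloads and configuration resources in the dev Namespace
kubectl get deployments,services,pods -n dev
kubectl get configmap,secret -n dev

# spot-check a Secret value by decoding it (keys follow the resources shown above)
kubectl get secret confluent-cloud-kafka -n dev \
  -o jsonpath='{.data.bootstrap\.servers}' | base64 --decode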
Test the Storefront API
If you recall from part two of the previous post, there are a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET Requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.
- Sample Customer: accounts/customers/sample
- Sample Orders: orders/customers/sample/orders
- Sample Fulfillment Requests: orders/customers/sample/fulfill
- Sample Processed Order Event: fulfillment/fulfillment/sample/process
- Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
- Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
- Sample Received Order Event: fulfillment/fulfillment/sample/receive
There are a wide variety of tools available to interact with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.
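If you prefer the command line over the Python script, a rough equivalent using curl might look like the following, assuming the api.dev.storefront-demo.com host configured in the Istio Gateway (substitute the load balancer's external IP address if you removed the host blocks):

# hit each sample-data endpoint through the Istio ingress gateway, in order
API_HOST="http://api.dev.storefront-demo.com"

curl -s "$API_HOST/accounts/customers/sample"
curl -s "$API_HOST/orders/customers/sample/orders"
curl -s "$API_HOST/orders/customers/sample/fulfill"
curl -s "$API_HOST/fulfillment/fulfillment/sample/process"
curl -s "$API_HOST/fulfillment/fulfillment/sample/ship"
curl -s "$API_HOST/fulfillment/fulfillment/sample/in-transit"
curl -s "$API_HOST/fulfillment/fulfillment/sample/receive"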
Postman
Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers endpoint.
Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.
Google Stackdriver
If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener handler firing on an onAfterSave event, which sends a CustomerChangeEvent payload to the accounts.customer.change Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.
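The same container logs are available from the command line; a brief sketch, assuming the app=accounts label used in the Deployment shown earlier:

# tail the logs from the Accounts service containers (skipping the Istio sidecar)
kubectl logs -n dev -l app=accounts -c accounts --tail=50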
MongoDB Atlas Collection View
Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connect to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests are completed successfully, you should now observe the three Storefront API databases, each with collections of documents.
MongoDB Compass
In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests collection schema.
Confluent Control Center
Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since Control Center is offered at an additional cost, and I found it difficult to configure for Confluent Cloud clusters based on Confluent's documentation, I chose not to cover it in detail in this post.
Tear Down Cluster
Delete your Confluent Cloud and MongoDB Atlas clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the gcloud container clusters delete command. Also, double-check that the external IP addresses and load balancer associated with the cluster were deleted as part of the cluster deletion (gist).
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Tear down GKE cluster and associated resources

# Constants - CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'

# Delete GKE cluster (time in foreground)
time yes | gcloud beta container clusters delete $CLUSTER --zone $ZONE

# Confirm network resources are also deleted
gcloud compute forwarding-rules list
gcloud compute target-pools list
gcloud compute firewall-rules list

# In case target-pool associated with Cluster is not deleted
yes | gcloud compute target-pools delete \
  $(gcloud compute target-pools list \
  --filter="region:($REGION)" --project $PROJECT \
  | awk 'NR==2 {print $1}')
Conclusion
In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.
In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0).
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.