Posts Tagged CDC

Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS

Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect

Introduction

A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.

One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.

A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.

In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.

Best of all, to maintain data freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream those changes into the data lake. This process is commonly referred to as Change Data Capture (CDC).

High-level architecture for this post’s demonstration

Change Data Capture

According to Gunnar Morling, Principal Software Engineer at Red Hat who works on the Debezium and Hibernate projects and well-known industry speaker, there are two types of Change Data Capture — Query-based and Log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

Joker 2021: Change data capture pipelines with Debezium and Kafka Streams (image: YouTube)

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.

Query-based vs. Log-based CDC

To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.

UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;

Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.

{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}

Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.

{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}

In an upcoming post, we will explore Debezium along with Apache Arvo and a schema registry to build a log-based CDC solution using PostgreSQL’s write-ahead log (WAL). In this post, we will examine query-based CDC using the ‘update timestamp’ technique.

Kafka Connect Connectors

In this post, we will use source and sink connectors from Confluent. Confluent is the undisputed leader in providing enterprise-grade managed Kafka through their Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:

  • Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
  • Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, Parquet, JSON, or Raw Bytes.

Prerequisites

This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:

  1. Amazon RDS for PostgreSQL instance (data source);
  2. Amazon S3 bucket (data sink);
  3. Amazon MSK cluster;
  4. Amazon EKS cluster;
  5. Connectivity between the Amazon RDS instance and Amazon MSK cluster;
  6. Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
  7. Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
  8. IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);

As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.

Source Code

All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.

Authentication and Authorization

Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.

Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).

Sample PostgreSQL Database

There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorite, albeit a bit dated, is PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytics costs.

Pagila database schema diagram

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.

Last Updated Trigger

Each table in the Pagila database has a last_update field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC.

As changes are made to records in these tables, an existing database function and a trigger to each table will ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.

CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();

Kubernetes-based Kafka Connect

There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.

If you chose to use the same Kafka Connect Image used in this post, a Helm Chart is included in the post’s GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS.

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-connect-msk
labels:
app: kafka-connect-msk
component: service
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: kafka-connect-msk
component: service
template:
metadata:
labels:
app: kafka-connect-msk
component: service
spec:
serviceAccountName: kafka-connect-msk-iam-serviceaccount
containers:
- image: garystafford/kafka-connect-msk:1.0.0
name: kafka-connect-msk
imagePullPolicy: IfNotPresent

Before deploying the chart, update the value.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on MSK and to S3 from Kafka Connect running on EKS.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kafka-cluster:*",
"Resource": [
"arn:aws:kafka:us-east-1:111222333444:cluster/*/*",
"arn:aws:kafka:us-east-1:111222333444:group/*/*/*",
"arn:aws:kafka:us-east-1:111222333444:transactional-id/*/*/*",
"arn:aws:kafka:us-east-1:111222333444:topic/*/*/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:us-east-1:111222333444:*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:us-east-1:111222333444:<your-bucket-name>"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts",
"s3:ListBucketMultipartUploads"
],
"Resource": "arn:aws:s3:us-east-1:111222333444:<your-bucket-name>/*"
}
]
}

Once the Service Account variable is updated, use the following command to deploy the Helm chart:

helm install kafka-connect-msk ./kafka-connect-msk \
--namespace $NAMESPACE --create-namespace

To get a shell to the running Kafka Connect container, use the following kubectl exec command:

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash
Interacting with Kafka Connect container running on EKS

Configure Bootstrap Brokers

Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect’s distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.

Kafka Connect will run within the pod’s container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:

# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')

Update the config/connect-distributed.properties file.

# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:

export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"

Confirm Access to Amazon MSK from Kafka Connect

To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the exiting Kafka topics:

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties

You can also try listing the existing Kafka consumer groups:

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server $BBROKERS \
  --command-config config/client-iam.properties

If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.

Kafka Connect

I recommend starting Kafka Connect as a background process using either method shown below.

bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &

To confirm Kafka Connect started properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.

tail -f logs/connect.log
Kafka Connect log showing Kafka Connect starting as a background process

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill command along with the PID to stop Kafka Connect if necessary.

Kafka Connect running as a background process

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, the first time it starts up, as defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The Internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-

Kafka Connect Connectors

This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.

Connector Source #1

Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json. Modify lines 3–5, as shown below, to reflect your RDS instance’s JDBC connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.public.",
"poll.interval.ms": 5000,
"mode": "timestamp",
"catalog.pattern": "public",
"table.whitelist": "address, city, country",
"timestamp.column.name": "last_update"
}

This first Kafka Connect source connector uses Confluent’s Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila’s public schema: address, city, and country. We will write that data to a series of topics, arbitrarily prefixed with database name and schema, pagila.public.. The source connector will create the three new topics automatically: pagila.public.address , pagila.public.city , and pagila.public.country.

Note the connector’s mode property value is set to timestamp, and the last_update field is referenced in the timestamp.column.name property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (poll.interval.ms property), looking for changes that are newer than the most recently exported last_modified date. This is accomplished by the source connector, using a parameterized query, such as:

SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC

Connector Sink #1

Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json. Modify line 7, as shown below to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "pagila.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "your-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}

This first Kafka Connect sink connector uses Confluent’s Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector) to export data from Kafka topics to Amazon S3 objects in JSON format.

Deploy Connectors #1

Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST method against the /connectors endpoint. However, this then requires a DELETE and an additional POST to update the connector. Using a PUT against the /config endpoint, you can update the connector without first issuing a DELETE.

curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq

You can confirm the source and sink connectors are deployed and running using the following commands:

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq
Kafka Connect source connector running successfully

Errors preventing the connector from starting correctly will be displayed using the /status endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Kafka Connect sink connector failed to run with errors

Confirming Success of Connectors #1

The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of three new Kafka topics that should have been created: pagila.public.address, pagila.public.city, and pagila.public.country.

bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.

To confirm the sink connector worked, verify the new S3 objects have been created in the data lake’s S3 bucket. If you use the AWS CLI v2’s s3 API, we can view the contents of our target S3 bucket:

aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text

You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.

topics/pagila.public.address/partition=0/pagila.public.address+0+0000000000.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000200.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000300.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000400.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000500.json
topics/pagila.public.address/partition=0/pagila.public.address+0+0000000600.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000000.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000100.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000200.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000300.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000400.json
topics/pagila.public.city/partition=0/pagila.public.city+0+0000000500.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000000.json
topics/pagila.public.country/partition=0/pagila.public.country+0+0000000100.json

You could also use the AWS Management Console to view the S3 bucket’s contents.

Amazon S3 bucket showing results of Kafka Connect S3 sink connector, organized by topic names

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

For example, the address table’s data will look similar to the following using the ‘Query with S3 Select’ feature via the console or API:

{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000
}

Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!

Connector Source #2

Create a new file or modify config/jdbc_source_connector_postgresql_01.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.public.alt.",
"poll.interval.ms": 5000,
"mode": "timestamp",
"timestamp.column.name": "last_update",
"catalog.pattern": "public",
"table.whitelist": "address",
"numeric.mapping": "best_fit",
"transforms": "createKey,extractInt,InsertTopic,InsertSourceDetails",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "address_id",
"validate.non.null": "false",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field": "message_source",
"transforms.InsertSourceDetails.static.value": "pagila"
}

This second Kafka Connect source connector also uses Confluent’s Kafka Connect JDBC Source connector to export data from the just address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is transforms, known as Single Message Transformations (SMTs). SMTs are applied to messages as they flow through Connect from RDS to Kafka.

In this connector, there are four transforms, which perform the following common functions:

  1. Extract address_id integer field as the Kafka message key, as detailed in this blog post by Confluence (see ‘Setting the Kafka message key’).
  2. Append Kafka topic name into message as a new static field;
  3. Append database name into message as a new static field;

Connector Sink #2

Create a new file or modify config/s3_sink_connector_01.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics": "pagila.public.alt.address",
"s3.region": "us-east-1",
"s3.bucket.name": "you-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}

This second sink connector is nearly identical to the first sink connector, except it only exports data from a single Kafka topic, pagila.public.alt.address, into S3.

Deploy Connectors #2

Deploy the second set of source and sink connectors using the Kafka Connect REST Interface, exactly like the first pair.

curl -s -d @"config/jdbc_source_connector_postgresql_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/config | jq
curl -s -d @"config/s3_sink_connector_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_01/config | jq

Confirming Success of Connectors #2

Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first set, for a total of four connectors.

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_01/status | jq
Kafka Connect source and sink connectors running successfully

To view the results of the first transform, extracting the address_id integer field as the Kafka message key, we can use a Kafka command-line consumer:

bin/kafka-console-consumer.sh \
--topic pagila.public.alt.address \
--offset 102 --partition 0 --max-messages 5 \
--property print.key=true --property print.value=true \
--property print.offset=true --property print.partition=true \
--property print.headers=false --property print.timestamp=false \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties

In the output below, note the beginning of each message, which displays the Kafka message key, identical to the address_id. For example, {"type":"int32","optional":false},"payload":100}.

Output showing messages in the Kafka pagila.public.alt.address topic

Examing the Amazon S3 bucket using the AWS Management Console or the CLI, you should note the fourth set of S3 objects within the /topics/pagila.public.alt.address/ object key prefix.

Amazon S3 bucket showing JSON-format files containing address data

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.alt.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the two new fields that have been appended into each record, a result of the Kafka Connector transforms:

{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}

Congratulations, you have successfully imported more data from a relational database into your data lake, including performing a simple series of transforms using Kafka Connect!

Connector Source #3

Create or modify config/jdbc_source_connector_postgresql_02.json. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/pagila",
"connection.user": "your-username",
"connection.password": "your-password",
"topic.prefix": "pagila.query",
"poll.interval.ms": 5000,
"mode": "timestamp",
"timestamp.column.name": "last_update",
"query": "SELECT * FROM (SELECT a.address_id, a.address, a.address2, city.city, a.district, a.postal_code, country.country, a.phone, a.last_update FROM address AS a INNER JOIN city ON a.city_id = city.city_id INNER JOIN country ON country.country_id = city.country_id ORDER BY address_id) AS subquery",
"incrementing.column.name": "address_id",
"transforms": "createKey,extractInt,InsertTopic,InsertSourceDetails",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "address_id",
"validate.non.null": "false",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "message_topic",
"transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
}

Unlike the first two source connectors that export data from tables, this connector uses a SELECT query to export data from the Pagila database’s address , city, and country tables and import the results of that SQL query data into a new Kafka topic, pagila.public.alt.address. The SQL query in the source connector’s configuration is as follows:

SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses

The final parameterized query, executed by the source connector, which allows it to detect changes based on the last_update field is as follows:

SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
WHERE "last_update" > ?
AND "last_update" < ?
ORDER BY "last_update" ASC

Connector Sink #3

Create or modify config/s3_sink_connector_02.json. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics": "pagila.query",
"s3.region": "us-east-1",
"s3.bucket.name": "your-s3-bucket",
"s3.part.size": 5242880,
"flush.size": 100,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "gzip",
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"partition.field.name": "country",
"schema.compatibility": "NONE",
"transforms": "RenameField, insertStaticField1,insertStaticField2,insertStaticField3",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "district:state_province",
"transforms.insertStaticField1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "message_source",
"transforms.insertStaticField1.static.value": "pagila",
"transforms.insertStaticField2.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField2.static.field": "message_source_engine",
"transforms.insertStaticField2.static.value": "postgresql",
"transforms.insertStaticField3.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField3.static.field": "environment",
"transforms.insertStaticField3.static.value": "development"
}

This sink connector is significantly different than the previous two sink connectors. In addition to leveraging SMTs in the corresponding source connector, we are also using them in this sink connector. The sink connect appends three arbitrary static fields to each record as it is written to Amazon S3 — message_source, message_source_engine, and environment using the InsertField transform. The sink connector also renames the district field to state_province using the ReplaceField transform.

The first two sink connectors wrote uncompressed JSON-format files to Amazon S3. This third sink connector optimizes the data imported into S3 for downstream data analytics. The sink connector writes GZIP-compressed Apache Parquet files to Amazon S3. In addition, the compressed Parquet files are partitioned by the country field. Using a columnar file format, compression, and partitioning, queries against the data should be faster and more efficient.

Deploy Connectors #3

Deploy the final source and sink connectors using the Kafka Connect REST Interface, exactly like the first two pairs.

curl -s -d @"config/jdbc_source_connector_postgresql_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/config | jq
curl -s -d @"config/s3_sink_connector_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_02/config | jq

Confirming Success of Connectors #3

Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first two sets, for a total of six connectors.

curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_02/status | jq
Kafka Connect source and sink connectors running successfully

Reviewing the messages within the newpagila.query topic, note the message_topic field has been appended to the message by the source connector but not message_source, message_source_engine, and environment fields. The sink connector appends these fields as it writes the messages to S3. Also, note the district field has yet to be renamed by the sink connector to state_province.

Output showing messages in the Kafka pagila.query topic

Examing the Amazon S3 bucket, again, you should note the fifth set of S3 objects within the /topics/pagila.query/ object key prefix. The Parquet-format files within are partitioned by country.

Amazon S3 bucket showing data partitioned by Country

Within each country partition, there are Parquet files whose records contain addresses within those countries.

Amazon S3 bucket showing GZIP-compressed Apache Parquet-format files

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the s3 API:

export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.query/country=United States/pagila.query+0+0000000003.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json

In the sample data below, note the four new fields that have been appended into each record, a result of the source and sink connector SMTs. Also, note the renamed district field:

{
"address_id": 599,
"address": "1895 Zhezqazghan Drive",
"address2": "",
"city": "Garden Grove",
"state_province": "California",
"postal_code": "36693",
"country": "United States",
"phone": "137809746111",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 6,
"address": "1121 Loja Avenue",
"address2": "",
"city": "San Bernardino",
"state_province": "California",
"postal_code": "17886",
"country": "United States",
"phone": "838635286649",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 18,
"address": "770 Bydgoszcz Avenue",
"address2": "",
"city": "Citrus Heights",
"state_province": "California",
"postal_code": "16266",
"country": "United States",
"phone": "517338314235",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}

Record Updates and Query-based CDC

What happens when we change data within the tables that Kafka Connect is polling every 5 seconds? To answer this question, let’s make a few DML changes:

-- update address field
UPDATE public.address
SET address = '123 CDC Test Lane'
WHERE address_id = 100;
-- update address2 field
UPDATE public.address
SET address2 = 'Apartment #2201'
WHERE address_id = 101;
-- second update to same record
UPDATE public.address
SET address2 = 'Apartment #2202'
WHERE address_id = 101;

-- insert new country
INSERT INTO public.country (country)
values ('Wakanda');
-- should be 110
SELECT country_id FROM country WHERE country='Wakanda';
-- insert new city
INSERT INTO public.city (city, country_id)
VALUES ('Birnin Zana', 110);
-- should be 601
SELECT city_id FROM public.city WHERE country_id=110;
-- update city_id to new city_id
UPDATE public.address
SET phone = city_id = 601
WHERE address_id = 102;
-- second update to same record
UPDATE public.address
SET district = 'Lake Turkana'
WHERE address_id = 102;
-- delete an address record
UPDATE public.customer
SET address_id = 200
WHERE customer_id IN (
SELECT customer_id FROM customer WHERE address_id = 104);
DELETE
FROM public.address
WHERE address_id = 104;

To see how these changes propagate, first, examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The three Kafka Connect source connectors detect changes, which are exported from PostgreSQL to Kafka. The three sink connectors then write these changes to new JSON and Parquet files to the target S3 bucket.

Kafka Connect log showing changes to Pagila database being exported/imported

Viewing Data in the Data Lake

A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.

AWS Glue Data Catalog showing five new tables, the result of the AWS Glue Crawler

When writing Parquet into partitions, one shortcoming of the Kafka Connect S3 sink connector is duplicate column names in AWS Glue. As a result, any columns used as partitions are duplicated in the Glue Data Catalog’s database table schema. The issue will result in an error similar to HIVE_INVALID_METADATA: Hive metadata for table pagila_query is invalid: Table descriptor contains duplicate columns when performing queries. To remedy this, predefine the table and the table’s schema. Alternately, edit the Glue Data Catalog table’s schema after crawling and remove the duplicate, non-partition column(s). Below, that would mean removing duplicate country column 7.

AWS Glue Data Catalog table schema showing duplicate column

Performing a typical SQL SELECT query in Athena will return all of the original records as well as the changes we made earlier as duplicate records (same address_id primary key).

Amazon Athena showing the SQL query and the result set
SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM "pagila_kafka_connect"."pagila_query"
WHERE address_id BETWEEN 100 AND 105
ORDER BY address_id;

Note the original records for address_id 100–103 as well as each change we made earlier. The last_update field reflects the date and time the record was created or updated. Also, note the record with address_id 104 in the query results. This is the record we deleted from the Pagila database.

address_id address address2 city state_province postal_code country last_update
100 1308 Arecibo Way Augusta-Richmond County Georgia 30695 United States 2017-02-15 09:45:30.000
100 123 CDC Test Lane Augusta-Richmond County Georgia 30695 United States 2021-08-09 14:10:29.126
101 1599 Plock Drive Tete Tete 71986 Mozambique 2017-02-15 09:45:30.000
101 1599 Plock Drive Apartment #2201 Tete Tete 71986 Mozambique 2021-08-09 14:10:29.467
101 1599 Plock Drive Apartment #2202 Tete Tete 71986 Mozambique 2021-08-09 14:19:03.761
102 669 Firozabad Loop al-Ayn Abu Dhabi 92265 United Arab Emirates 2017-02-15 09:45:30.000
102 669 Firozabad Loop Birnin Zana Abu Dhabi 92265 Wakanda 2021-08-09 14:10:29.789
102 669 Firozabad Loop Birnin Zana Lake Turkana 92265 Wakanda 2021-08-09 15:56:53.323
103 588 Vila Velha Manor Kimchon Kyongsangbuk 51540 South Korea 2017-02-15 09:45:30.000
104 1913 Kamakura Place Jelets Lipetsk 97287 Russian Federation 2017-02-15 09:45:30.000
105 733 Mandaluyong Place Abha Asir 77459 Saudi Arabia 2017-02-15 09:45:30.000
view raw pagila_query_01.csv hosted with ❤ by GitHub

To view only the most current data, we can use Athena’s ROW_NUMBER() function:

SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_UPDATE DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1
AND address_id BETWEEN 100 AND 105
ORDER BY address_id;

Now, we only see the latest records. Unfortunately, the record we deleted with address_id 104 is still present in the query results.

address_id address address2 city state_province postal_code country last_update
100 123 CDC Test Lane Augusta-Richmond County Georgia 30695 United States 2021-08-09 14:10:29.126
101 1599 Plock Drive Apartment #2202 Tete Tete 71986 Mozambique 2021-08-09 14:19:03.761
102 669 Firozabad Loop Birnin Zana Lake Turkana 92265 Wakanda 2021-08-09 15:56:53.323
103 588 Vila Velha Manor Kimchon Kyongsangbuk 51540 South Korea 2017-02-15 09:45:30.000
104 1913 Kamakura Place Jelets Lipetsk 97287 Russian Federation 2017-02-15 09:45:30.000
105 733 Mandaluyong Place Abha Asir 77459 Saudi Arabia 2017-02-15 09:45:30.000
view raw pagila_query_02.csv hosted with ❤ by GitHub

Using log-based CDC with Debezium, as opposed to query-based CDC, we would have received a record in S3 that indicated the delete. The null value message, shown below, is referred to as a tombstone message in Kafka. Note the ‘before’ syntax with the delete record as opposed to the ‘after’ syntax we observed earlier with the update record.

{
"before": {
"address": "",
"address2": null,
"phone": "",
"district": "",
"last_update": "1970-01-01T00:00:00Z",
"address_id": 104,
"postal_code": null,
"city_id": 0
},

"source": {
"schema": "public",
"sequence": "[\"1101256482032\",\"1101256482032\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1101256483936,
"name": "pagila",
"txId": 17137,
"version": "1.6.1.Final",
"ts_ms": 1628864251512,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "d",
"ts_ms": 1628864251671
}

An inefficient solution to duplicates and deletes with query-based CDC would be to bulk ingest the entire query result set from the Pagila database each time instead of only the changes based on the last_update field. Performing an unbounded query repeatedly on a huge dataset would negatively impact database performance. Notwithstanding, you would still end up with duplicates in the data lake unless you first purged the data in S3 before re-importing the new query results.

Data Movement

Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena’s CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores data files created by the CTAS statement in a specified location in Amazon S3 and created a new AWS Glue Data Catalog table to store the result set’s schema and metadata information. CTAS supports several file formats and storage options.

High-level architecture for this post’s demonstration

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by country, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.

CREATE TABLE pagila_kafka_connect.pagila_query_processed
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['country'],
external_location='s3://your-s3-bucket/processed/pagila_query'
) AS
SELECT address_id, last_update, address, address2, city,
state_province, postal_code, country
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1 AND address_id BETWEEN 0 and 100
ORDER BY address_id;

Examing the Amazon S3 bucket, on last time, you should new set of S3 objects within the /processed/pagila_query/ key path. The Parquet-format files, partitioned by country, are the result of the CTAS query.

Amazon S3 bucket showing SNAPPY-compressed Parquet-format files containing CTAS query results

We should now see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS query. We can perform additional queries on the processed data.

Amazon Athena showing query results from the processed data table in AWS Glue Data Catalog

ACID Transactions with a Data Lake

To fully take advantage of CDC and maximize the freshness of data in the data lake, we would also need to adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level updates and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.

Conclusion

This post explored how CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon EKS, Amazon MSK, and Apache Kafka Connect. We learned about query-based CDC for capturing ongoing changes to the source data. In a subsequent post, we will explore log-based CDC using Debezium and see how data lake file formats like Apache Avro, Apache Hudi, Apache Iceberg, and Delta Lake can help us manage the data in our data lake.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , ,

Leave a comment