Posts Tagged CDC
The Art of Building Open Data Lakes with Apache Hudi, Kafka, Hive, and Debezium
Posted by Gary A. Stafford in Analytics, AWS, Big Data, Cloud, Python, Software Development, Technology Consulting on December 31, 2021
Build near real-time, open-source data lakes on AWS using a combination of Apache Kafka, Hudi, Spark, Hive, and Debezium
Introduction
In the following post, we will learn how to build a data lake on AWS using a combination of open-source software (OSS), including Red Hat’s Debezium, Apache Kafka, Kafka Connect, Apache Hive, Apache Spark, Apache Hudi, and Hudi DeltaStreamer. We will use fully-managed AWS services to host the datasource, the data lake, and the open-source tools. These services include Amazon RDS, MSK, EKS, EMR, and S3.

This post is an in-depth follow-up to the video demonstration, Building Open Data Lakes on AWS with Debezium and Apache Hudi.
Workflow
As shown in the architectural diagram above, these are the high-level steps in the demonstration’s workflow:
- Changes (inserts, updates, and deletes) are made to the datasource, a PostgreSQL database running on Amazon RDS;
- Kafka Connect Source Connector, using Debezium and running on Amazon EKS (Kubernetes), continuously reads data from the PostgreSQL WAL;
- Source Connector creates and stores message schemas in Apicurio Registry, also running on Amazon EKS, in Avro format;
- Source Connector transforms and writes data in Apache Avro format to Apache Kafka, running on Amazon MSK;
- Kafka Connect Sink Connector, using Confluent S3 Sink Connector, reads messages from Kafka topics using schemas from Apicurio Registry;
- Sink Connector writes data to Amazon S3 in Apache Avro format;
- Apache Spark, using Hudi DeltaStreamer and running on Amazon EMR, reads message schemas from Apicurio Registry;
- DeltaStreamer reads raw Avro-format data from Amazon S3;
- DeltaStreamer writes data to Amazon S3 as both Copy on Write (CoW) and Merge on Read (MoR) table types;
- DeltaStreamer syncs Hudi tables and partitions to Apache Hive running on Amazon EMR;
- Queries are executed against Apache Hive Metastore or directly against Hudi tables using Apache Spark, with data returned from Hudi tables in Amazon S3;
The workflow described above actually contains two independent processes running simultaneously. Steps 2–6 represent the first process, the change data capture (CDC) process. Kafka Connect is used to continuously move changes from the database to Amazon S3. Steps 7–10 represent the second process, the data lake ingestion process. Hudi’s DeltaStreamer reads raw CDC data from Amazon S3 and writes the data back to another location in S3 (the data lake) in Apache Hudi table format. When combined, these processes can give us near real-time, incremental data ingestion of changes from the datasource to the Hudi-managed data lake.
Alternatives
This demonstration’s workflow is only one of many possible workflows to achieve similar outcomes. Alternatives include:
- Replace self-managed Kafka Connect with the fully-managed Amazon MSK Connect service.
- Exchange Amazon EMR for AWS Glue Jobs or AWS Glue Studio and the custom AWS Glue Connector for Apache Hudi to ingest data into Hudi tables.
- Replace Apache Hive with AWS Glue Data Catalog, a fully-managed Hive-compatible metastore.
- Replace Apicurio Registry with Confluent Schema Registry or AWS Glue Schema Registry.
- Exchange the Confluent S3 Sink Connector for the Kafka Connect Sink for Hudi, which could greatly simplify the workflow.
- Substitute the HoodieMultiTableDeltaStreamer for the HoodieDeltaStreamer utility to quickly ingest multiple tables into Hudi.
- Replace Hudi’s AvroDFSSource with the AvroKafkaSource to read directly from Kafka versus Amazon S3, or with Hudi’s JdbcSource to read directly from the PostgreSQL database. Hudi has several datasource readers available. Be cognizant of authentication/authorization compatibility/limitations.
- Choose either or both Hudi’s Copy on Write (CoW) and Merge on Read (MoR) table types depending on your workload requirements.
Source Code
All source code for this post and the previous posts in this series is open-sourced and located on GitHub. The specific resources used in this post are found in the debezium_hudi_demo directory of the GitHub repository. There are also two copies of the Museum of Modern Art (MoMA) Collection dataset from Kaggle, specifically prepared for this post, located in the moma_data directory. One copy is a nearly full dataset, and the other is a smaller, cost-effective dev/test version.
Kafka Connect
In this demonstration, Kafka Connect runs on Kubernetes, hosted on the fully-managed Amazon Elastic Kubernetes Service (Amazon EKS). Kafka Connect runs the Source and Sink Connectors.
Source Connector
The Kafka Connect Source Connector, source_connector_moma_postgres_kafka.json, used in steps 2–4 of the workflow, utilizes Debezium to continuously read changes to an Amazon RDS for PostgreSQL database. The PostgreSQL database hosts the MoMA Collection in two tables: artists and artworks.
The Debezium Connector for PostgreSQL reads record-level insert, update, and delete entries from PostgreSQL’s write-ahead log (WAL). According to the PostgreSQL documentation, changes to data files must be written only after log records describing the changes have been flushed to permanent storage, thus the name, write-ahead log. The Source Connector then creates and stores Apache Avro message schemas in Apicurio Registry also running on Amazon EKS.


Finally, the Source Connector transforms and writes Avro format messages to Apache Kafka running on the fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK). Assuming Kafka Connect’s topic.creation.enable property is set to true, Kafka Connect will create any necessary Kafka topics, one per database table.
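For reference, below is a minimal, hedged sketch of what such a Debezium source connector configuration might contain. The hostnames, credentials, registry URL, and converter settings are placeholder assumptions; the authoritative source_connector_moma_postgres_kafka.json file lives in the GitHub repository and may differ.
# Hedged sketch of a Debezium PostgreSQL source connector configuration;
# every value below is an illustrative placeholder
cat > source_connector_moma_postgres_kafka.json <<'EOF'
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "your-moma-database.us-east-1.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "your_username",
  "database.password": "your_password",
  "database.dbname": "moma",
  "database.server.name": "moma",
  "plugin.name": "pgoutput",
  "table.include.list": "public.artists,public.artworks",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true"
}
EOF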
Below, we see an example of a Kafka message representing an insert of a record with the artist_id 1 in the MoMA Collection database’s artists table. The record was read from the PostgreSQL WAL, transformed, and written to a corresponding Kafka topic, using the Debezium Connector for PostgreSQL. The first version represents the raw data before being transformed by Debezium. Note that the type of operation (op) indicates a read (r). Possible values include c for create (or insert), u for update, d for delete, and r for read (applies to snapshots).
The next version represents the same record after being transformed by Debezium using the event flattening single message transformation (unwrap SMT). The final message structure represents the schema stored in Apicurio Registry. The message structure is identical to the structure of the data written to Amazon S3 by the Sink Connector.
Sink Connector
The Kafka Connect Sink Connector, sink_connector_moma_kafka_s3.json, used in steps 5–6 of the workflow, implements the Confluent S3 Sink Connector. The Sink Connector reads the Avro-format messages from Kafka using the schemas stored in Apicurio Registry. It then writes the data to Amazon S3, also in Apache Avro format, based on the same schemas.
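Again, only as a hedged sketch (the sink_connector_moma_kafka_s3.json file in the repository is the authoritative version), a Confluent S3 Sink Connector configuration along these lines would read the two topics and write Avro objects to S3. The bucket name, region, flush settings, and registry URL below are placeholder assumptions.
# Hedged sketch of a Confluent S3 Sink Connector configuration; values are placeholders
cat > sink_connector_moma_kafka_s3.json <<'EOF'
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "moma.public.artists,moma.public.artworks",
  "s3.region": "us-east-1",
  "s3.bucket.name": "your-data-lake-bucket",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "flush.size": "10000",
  "rotate.schedule.interval.ms": "60000",
  "timezone": "UTC",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2"
}
EOF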
Running Kafka Connect
We first start Kafka Connect in the background to run the CDC process.
Then, deploy the Kafka Connect Source and Sink Connectors using Kafka Connect’s RESTful API. Using the API, we can also confirm the status of the Connectors.
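As a hedged sketch of those steps, assuming Kafka Connect’s REST API is listening on its default port, 8083, and that the connector names simply mirror the JSON file names:
# Start Kafka Connect in distributed mode as a background process
bin/connect-distributed.sh config/connect-distributed.properties > /dev/null 2>&1 &

# Create (or update) the Source and Sink Connectors using the REST API
curl -s -d @"source_connector_moma_postgres_kafka.json" \
    -H "Content-Type: application/json" \
    -X PUT http://localhost:8083/connectors/source_connector_moma_postgres_kafka/config | jq

curl -s -d @"sink_connector_moma_kafka_s3.json" \
    -H "Content-Type: application/json" \
    -X PUT http://localhost:8083/connectors/sink_connector_moma_kafka_s3/config | jq

# Confirm the Connectors and their tasks are RUNNING
curl -s http://localhost:8083/connectors | jq
curl -s http://localhost:8083/connectors/source_connector_moma_postgres_kafka/status | jq
curl -s http://localhost:8083/connectors/sink_connector_moma_kafka_s3/status | jq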
To confirm the two Kafka topics, moma.public.artists and moma.public.artworks, were created and contain Avro messages, we can use Kafka’s command-line tools.
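For example, assuming the bootstrap broker list is exported as BBROKERS and an IAM-auth client properties file is available (both are assumptions carried over from the earlier post in this series), commands similar to the following could be used:
# List the topics created by the Source Connector
bin/kafka-topics.sh --list \
    --bootstrap-server $BBROKERS \
    --command-config config/client-iam.properties | grep moma

# Peek at a few messages; payloads are Avro-encoded binary, so they are not human-readable
bin/kafka-console-consumer.sh \
    --topic moma.public.artists \
    --from-beginning --max-messages 5 \
    --bootstrap-server $BBROKERS \
    --consumer.config config/client-iam.properties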
In the short video-only clip below, we see the process of deploying the Kafka Connect Source and Sink Connectors and confirming they are working as expected.
The Sink Connector writes data to Amazon S3 in batches of 10k messages or every 60 seconds (one-minute intervals). These settings are configurable and highly dependent on your requirements, including message volume, message velocity, real-time analytics requirements, and available compute resources.

Since we will not be querying this raw Avro-format CDC data in Amazon S3 directly, there is no need to catalog this data in Apache Hive or AWS Glue Data Catalog, a fully-managed Hive-compatible metastore.
Apache Hudi
According to the overview, Apache Hudi (pronounced “hoodie”) is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality to data lakes. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering, compaction optimizations, and concurrency, all while keeping data in open source file formats.
Without Hudi or an equivalent open-source data lake table format such as Apache Iceberg or Databricks’ Delta Lake, most data lakes are just a bunch of unmanaged flat files. Amazon S3 cannot natively maintain the latest view of the data, to the surprise of many who are more familiar with OLTP-style databases or OLAP-style data warehouses.
DeltaStreamer
DeltaStreamer, aka the HoodieDeltaStreamer utility (part of the hudi-utilities-bundle), used in steps 7–10 of the workflow, provides a way to perform streaming ingestion of data from different sources, such as a Distributed File System (DFS) or Apache Kafka.
Optionally, HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, ingests multiple tables into Hudi datasets in a single Spark job. Currently, it only supports sequential processing of the tables to be ingested and the Copy on Write table type.
We are using HoodieDeltaStreamer to write to both Merge on Read (MoR) and Copy on Write (CoW) table types for demonstration purposes only. The MoR table type is a superset of the CoW table type; it stores data using a combination of columnar-based (e.g., Apache Parquet) and row-based (e.g., Apache Avro) file formats. Updates are logged to delta files and later compacted to produce new versions of the columnar files, synchronously or asynchronously. Again, the choice of table type depends on your requirements.

Amazon EMR
For this demonstration, I’ve used the recently released Amazon EMR version 6.5.0 configured with Apache Spark 3.1.2 and Apache Hive 3.1.2. EMR 6.5.0 runs Scala version 2.12.10, Python 3.7.10, and OpenJDK Corretto-8.312. I have included the AWS CloudFormation template and parameters file used to create the EMR cluster, on GitHub.

When choosing Apache Spark, Apache Hive, or Presto on EMR 6.5.0, Apache Hudi release 0.9.0 is automatically installed.

DeltaStreamer Configuration
Below, we see the DeltaStreamer properties file, deltastreamer_artists_apicurio_mor.properties. This properties file is referenced by the Spark job that runs DeltaStreamer, shown next. The file contains properties related to the datasource, the data sink, and Apache Hive. The source of the data for DeltaStreamer is the CDC data written to Amazon S3. In this case, the datasource is the objects located in the /topics/moma.public.artists/partition=0/ S3 object prefix. The data sink is a Hudi MoR table type in Amazon S3. DeltaStreamer will write Parquet data, partitioned by the artist’s nationality, to the /moma_mor/artists/ S3 object prefix. Lastly, DeltaStreamer will sync all tables and table partitions to Apache Hive, including creating the Hive databases and tables if they do not already exist.
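As a hedged sketch only (the properties file in the GitHub repository is the authoritative version), the artists MoR properties might resemble the following; the bucket name, registry host, and exact keys are placeholder assumptions:
# Hedged sketch of deltastreamer_artists_apicurio_mor.properties; values are placeholders
cat > deltastreamer_artists_apicurio_mor.properties <<'EOF'
# Hudi record key and partitioning
hoodie.datasource.write.recordkey.field=artist_id
hoodie.datasource.write.partitionpath.field=nationality
hoodie.datasource.write.hive_style_partitioning=true
# Source: raw Avro CDC data written to S3 by the Sink Connector
hoodie.deltastreamer.source.dfs.root=s3://your-data-lake-bucket/topics/moma.public.artists/partition=0/
# Avro schema, served by Apicurio Registry's Confluent-compatible API
hoodie.deltastreamer.schemaprovider.registry.url=http://your-registry-host:8080/apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest
# Hive sync
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.table=artists
hoodie.datasource.hive_sync.partition_fields=nationality
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
EOF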
Below, we see the equivalent DeltaStreamer properties file for the MoMA artworks, deltastreamer_artworks_apicurio_mor.properties. There are also comparable DeltaStreamer property files for the Hudi CoW tables on GitHub.
All DeltaStreamer property files reference Apicurio Registry for the location of the Avro schemas. The schemas are used by both the Kafka Avro-format messages and the CDC-created Avro-format files in Amazon S3. Due to DeltaStreamer’s coupling with Confluent Schema Registry, as opposed to other registries, we must use Apicurio Registry’s Confluent Schema Registry API (Version 6) compatibility endpoints (e.g., /apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest) when using the org.apache.hudi.utilities.schema.SchemaRegistryProvider datasource option with DeltaStreamer. According to Apicurio, to provide compatibility with Confluent SerDes (Serializer/Deserializer) and other clients, Apicurio Registry implements the API defined by the Confluent Schema Registry.

Running DeltaStreamer
The properties files are loaded by Spark jobs that call the DeltaStreamer library, using spark-submit. Below, we see an example Spark job that calls the DeltaStreamer class. DeltaStreamer reads the raw Avro-format CDC data from S3 and writes the data using the Hudi MoR table type into the /moma_mor/artists/ S3 object prefix. In this particular Spark job, we are using the continuous option. DeltaStreamer runs in continuous mode using this option, running source-fetch, transform, and write in a loop. We are also using the UPSERT write operation (op). Operation options include UPSERT, INSERT, and BULK_INSERT. This set of options is ideal for inserting ongoing changes to CDC data into Hudi tables. You can run jobs in the foreground or background on EMR’s Master Node or as EMR Steps from the Amazon EMR console.
Below, we see another example DeltaStreamer Spark job that reads the raw Avro-format CDC data from S3 and writes the data using the MoR table type into the /moma_mor/artworks/ S3 object prefix. This example uses the BULK_INSERT write operation (op) and the filter-dupes option. The filter-dupes option ensures that duplicate records from the source are dropped/filtered out before INSERT or BULK_INSERT. This set of options is ideal for the initial bulk inserting of existing data into Hudi tables. The job runs one time and completes, unlike the previous example that ran continuously.
Syncing with Hive
The following abridged, video-only clip demonstrates the differences between the Hudi CoW and MoR table types with respect to Apache Hive. In the video, we run the deltastreamer_jobs_bulk_bkgd.sh script, included on GitHub. This script runs four different Apache Spark jobs, using Hudi DeltaStreamer to bulk-ingest all the artists and artworks CDC data from Amazon S3 into both Hudi CoW and MoR table types. Once the four Spark jobs are complete, the script queries Apache Hive and displays the new Hive databases and database tables created by DeltaStreamer.
In both the video above and the terminal screengrab below, note the difference in the tables created within the two Hive databases, the Hudi CoW table type (moma_cow) and the MoR table type (moma_mor). The MoR table type creates both a read-optimized table (_ro) as well as a real-time table (_rt) for each datasource (e.g., artists_ro and artists_rt).

According to the documentation, Hudi creates two tables in the Hive metastore for the MoR table type: the first, a read-optimized view appended with _ro, and the second, a table with the same name appended with _rt, which is a real-time view. According to Hudi, the read-optimized view exposes columnar Parquet, while the real-time view exposes columnar Parquet and/or row-based logs; you can query both tables. The CoW table type creates a single table without a suffix for each datasource (e.g., artists). Below, we see the Hive table structure for the artists_rt table, created by DeltaStreamer, using SHOW CREATE TABLE moma_mor.artists_rt;.
Having run the demonstration’s deltastreamer_jobs_bulk_bkgd.sh script, the resulting object structure in the Hudi-managed section of the Amazon S3 bucket looks as follows.

Below is an example of Hudi files created in the /moma/artists_cow/ S3 object prefix. When using data lake table formats like Hudi, given its specialized directory structure and the high number of objects, interactions with the data should be abstracted through Hudi’s programming interfaces. Generally speaking, you do not interact directly with the objects in a data lake.
Hudi CLI
Optionally, we can inspect the Hudi tables using the Hudi CLI (hudi-cli). The CLI offers an extensive list of available commands. Using the CLI, we can inspect the Hudi tables and their schemas, and review operational statistics like write amplification (the number of bytes written for 1 byte of incoming data), commits, and compactions.
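For example, after launching the CLI on the EMR Master Node, commands along the following lines can be used; the S3 path is a placeholder, and the launcher script location can vary by EMR release:
# Launch the Hudi CLI on the EMR Master Node (may live under /usr/lib/hudi/cli/bin/)
hudi-cli.sh

# The following are entered at the resulting hudi-> prompt, not in bash:
#   connect --path s3://your-data-lake-bucket/moma_mor/artists
#   desc
#   commits show
#   stats wa
#   compactions show all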
The following short video-only clip shows the use of the Hudi CLI, running on the Amazon EMR Master Node, to inspect the Hudi tables in S3.
Hudi Data Structure
Recall the sample Kafka message we saw earlier in the post representing an insert of an artist record with the artist_id 1. Below, we see what the same record looks like after being ingested by Hudi DeltaStreamer. Note the five additional fields added by Hudi with the _hoodie_ prefix.
Querying Hudi-managed Data
With the initial data ingestion complete and the CDC and DeltaStreamer processes monitoring for future changes, we can query the resulting data stored in Hudi tables. First, we will make some changes to the PostgreSQL MoMA Collection database to see how Hudi manages the data mutations. We could also make changes directly to the Hudi tables using Hive, Spark, or Presto. However, that would cause our datasource to be out of sync with the Hudi tables, potentially negating the entire CDC process. When developing a data lake, this is a critically important consideration — how changes are introduced to Hudi tables, especially when CDC is involved, and whether data continuity between datasources and the data lake is essential.
For the demonstration, I have made a series of arbitrary updates to a piece of artwork in the MoMA Collection database, ‘Picador (La Pique)’ by Pablo Picasso.
Below, note the last four objects shown in S3. Judging by the file names and dates, we can see that the CDC process, using Kafka Connect, has picked up the four updates I made to the record in the database. The Source Connector first wrote the changes to Kafka. The Sink Connector then read those Kafka messages and wrote the data to Amazon S3 in Avro format, as shown below.

Looking again at S3, we can also observe that DeltaStreamer picked up the new CDC objects in Amazon S3 and wrote them to both the Hudi CoW and MoR tables. Note the file types shown below. Given Hudi’s MoR table type structure, Hudi first logged the changes to row-based delta files and later compacted them to produce a new version of the columnar-format Parquet file.

Querying Results from Apache Hive
There are several ways to query Hudi-managed data in S3. In this demonstration, they include querying Apache Hive using the hive client from the command line, querying Hive using Spark, and querying the Hudi tables directly using Spark. We could also install Presto on EMR to query the Hudi data directly or via Hive.
Querying the real-time artworks_rt table in Hive after we make each database change, we can observe that the data in Hudi reflects the updates. Note that the value of the _hoodie_file_name field for the first three updates is a Hudi delta log file, while the value for the last update is a Parquet file. The Parquet file signifies that compaction occurred between the time the fourth update was made and the time the Hive query was executed. Lastly, note the type of operation (__op) indicates an update change (u) for all records.

Once all four database updates are complete and compaction has occurred, we should observe identical results from all Hive tables. Below, note the _hoodie_file_name field for all three tables is a Parquet file. Logically, the Parquet file for the MoR read-optimized and real-time Hive tables is the same.

Had we queried the data previous to compaction, the results would have differed. Below we have three queries. I further updated the artwork record, changing the date field from 1959 to 1960. The read-optimized MoR table, artworks_ro, still reflects the original date value, 1959, before the update and prior to compaction. The real-time table, artworks_rt, reflects the latest update to the date field, 1960. Note that the value of the _hoodie_file_name field for the read-optimized table is a Parquet file, while the value for the real-time table (artworks_rt), the third and final query, is a delta log file. The delta log allows the real-time table to display the most current state of the data in Hudi.

Below are a few useful Hive commands to query the changes in Hudi.
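As a hedged example of the general pattern only (the column names below are assumptions based on the MoMA dataset and may not match the demonstration’s exact schema):
# Compare the same artwork across the CoW, MoR read-optimized, and MoR real-time tables
hive -e 'SELECT _hoodie_commit_time, _hoodie_file_name, title, `date`
         FROM moma_cow.artworks
         WHERE title LIKE "Picador%";'

hive -e 'SELECT _hoodie_commit_time, _hoodie_file_name, title, `date`
         FROM moma_mor.artworks_ro
         WHERE title LIKE "Picador%";'

hive -e 'SELECT _hoodie_commit_time, _hoodie_file_name, title, `date`
         FROM moma_mor.artworks_rt
         WHERE title LIKE "Picador%";'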
Deletes with Hudi
In addition to inserts and updates (upserts), Apache Hudi can manage deletes. Hudi supports implementing two types of deletes on data stored in Hudi tables: soft deletes and hard deletes. Given this demonstration’s specific configuration for CDC and DeltaStreamer, we will use soft deletes. Soft deletes retain the record key and nullify the other fields’ values. Hard deletes, a stronger form of deletion, physically remove any record trace from the Hudi table.
Below, we see the CDC record for the artist with artist_id 441. The event flattening single message transformation (SMT), used by the Debezium-based Kafka Connect Source Connector, adds the __deleted field with a value of true and nullifies all fields except the record’s key, artist_id, which is required.
Below, we see the same delete record for the artist with artist_id 441 in the Hudi MoR table. All the null fields have been removed.
Below, we see how the deleted record appears in the three Hive CoW and MoR artist tables. Note the query results from the read-optimized MoR table, artists_ro, contain two records: the original record (r) and the deleted record (d). The data is partitioned by nationality, and since the record was deleted, the nationality field is changed to null. In S3, Hudi represents this partition as nationality=default. The record now exists in two different Parquet files, within two separate partitions, something to be aware of when querying the read-optimized MoR table.

Time Travel
According to the documentation, Hudi has supported time travel queries since version 0.9.0. With time travel, you can query the previous state of your data. Time travel is particularly useful for use cases, including rollbacks, debugging, and audit history.
To demonstrate time travel queries in Hudi, we start by making some additional changes to the source database. For this demonstration, I made a series of five updates and finally a delete to the artist record with artist_id 299 in the PostgreSQL database over a few-hour period.
Once the CDC and DeltaStreamer ingestion processes are complete, we can use Hudi’s time travel query capability to view the state of data in Hudi at different points in time (instants). To do so, we need to provide an as.of.instant date/time value to Spark.
Based on the time period in which I made the five updates and the delete, I have chosen six instants during that period where I want to examine the state of the record. Below is an example of the PySpark code from a Jupyter Notebook used to perform the six time travel queries against the Hudi MoR artists table.
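Since the notebook itself lives in the GitHub repository, the following is only a minimal sketch of the idea, written as a small PySpark script submitted with spark-submit; the bucket path, instants, and jar locations are placeholder assumptions:
# Hedged sketch: query the Hudi MoR artists table as of several instants
cat > hudi_time_travel.py <<'EOF'
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-time-travel").getOrCreate()

base_path = "s3://your-data-lake-bucket/moma_mor/artists"  # placeholder path

# Illustrative instants only; choose timestamps that bracket your own changes
instants = ["2021-12-30 09:00:00", "2021-12-30 12:00:00", "2021-12-30 15:00:00"]

for instant in instants:
    df = (spark.read.format("hudi")
          .option("as.of.instant", instant)  # Hudi time travel read option (0.9.0+)
          .load(base_path))
    print(f"--- state of artist_id 299 as of {instant} ---")
    df.filter("artist_id = 299").show(truncate=False)
EOF

spark-submit \
    --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    hudi_time_travel.py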
Below, we see the results of the time travel queries. At each instant, we can observe the mutating state of the data in the Hudi MoR artists table, including the initial bulk insert of the existing snapshot of data (r) and the delete record (d). Since the delete made in the PostgreSQL database was recorded as a soft delete in Hudi, as opposed to a hard delete, we are still able to retrieve the record at any instant.
In addition to time travel queries, Hudi also offers incremental queries and point in time queries.
Conclusion
Although this post only scratches the surface of the capabilities of Debezium and Hudi, you can see the power of CDC using Kafka Connect and Debezium, combined with Hudi, to build and manage open data lakes on AWS.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Video Demonstration: Building Open Data Lakes on AWS with Debezium and Apache Hudi
Posted by Gary A. Stafford in Software Development on October 31, 2021
Build an open-source data lake on AWS using a combination of Debezium, Apache Kafka, Apache Hudi, Apache Spark, and Apache Hive
Introduction
In the following recorded demonstration, we will build a simple open data lake on AWS using a combination of open-source software (OSS), including Red Hat’s Debezium, Apache Kafka, and Kafka Connect for change data capture (CDC), and Apache Hive, Apache Spark, Apache Hudi, and Hudi’s DeltaStreamer for managing our data lake. We will use fully-managed AWS services to host the open data lake components, including Amazon RDS, Amazon MSK, Amazon EKS, and Amazon EMR.

Demonstration
Source Code
All source code for this post and the previous posts in this series is open-sourced and located on GitHub. The following files are used in the demonstration:
- MoMA data: Uncompress files and import pipe-delimited data to PostgreSQL;
- base.properties: Base Hudi DeltaStreamer properties;
- deltastreamer_artists_file_based_schema.properties: Demo-specific Hudi DeltaStreamer properties for MoMA Artists;
- deltastreamer_artworks_file_based_schema.properties: Demo-specific Hudi DeltaStreamer properties for MoMA Artworks;
- source_connector_moma_postgres_kafka.json: Kafka Connect Source Connector (PostgreSQL to Kafka);
- sink_connector_moma_kafka_s3.json: Kafka Connect Sink Connector (Kafka to Amazon S3);
- moma_debezium_hudi_demo.ipynb: Jupyter PySpark Notebook;
- demonstration_notes.md: Commands used in the demonstration;
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Hydrating a Data Lake using Log-based Change Data Capture (CDC) with Debezium, Apicurio, and Kafka Connect on AWS
Posted by Gary A. Stafford in Analytics, AWS, Big Data, Cloud, Kubernetes on August 21, 2021
Import data from Amazon RDS into Amazon S3 using Amazon MSK, Apache Kafka Connect, Debezium, Apicurio Registry, and Amazon EKS
Introduction
In the last post, Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS, we utilized Kafka Connect to export data from an Amazon RDS for PostgreSQL relational database and import the data into a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 was converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect. To improve data freshness, as data was added or updated in the PostgreSQL database, Kafka Connect automatically detected those changes and streamed them into the data lake using query-based Change Data Capture (CDC).
This follow-up post will examine log-based CDC as a marked improvement over query-based CDC to continuously stream changes from the PostgreSQL database to the data lake. We will perform log-based CDC using Debezium’s Kafka Connect Source Connector for PostgreSQL rather than Confluent’s Kafka Connect JDBC Source connector, which was used in the previous post for query-based CDC. We will store messages as Apache Avro in Kafka running on Amazon Managed Streaming for Apache Kafka (Amazon MSK). Avro message schemas will be stored in Apicurio Registry. The schema registry will run alongside Kafka Connect on Amazon Elastic Kubernetes Service (Amazon EKS).

Change Data Capture
According to Gunnar Morling, Principal Software Engineer at Red Hat, who works on the Debezium and Hibernate projects and is a well-known industry speaker, there are two types of Change Data Capture: query-based and log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

You can find another excellent explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.
Query-based vs. Log-based CDC
To demonstrate the high-level differences between query-based and log-based CDC, let’s examine the results of a simple SQL UPDATE statement captured with both CDC methods.
UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;
Here is how that change is represented as a JSON message payload using the query-based CDC method described in the previous post.
{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}
Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.
{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}
Avro and Schema Registry
Apache Avro is a compact, fast, binary data format, according to the documentation. Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic scripting languages since data, together with its schema, is fully self-describing.
We can decouple the data from its schema by using schema registries like the Confluent Schema Registry or Apicurio Registry. According to Apicurio, in a messaging and event streaming architecture, data published to topics and queues must often be serialized or validated using a schema (e.g., Apache Avro, JSON Schema, or Google Protocol Buffers). Of course, schemas can be packaged in each application. Still, it is often a better architectural pattern to register schemas in an external system [schema registry] and then reference them from each application.
Using Debezium’s PostgreSQL source connector, we will store changes from the PostgreSQL database’s write-ahead log (WAL) as Avro in Kafka, running on Amazon MSK. The message’s schema will be stored separately in Apicurio Registry as opposed to with the message, thus reducing the size of the messages in Kafka and allowing for schema validation and schema evolution.

pagila.public.film schema
Debezium
Debezium, according to their website, continuously monitors your databases and lets any of your applications stream every row-level change in the same order they were committed to the database. Event streams can be used to purge caches, update search indexes, generate derived views and data, and keep other data sources in sync. Debezium is a set of distributed services that capture row-level changes in your databases. Debezium records all row-level changes committed to each database table in a transaction log. Then, each application reads the transaction logs they are interested in, and they see all of the events in the same order in which they occurred. Debezium is built on top of Apache Kafka and integrates with Kafka Connect.
The latest version of Debezium includes support for monitoring MySQL database servers, MongoDB replica sets or sharded clusters, PostgreSQL servers, and SQL Server databases. We will be using Debezium’s PostgreSQL connector to capture row-level changes in the Pagila PostgreSQL database. According to Debezium’s documentation, the first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content committed to the database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.
Prerequisites
Similar to the previous post, this post will focus on data movement, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:
- Amazon RDS for PostgreSQL instance (data source);
- Amazon S3 bucket (data sink);
- Amazon MSK cluster;
- Amazon EKS cluster;
- Connectivity between the Amazon RDS instance and Amazon MSK cluster;
- Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
- Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default (see the sketch after this list);
- IAM Role associated with a Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);
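If your cluster does not already allow topic auto-creation, a custom MSK configuration can be created and applied along the following lines; the configuration name, Kafka version, and cluster details are placeholders:
# Hedged sketch: create a custom Amazon MSK configuration with topic auto-creation enabled
cat > msk-server-properties.txt <<'EOF'
auto.create.topics.enable=true
EOF

aws kafka create-configuration \
    --name "kafka-connect-demo-config" \
    --kafka-versions "2.8.0" \
    --server-properties fileb://msk-server-properties.txt

# Apply the new configuration to the cluster with 'aws kafka update-cluster-configuration'
# (requires the cluster ARN and the cluster's current version string)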
As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges, on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.
Source Code
All source code for this post and the previous post, including the Kafka Connect and connector configuration files and the Helm charts, is open-sourced and located on GitHub in the garystafford/kafka-connect-msk-demo repository.
Authentication and Authorization
Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK.
Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (known as IRSA) allows EKS to access MSK and S3 using IAM (see more details below).
Sample PostgreSQL Database
For this post, we will continue to use PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytical query costs.

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.
Last Updated Trigger
Each table in the Pagila database has a last_update field. A simplistic way to detect changes in the Pagila database is to use the last_update field. This is a common technique to determine if and when changes were made to data using query-based CDC, as demonstrated in the previous post. As changes are made to records in these tables, an existing database function and a trigger on each table ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.
CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
Kafka Connect and Schema Registry
There are several options for deploying and managing Kafka Connect, the Kafka management APIs and command-line tools, and the Apicurio Registry. I prefer deploying a containerized solution to Kubernetes on Amazon EKS. Some popular containerized Kafka options include Strimzi, Confluent for Kubernetes (CFK), and Debezium. Another option is building your own Docker Image using the official Apache Kafka binaries. I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries for this post. I then installed the necessary Confluent and Debezium connectors and their associated Java dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf container, building your own image will teach you how Kafka, Kafka Connect, and Debezium work, in my opinion.
Regarding the schema registry, both Confluent and Apicurio offer containerized solutions. Apicurio has three versions of their registry, each with a different storage mechanism: in-memory, SQL, and Kafka. Since we already have an existing Amazon RDS PostgreSQL instance as part of the demonstration, I chose the Apicurio SQL-based registry Docker Image for this post, apicurio/apicurio-registry-sql:2.0.1.Final.
If you choose to use the same Kafka Connect and Apicurio solution I used in this post, a Helm Chart is included in the post’s GitHub repository, kafka-connect-msk-v2. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS. The pod comprises both the Kafka Connect and Apicurio Registry containers. The deployment is intended for demonstration purposes and is not designed for use in Production.
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-msk
spec:
  type: NodePort
  selector:
    app: kafka-connect-msk
  ports:
    - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.1.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent
        - image: apicurio/apicurio-registry-sql:2.0.1.Final
          name: apicurio-registry-mem
          imagePullPolicy: IfNotPresent
          env:
            - name: REGISTRY_DATASOURCE_URL
              value: jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/apicurio-registry
            - name: REGISTRY_DATASOURCE_USERNAME
              value: apicurio_registry
            - name: REGISTRY_DATASOURCE_PASSWORD
              value: 1L0v3Kafka!
Before deploying the chart, create a new PostgreSQL database, user, and grants on your RDS instance for the Apicurio Registry to use for storage:
CREATE DATABASE "apicurio-registry";
CREATE USER apicurio_registry WITH PASSWORD '1L0v3KafKa!';
GRANT CONNECT, CREATE ON DATABASE "apicurio-registry" to apicurio_registry;
Update the Helm chart’s values.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName) and your RDS URL (registryDatasourceUrl). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on Amazon MSK and to your S3 bucket from Kafka Connect running on Amazon EKS.
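Purely as an illustration of what such an overly broad policy could look like (the bucket name is a placeholder), the policy might resemble the following:
# Hedged sketch of an intentionally broad IAM policy for the Kafka Connect service account
cat > kafka-connect-msk-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::your-data-lake-bucket",
        "arn:aws:s3:::your-data-lake-bucket/*"
      ]
    }
  ]
}
EOF

aws iam create-policy \
    --policy-name kafka-connect-msk-policy \
    --policy-document file://kafka-connect-msk-policy.json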
Once the variables are updated, use the following command to deploy the Helm chart:
helm install kafka-connect-msk-v2 ./kafka-connect-msk-v2 \
--namespace $NAMESPACE --create-namespace
Confirm the chart was installed successfully by checking the pod’s status:
kubectl get pods -n kafka -l app=kafka-connect-msk

If you have any issues with either container while deploying, review the individual container’s logs:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl logs $KAFKA_CONTAINER -n kafka kafka-connect-msk
kubectl logs $KAFKA_CONTAINER -n kafka apicurio-registry-mem
Kafka Connect
Get a shell to the running Kafka Connect container using the kubectl exec command:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -c kafka-connect-msk -- bash

Confirm Access to Registry from Kafka Connect
If the Helm Chart was deployed successfully, you should now observe 11 new tables in the public schema of the new apicurio-registry database. Below, we see the new database and tables, as shown in pgAdmin.

Confirm the registry is running and accessible from the Kafka Connect container by calling the registry’s system/info REST API endpoint:
curl -s http://localhost:8080/apis/registry/v2/system/info | jq

The Apicurio Registry’s Service targets TCP port 8080. The Service is exposed on the Kubernetes worker node’s external IP address at a static port, the NodePort. To get the NodePort of the service, use the following command:
kubectl describe services kafka-connect-msk -n kafka
To access the Apicurio Registry’s web-based UI, add the NodePort to the Security Group of the EKS nodes with the source being your IP address, a /32 CIDR block.
To get the external IP address (EXTERNAL-IP) of any Amazon EKS worker nodes, use the following command:
kubectl get nodes -o wide
Use the <NodeIP>:<NodePort> combination to access the UI from your web browser, for example, http://54.237.41.128:30433. The registry will be empty at this point in the demonstration.

Configure Bootstrap Brokers
Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone or distributed modes. Since we will be using Kafka Connect’s distributed mode, modify the config/connect-distributed.properties file. A complete sample of the configuration file I used in this post is shown below.
Kafka Connect and the schema registry will run on Amazon EKS, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:
# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')
Update the config/connect-distributed.properties file.
# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
For convenience when executing Kafka commands, set the BBROKERS environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:
export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
Confirm Access to Amazon MSK from Kafka Connect
To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
You can also try listing the existing Kafka consumer groups:
bin/kafka-consumer-groups.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
If either of these fails, you likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.
Once configured, start Kafka Connect as a background process.
bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
To confirm Kafka Connect starts properly, immediately tail the connect.log file. The log will capture any startup errors for troubleshooting.
tail -f logs/connect.log

You can also examine the background process with the ps command to confirm Kafka Connect is running. Note the process with PID 4915, shown below. Use the kill command along with the PID to stop Kafka Connect if necessary.

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, when Kafka Connect starts up. The topics are defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-
Kafka Connect Connectors
This post demonstrates the use of a set of Kafka Connect source and sink connectors. The source connector is based on the Debezium Source Connector for PostgreSQL and the Apicurio Registry. The sink connector is based on the Confluent Amazon S3 Sink connector and the Apicurio Registry.
Connector Source
Create or modify the file, config/debezium_avro_source_connector_postgresql_05.json. Update lines 3–6, as shown below, to reflect your RDS instance connection details.
The source connector exports existing data and ongoing changes from six related tables within the Pagila database’s public schema: actor, film, film_actor, category, film_category, and language. Data will be imported into a corresponding set of six new Kafka topics: pagila.public.actor, pagila.public.film, and so forth (see line 9, above).

Data from the tables is stored in Apache Avro format in Kafka, and the schemas are stored separately in the Apicurio Registry (lines 11–18, above).
Connector Sink
Create or modify the file, config/s3_sink_connector_05_debezium_avro.json. Update line 7, as shown below, to reflect your Amazon S3 bucket’s name.
The sink connector flushes new data to S3 every 300 records or 60 seconds from the six Kafka topics (lines 4–5, 9–10, above). The schema for the data being written to S3 is extracted from the Apicurio Registry (lines 17–24, above).
The sink connector optimizes the raw data imported into S3 for downstream processing by writing GZIP-compressed Apache Parquet files to Amazon S3. Using Parquet’s columnar file format and file compression should help optimize ELT against the raw data once in S3 (lines 12–13, above).
Deploy Connectors
Deploy the source and sink connectors using the Kafka Connect REST Interface:
curl -s -d @"config/debezium_avro_source_connector_postgresql_05.json
" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/debezium_avro_source_connector_postgresql_05
/config | jq
curl -s -d @"config/s3_sink_connector_05_debezium_avro
.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_05_debezium_avro
/config | jq
Confirming the Deployment
Use the following commands to confirm the new set of connectors are deployed and running correctly.
curl -s -X GET http://localhost:8083/connectors | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/debezium_avro_source_connector_postgresql_05
/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_05_debezium_avro
/status | jq

The items stored in Apicurio Registry, such as event schemas and API designs, are known as registry artifacts. If we re-visit the Apicurio Registry’s UI, we should observe 12 artifacts — a ‘key’ and ‘value’ artifact for each of the six tables we exported from the Pagila database.

Examining Amazon S3, you should note six sets of S3 objects within the /topics/ object key prefix, organized by topic name.

Within each topic name key, there should be a set of GZIP-compressed Parquet files.

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the AWS CLI with the s3api commands:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.film/partition=0/pagila.public.film+0+0000000000.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the metadata-rich structure of the log-based CDC messages as compared to the query-based messages we observed in the previous post:
{
"after": {
"special_features": [
"Deleted Scenes",
"Behind the Scenes"
],
"rental_duration": 6,
"rental_rate": 0.99,
"release_year": 2006,
"length": 86,
"replacement_cost": 20.99,
"rating": "PG",
"description": "A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies",
"language_id": 1,
"title": "ACADEMY DINOSAUR",
"original_language_id": null,
"last_update": "2017-09-10T17:46:03.905795Z",
"film_id": 1
},
"source": {
"schema": "public",
"sequence": "[null,\"1177089474560\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177089474560,
"name": "pagila",
"txId": 18422,
"version": "1.6.1.Final",
"ts_ms": 1629340334432,
"snapshot": "true",
"db": "pagila",
"table": "film"
},
"op": "r",
"ts_ms": 1629340334434
}
Database Changes with Log-based CDC
What happens when we change data within the tables that Debezium and Kafka Connect are monitoring? To answer this question, let’s make a few DML changes to the Pagila database: inserts, updates, and deletes:
INSERT INTO public.category (name)
VALUES ('Techno Thriller');
UPDATE public.film
SET release_year = 2021,
rental_rate = 2.99
WHERE film_id = 1;
UPDATE public.film
SET rental_duration = 3
WHERE film_id = 2;
UPDATE public.film_category
SET category_id = (
SELECT DISTINCT category_id
FROM public.category
WHERE name = 'Techno Thriller')
WHERE film_id = 3;
UPDATE public.actor
SET first_name = upper('Kate'),
last_name = upper('Winslet')
WHERE actor_id = 6;
DELETE
FROM public.film_actor
WHERE film_id = 375;
To see how these changes propagate, first, examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The Kafka Connect source connector detects changes, which are then exported from PostgreSQL to Kafka. The sink connector then writes these changes to Amazon S3.

We can view the S3 bucket, which should now have new Parquet files corresponding to our changes. For example, consider the two updates we made to the film record with a film_id of 1. Note the operation is an update ("op": "u") and the presence of the data in the after block.
{
"after": {
"special_features": [
"Deleted Scenes",
"Behind the Scenes"
],
"rental_duration": 6,
"rental_rate": 2.99,
"release_year": 2021,
"length": 86,
"replacement_cost": 20.99,
"rating": "PG",
"description": "A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies",
"language_id": 1,
"title": "ACADEMY DINOSAUR",
"original_language_id": null,
"last_update": "2021-08-19T03:19:57.073053Z",
"film_id": 1
},
"source": {
"schema": "public",
"sequence": "[\"1177693455424\",\"1177693455424\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177693471392,
"name": "pagila",
"txId": 18445,
"version": "1.6.1.Final",
"ts_ms": 1629343197100,
"snapshot": "false",
"db": "pagila",
"table": "film"
},
"op": "u",
"ts_ms": 1629343197389
}
In another example, we see the delete made in the film_actor table, to the record with the film_id of 375. Note the operation is a delete ("op": "d") and the presence of the before block but no after block.
{
"before": {
"last_update": "1970-01-01T00:00:00Z",
"actor_id": 5,
"film_id": 375
},
"source": {
"schema": "public",
"sequence": "[\"1177693516520\",\"1177693516520\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177693516520,
"name": "pagila",
"txId": 18449,
"version": "1.6.1.Final",
"ts_ms": 1629343198400,
"snapshot": "false",
"db": "pagila",
"table": "film_actor"
},
"op": "d",
"ts_ms": 1629343198426
}
Debezium Event Flattening SMT
The challenge with the Debezium message structure shown above in S3 is the verbosity of the payload and the nested nature of the data. As a result, developing SQL queries against such records would be difficult. For example, given the message structure shown above, even the simplest query in Amazon Athena becomes significantly more complex:
SELECT after.actor_id, after.first_name, after.last_name, after.last_update
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY after.actor_id
ORDER BY after.last_UPDATE DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_actor") AS x
WHERE x.row_num = 1
ORDER BY after.actor_id;
To specifically address the needs of different consumers, Debezium offers the event flattening single message transformation (SMT). The event flattening transformation is a Kafka Connect SMT. We covered Kafka Connect SMTs in the previous post. Using the event flattening SMT, we can shape the message received by Kafka to be more attuned to the specific consumers of our data lake. To implement the event flattening SMT, modify and redeploy the source connector, adding additional configuration (lines 19–23, below).
We will include the op, db, schema, lsn, and source.ts_ms metadata fields, along with the actual record data (table), in the transformed message. This means we have chosen to exclude all other fields from the messages. The transform will flatten the message’s nested structure.
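A hedged sketch of the kind of event flattening configuration this describes is shown below; the exact property values (and the line numbers referenced above) belong to the connector file in the GitHub repository and may differ:
# Hedged sketch: event flattening (unwrap) SMT properties merged into the Debezium
# source connector configuration before redeploying it with curl -X PUT .../config
cat > event_flattening_smt_fragment.json <<'EOF'
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,db,schema,table,lsn,source.ts_ms"
}
EOF
The rewrite delete handling mode is what produces the __deleted field discussed below.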
Making this change to the message structure by adding the transformation results in new versions of the message’s schemas automatically being added to the Apicurio Registry by the source connector:

pagila.public.film schema
As a result of the event flattening SMT by the source connector, our message structure is significantly simplified:
{
"actor_id": 7,
"first_name": "BOB",
"last_name": "MOSTEL",
"last_update": "2021-08-19T21:01:55.090858Z",
"__op": "u",
"__db": "pagila",
"__schema": "public",
"__table": "actor",
"__lsn": 1191920555344,
"__source_ts_ms": 1629406915091,
"__deleted": "false"
}
Note the new __deleted field, which results from lines 21–22 of the source connector configuration, shown above. Debezium keeps tombstone records for DELETE operations in the event stream and adds __deleted, set to true or false. Below, we see an example of two DELETE operations on the film_actor table.
{
"actor_id": 52,
"film_id": 376,
"last_update": "1970-01-01T00:00:00Z",
"__op": "d",
"__db": "pagila",
"__schema": "public",
"__table": "film_actor",
"__lsn": 1192390296016,
"__source_ts_ms": 1629408869556,
"__deleted": "true"
}
{
"actor_id": 60,
"film_id": 376,
"last_update": "1970-01-01T00:00:00Z",
"__op": "d",
"__db": "pagila",
"__schema": "public",
"__table": "film_actor",
"__lsn": 1192390298976,
"__source_ts_ms": 1629408869556,
"__deleted": "true"
}
Viewing Data in the Data Lake
A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.
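If you prefer the CLI to the console, a crawler can be created and started with commands similar to the following; the crawler name, IAM role, and S3 path are placeholders, while the pagila_kafka_connect database name matches the Athena queries later in this post:
# crawler name, IAM role, and S3 path below are placeholders
aws glue create-crawler \
  --name pagila-kafka-connect-crawler \
  --role AWSGlueServiceRole-data-lake \
  --database-name pagila_kafka_connect \
  --targets '{"S3Targets": [{"Path": "s3://your-s3-bucket/pagila/"}]}'
aws glue start-crawler --name pagila-kafka-connect-crawler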

With the data crawled and cataloged in Glue, let’s perform some additional changes to the Pagila database’s film
table.
UPDATE public.film
SET release_year = 2019,
rental_rate = 3.99
WHERE film_id = 1;
UPDATE public.film
SET rental_duration = 4
WHERE film_id = 2;
UPDATE public.film
SET rental_duration = 7
WHERE film_id = 2;
INSERT INTO public.category (name)
VALUES ('Steampunk');
UPDATE public.film_category
SET category_id = (
SELECT DISTINCT category_id
FROM public.category
WHERE name = 'Steampunk')
WHERE film_id = 3;
UPDATE public.film
SET release_year = 2017,
rental_rate = 3.99
WHERE film_id = 4;
UPDATE public.film_actor
SET film_id = 100
WHERE film_id = 5;
UPDATE public.film_category
SET film_id = 100
WHERE film_id = 5;
UPDATE public.inventory
SET film_id = 100
WHERE film_id = 5;
DELETE
FROM public.film
WHERE film_id = 5;
We should be able to observe these database changes almost immediately by executing a query with Amazon Athena. The changes are propagated from PostgreSQL to Kafka to S3 within seconds by Kafka Connect, based on the connector configurations. Performing a typical query in Athena will return all of the original records as well as any updates or deletes we made as duplicate records (records with identical film_id primary keys).
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp
FROM "pagila_kafka_connect"."pagila_public_film"
ORDER BY film_id, timestamp

Note the original records as well as each change we made earlier. The timestamp field, derived from the __source_ts_ms metadata field, represents the server time at which the transaction was committed, according to Debezium. Also, note the records with a film_id of 5 in the query results, the record we deleted from the film table. The field values are (mostly) null in the latest record, except for any fields with default values in the Pagila table definition. If there are default values set on a field (e.g., rental_duration smallint default 3 not null or rental_rate numeric(4,2) default 4.99 not null), those values end up in the deleted record when using the event flattening SMT. This doesn't negatively impact anything except adding additional size to the tombstone record (it is unclear whether this is expected Debezium behavior or an artifact of the WAL entry).
To view only the most current data and ignore deleted records, we can use the ROW_NUMBER()
function and add a predicate to check the value of the __deleted
field:
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY film_id
ORDER BY __source_ts_ms DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_film") AS x
WHERE x.row_num = 1
AND __deleted != 'true'
ORDER BY film_id

Now we only see the latest records, including the removal of any deleted records. Although this method is effective for a single set of records, the query is far too intricate to apply to complex joins and aggregations, in my opinion.
Data Movement
Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena's CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores the data files created by the CTAS statement in a specified location in Amazon S3 and creates a new AWS Glue Data Catalog table to store the result set's schema and metadata information. CTAS supports several file formats and storage options.

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by the movie rating
, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.
CREATE TABLE pagila_kafka_connect.pagila_public_film_refined
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['rating'],
external_location='s3://my-s3-table/refined/film/'
) AS
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp, rating
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY film_id
ORDER BY __source_ts_ms DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_film") AS x
WHERE x.row_num = 1
AND __deleted = 'false'
ORDER BY film_id

Examining the Amazon S3 bucket again, you should observe a new set of S3 objects within the /refined/film/ key path, partitioned by rating.

We should also see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS statement. We can perform additional queries on the refined dataset.
SELECT *
FROM "pagila_kafka_connect"."pagila_public_film_refined"
ORDER BY film_id

CRUD Operations in the Data Lake
To fully take advantage of CDC and maximize the freshness of data in the data lake, we would need to also adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level upserts and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.
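The demonstration in this post uses Hudi DeltaStreamer rather than hand-written Spark code, but to make the idea concrete, here is a minimal, illustrative PySpark Structured Streaming sketch. It assumes the flattened CDC messages are serialized as JSON rather than the Avro format used in this post, a simplified schema, and placeholder broker addresses and S3 paths; it is a sketch, not the post's implementation.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = (SparkSession.builder
         .appName("cdc-film-to-hudi")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())

# simplified, illustrative schema for the flattened film messages
schema = StructType([
    StructField("film_id", IntegerType()),
    StructField("title", StringType()),
    StructField("rating", StringType()),
    StructField("__op", StringType()),
    StructField("__deleted", StringType()),
    StructField("__source_ts_ms", StringType()),
])

# read the CDC topic and parse the JSON payloads (broker address is a placeholder)
films = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "b-1.your-cluster:9098")
         .option("subscribe", "pagila.public.film")
         .option("startingOffsets", "earliest")
         .load()
         .select(from_json(col("value").cast("string"), schema).alias("r"))
         .select("r.*"))

# upsert the changes into a Hudi table, keyed on film_id and ordered by the source timestamp
hudi_options = {
    "hoodie.table.name": "film",
    "hoodie.datasource.write.recordkey.field": "film_id",
    "hoodie.datasource.write.partitionpath.field": "rating",
    "hoodie.datasource.write.precombine.field": "__source_ts_ms",
    "hoodie.datasource.write.operation": "upsert",
}

(films.writeStream
 .format("hudi")
 .options(**hudi_options)
 .outputMode("append")
 .option("checkpointLocation", "s3://your-s3-bucket/checkpoints/film/")  # placeholder path
 .start("s3://your-s3-bucket/hudi/film/"))  # placeholder path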
Conclusion
This post explored how log-based CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon MSK, Amazon EKS, Apache Kafka Connect, Debezium, Apache Avro, and Apicurio Registry. In a subsequent post, we will learn how data lake file formats like Apache Hudi, Apache Iceberg, and Delta Lake, along with Apache Spark Structured Streaming, can help us actively manage the data in our data lake.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS
Posted by Gary A. Stafford in AWS, Cloud, Enterprise Software Development, Kubernetes, Serverless on August 11, 2021
Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect
Introduction
A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.
One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.
A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.
In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.
Best of all, to maintain data freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream those changes into the data lake. This process is commonly referred to as Change Data Capture (CDC).

Change Data Capture
According to Gunnar Morling, Principal Software Engineer at Red Hat, who works on the Debezium and Hibernate projects and is a well-known industry speaker, there are two types of Change Data Capture: query-based and log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.
Query-based vs. Log-based CDC
To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.
UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;
Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.
{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}
Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.
{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}
In an upcoming post, we will explore Debezium along with Apache Avro and a schema registry to build a log-based CDC solution using PostgreSQL's write-ahead log (WAL). In this post, we will examine query-based CDC using the 'update timestamp' technique.
Kafka Connect Connectors
In this post, we will use source and sink connectors from Confluent. Confluent is a leading provider of enterprise-grade managed Kafka through its Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:
- Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
- Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, Parquet, JSON, or Raw Bytes.
Prerequisites
This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:
- Amazon RDS for PostgreSQL instance (data source);
- Amazon S3 bucket (data sink);
- Amazon MSK cluster;
- Amazon EKS cluster;
- Connectivity between the Amazon RDS instance and Amazon MSK cluster;
- Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
- Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
- IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);
As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1
, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.
Source Code
All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.
Authentication and Authorization
Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.
Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).
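The exact IRSA setup is covered in the repository rather than in this post. As a rough illustration only, associating an IAM role with the Kafka Connect service account could be done with a single eksctl command; the cluster name, account ID, and policy name below are placeholders, not values from the post:
# cluster name, account ID, and policy name are placeholders
eksctl create iamserviceaccount \
  --name kafka-connect-msk-iam-serviceaccount \
  --namespace kafka \
  --cluster your-eks-cluster \
  --attach-policy-arn arn:aws:iam::111111111111:policy/kafka-connect-msk-policy \
  --approve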
Sample PostgreSQL Database
There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorites, albeit a bit dated, is PostgreSQL's Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for 'big data' use cases but small enough to quickly install and minimize data storage and analytics costs.

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.
Last Updated Trigger
Each table in the Pagila database has a last_update
field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update
field. This is a common technique to determine if and when changes were made to data using query-based CDC.
As changes are made to records in these tables, an existing database function and a trigger on each table will ensure the last_update field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.
CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
Kubernetes-based Kafka Connect
There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.
If you choose to use the same Kafka Connect Image used in this post, a Helm Chart is included in the post's GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka Namespace on Amazon EKS.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.0.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent
Before deploying the chart, update the values.yaml file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName). The IAM Policy attached to the IAM Role associated with the pod's Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on MSK and to S3 from Kafka Connect running on EKS.
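The policy itself is embedded as a gist in the original post and is not reproduced here; a minimal sketch along those lines, with every value illustrative only, might look similar to:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": "*"
    }
  ]
}
For anything beyond a demonstration, scope the actions and resources down to the specific MSK cluster and S3 bucket.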
Once the Service Account variable is updated, use the following command to deploy the Helm chart:
helm install kafka-connect-msk ./kafka-connect-msk \
--namespace $NAMESPACE --create-namespace
To get a shell to the running Kafka Connect container, use the following kubectl exec
command:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Configure Bootstrap Brokers
Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect’s distributed mode, modify the config/connect-distributed.properties
file. A complete sample of the configuration file I used in this post is shown below.
Kafka Connect will run within the pod’s container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers
property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:
# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')
Update the config/connect-distributed.properties
file.
# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
For convenience when executing Kafka commands, set the BBROKERS
environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:
export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098, b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
Confirm Access to Amazon MSK from Kafka Connect
To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
You can also try listing the existing Kafka consumer groups:
bin/kafka-consumer-groups.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.
Kafka Connect
I recommend starting Kafka Connect as a background process using either method shown below.
bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &
To confirm Kafka Connect started properly, immediately tail the connect.log
file. The log will capture any startup errors for troubleshooting.
tail -f logs/connect.log

You can also examine the background process with the ps
command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill
command along with the PID to stop Kafka Connect if necessary.

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, the first time it starts up, as defined in the config/connect-distributed.properties file: connect-configs, connect-offsets, and connect-status. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-
Kafka Connect Connectors
This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.
Connector Source #1
Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json
. Modify lines 3–5, as shown below, to reflect your RDS instance’s JDBC connection details.
This first Kafka Connect source connector uses Confluent's Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila's public schema: address, city, and country. We will write that data to a series of topics, arbitrarily prefixed with the database name and schema, pagila.public.. The source connector will create the three new topics automatically: pagila.public.address, pagila.public.city, and pagila.public.country.
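The complete connector configuration is provided as a gist in the post's repository and is not reproduced here. As a simplified, illustrative sketch only, the key properties might look similar to the following, with all connection details as placeholders:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "1",
  "connection.url": "jdbc:postgresql://your-rds-endpoint:5432/pagila",
  "connection.user": "your-username",
  "connection.password": "your-password",
  "table.whitelist": "address,city,country",
  "mode": "timestamp",
  "timestamp.column.name": "last_update",
  "poll.interval.ms": "5000",
  "topic.prefix": "pagila.public."
}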
Note the connector's mode property value is set to timestamp, and the last_update field is referenced in the timestamp.column.name property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (poll.interval.ms property), looking for changes that are newer than the most recently exported last_update date. This is accomplished by the source connector using a parameterized query, such as:
SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC
Connector Sink #1
Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json. Modify line 7, as shown below, to reflect your Amazon S3 bucket name.
This first Kafka Connect sink connector uses Confluent's Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector) to export data from Kafka topics to Amazon S3 objects in JSON format.
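Again, the complete configuration lives in the repository; an illustrative sketch of the key properties, with the bucket name and region as placeholders, might look similar to the following. The flush.size and rotate.schedule.interval.ms values mirror the flush behavior described later in this section (every 100 records or 60 seconds):
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "1",
  "topics": "pagila.public.address,pagila.public.city,pagila.public.country",
  "s3.region": "us-east-1",
  "s3.bucket.name": "your-s3-bucket",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "100",
  "rotate.schedule.interval.ms": "60000",
  "timezone": "UTC",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}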
Deploy Connectors #1
Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST method against the /connectors endpoint. However, this then requires a DELETE and an additional POST to update the connector. Using a PUT against the /config endpoint, you can update the connector without first issuing a DELETE.
curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq

You can confirm the source and sink connectors are deployed and running using the following commands:
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq

Errors preventing the connector from starting correctly will be displayed using the /status
endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Confirming Success of Connectors #1
The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of the three new Kafka topics that should have been created: pagila.public.address, pagila.public.city, and pagila.public.country.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.
To confirm the sink connector worked, verify the new S3 objects have been created in the data lake's S3 bucket. Using the AWS CLI v2's s3api commands, we can view the contents of our target S3 bucket:
aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text
You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.
You could also use the AWS Management Console to view the S3 bucket’s contents.

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
For example, the address
table’s data will look similar to the following using the ‘Query with S3 Select’ feature via the console or API:
{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000
}
Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!
Connector Source #2
Create a new file or modify config/jdbc_source_connector_postgresql_01.json
. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.
This second Kafka Connect source connector also uses Confluent's Kafka Connect JDBC Source connector to export data from just the address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is the use of transforms, known as Single Message Transformations (SMTs). SMTs are applied to messages as they flow through Kafka Connect from RDS to Kafka.
In this connector, there are four transforms, which perform the following common functions (an illustrative sketch of the transform configuration follows this list):
- Extract the address_id integer field as the Kafka message key, as detailed in this blog post by Confluent (see 'Setting the Kafka message key');
- Append the Kafka topic name to the message as a new static field;
- Append the database name to the message as a new static field;
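The actual transform configuration is part of the connector gist. An illustrative excerpt using standard Kafka Connect SMTs, with transform aliases chosen here for readability and field names matching the sample output shown further below, might look similar to:
"transforms": "createKey,extractKey,insertTopic,insertSource",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "address_id",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "address_id",
"transforms.insertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTopic.topic.field": "message_topic",
"transforms.insertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertSource.static.field": "message_source",
"transforms.insertSource.static.value": "pagila"
Extracting the key takes two chained transforms (ValueToKey followed by ExtractField$Key), which is why the connector uses four transforms to perform three functions.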
Connector Sink #2
Create a new file or modify config/s3_sink_connector_01.json
. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.
This second sink connector is nearly identical to the first sink connector, except it only exports data from a single Kafka topic, pagila.public.alt.address
, into S3.
Deploy Connectors #2
Deploy the second set of source and sink connectors using the Kafka Connect REST Interface, exactly like the first pair.
curl -s -d @"config/jdbc_source_connector_postgresql_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/config | jq
curl -s -d @"config/s3_sink_connector_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_01/config | jq
Confirming Success of Connectors #2
Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first set, for a total of four connectors.
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_01/status | jq

To view the results of the first transform, extracting the address_id
integer field as the Kafka message key, we can use a Kafka command-line consumer:
bin/kafka-console-consumer.sh \
--topic pagila.public.alt.address \
--offset 102 --partition 0 --max-messages 5 \
--property print.key=true --property print.value=true \
--property print.offset=true --property print.partition=true \
--property print.headers=false --property print.timestamp=false \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties
In the output below, note the beginning of each message, which displays the Kafka message key, identical to the address_id
. For example, {"type":"int32","optional":false},"payload":100}
.

Examining the Amazon S3 bucket using the AWS Management Console or the CLI, you should note the fourth set of S3 objects within the /topics/pagila.public.alt.address/ object key prefix.

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.alt.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the two new fields that have been appended to each record, a result of the Kafka Connect transforms:
{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
Congratulations, you have successfully imported more data from a relational database into your data lake, including performing a simple series of transforms using Kafka Connect!
Connector Source #3
Create or modify config/jdbc_source_connector_postgresql_02.json
. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.
Unlike the first two source connectors that export data from tables, this connector uses a SELECT query to export data from the Pagila database's address, city, and country tables and import the results of that query into a new Kafka topic, pagila.query. The SQL query in the source connector's configuration is as follows:
SELECT *
FROM (SELECT a.address_id,
             a.address,
             a.address2,
             city.city,
             a.district,
             a.postal_code,
             country.country,
             a.phone,
             a.last_update
      FROM address AS a
               INNER JOIN city ON a.city_id = city.city_id
               INNER JOIN country ON country.country_id = city.country_id
      ORDER BY address_id) AS addresses
The final parameterized query, executed by the source connector to detect changes based on the last_update field, is as follows:
SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
WHERE "last_update" > ?
AND "last_update" < ?
ORDER BY "last_update" ASC
Connector Sink #3
Create or modify config/s3_sink_connector_02.json
. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.
This sink connector is significantly different from the previous two sink connectors. In addition to leveraging SMTs in the corresponding source connector, we are also using them in this sink connector. The sink connector appends three arbitrary static fields (message_source, message_source_engine, and environment) to each record as it is written to Amazon S3, using the InsertField transform. The sink connector also renames the district field to state_province using the ReplaceField transform.
The first two sink connectors wrote uncompressed JSON-format files to Amazon S3. This third sink connector optimizes the data imported into S3 for downstream data analytics. The sink connector writes GZIP-compressed Apache Parquet files to Amazon S3. In addition, the compressed Parquet files are partitioned by the country
field. Using a columnar file format, compression, and partitioning, queries against the data should be faster and more efficient.
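As before, the full configuration is a gist in the repository. An illustrative sketch of the properties that produce this behavior, with the bucket, region, and static values shown here as assumptions, might look similar to:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "1",
  "topics": "pagila.query",
  "s3.region": "us-east-1",
  "s3.bucket.name": "your-s3-bucket",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "gzip",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "country",
  "flush.size": "100",
  "transforms": "insertSource,insertEngine,insertEnv,renameDistrict",
  "transforms.insertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertSource.static.field": "message_source",
  "transforms.insertSource.static.value": "pagila",
  "transforms.insertEngine.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertEngine.static.field": "message_source_engine",
  "transforms.insertEngine.static.value": "postgresql",
  "transforms.insertEnv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertEnv.static.field": "environment",
  "transforms.insertEnv.static.value": "development",
  "transforms.renameDistrict.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameDistrict.renames": "district:state_province"
}
Note that each static field requires its own InsertField transform instance, while a single ReplaceField transform handles the column rename.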
Deploy Connectors #3
Deploy the final source and sink connectors using the Kafka Connect REST Interface, exactly like the first two pairs.
curl -s -d @"config/jdbc_source_connector_postgresql_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/config | jq
curl -s -d @"config/s3_sink_connector_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_02/config | jq
Confirming Success of Connectors #3
Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first two sets, for a total of six connectors.
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_02/status | jq

Reviewing the messages within the new pagila.query topic, note the message_topic field has been appended to the message by the source connector, but not the message_source, message_source_engine, and environment fields. The sink connector appends these fields as it writes the messages to S3. Also, note the district field has yet to be renamed by the sink connector to state_province.

pagila.query topic
Examining the Amazon S3 bucket, again, you should note the fifth set of S3 objects within the /topics/pagila.query/ object key prefix. The Parquet-format files within are partitioned by country.

Within each country
partition, there are Parquet files whose records contain addresses within those countries.

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the s3
API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.query/country=United States/pagila.query+0+0000000003.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the four new fields that have been appended to each record, a result of the source and sink connector SMTs. Also, note the renamed district field:
{
"address_id": 599,
"address": "1895 Zhezqazghan Drive",
"address2": "",
"city": "Garden Grove",
"state_province": "California",
"postal_code": "36693",
"country": "United States",
"phone": "137809746111",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 6,
"address": "1121 Loja Avenue",
"address2": "",
"city": "San Bernardino",
"state_province": "California",
"postal_code": "17886",
"country": "United States",
"phone": "838635286649",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 18,
"address": "770 Bydgoszcz Avenue",
"address2": "",
"city": "Citrus Heights",
"state_province": "California",
"postal_code": "16266",
"country": "United States",
"phone": "517338314235",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
Record Updates and Query-based CDC
What happens when we change data within the tables that Kafka Connect is polling every 5 seconds? To answer this question, let’s make a few DML changes:
-- update address field
UPDATE public.address
SET address = '123 CDC Test Lane'
WHERE address_id = 100;
-- update address2 field
UPDATE public.address
SET address2 = 'Apartment #2201'
WHERE address_id = 101;
-- second update to same record
UPDATE public.address
SET address2 = 'Apartment #2202'
WHERE address_id = 101;
-- insert new country
INSERT INTO public.country (country)
values ('Wakanda');
-- should be 110
SELECT country_id FROM country WHERE country='Wakanda';
-- insert new city
INSERT INTO public.city (city, country_id)
VALUES ('Birnin Zana', 110);
-- should be 601
SELECT city_id FROM public.city WHERE country_id=110;
-- update city_id to new city_id
UPDATE public.address
SET city_id = 601
WHERE address_id = 102;
-- second update to same record
UPDATE public.address
SET district = 'Lake Turkana'
WHERE address_id = 102;
-- re-assign customers, then delete an address record
UPDATE public.customer
SET address_id = 200
WHERE customer_id IN (
SELECT customer_id FROM customer WHERE address_id = 104);
DELETE
FROM public.address
WHERE address_id = 104;
To see how these changes propagate, first examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The three Kafka Connect source connectors detect the changes, which are exported from PostgreSQL to Kafka. The three sink connectors then write these changes as new JSON and Parquet files to the target S3 bucket.

Viewing Data in the Data Lake
A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.

When writing Parquet files into partitions, one shortcoming of the Kafka Connect S3 sink connector is duplicate column names in AWS Glue. Any columns used as partitions are duplicated in the Glue Data Catalog's database table schema, which results in an error similar to HIVE_INVALID_METADATA: Hive metadata for table pagila_query is invalid: Table descriptor contains duplicate columns when performing queries. To remedy this, predefine the table and the table's schema, as sketched below. Alternately, edit the Glue Data Catalog table's schema after crawling and remove the duplicate, non-partition column(s); in the screenshot below, that would mean removing the duplicate country column 7.
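As a rough sketch of the predefine-the-table approach, assuming the column names shown in the sample records above (column types are illustrative) and a placeholder S3 location, the Athena DDL might look similar to the following, followed by loading the partitions:
CREATE EXTERNAL TABLE pagila_kafka_connect.pagila_query (
  address_id int,
  address string,
  address2 string,
  city string,
  state_province string,
  postal_code string,
  phone string,
  last_update string,
  message_topic string,
  message_source string,
  message_source_engine string,
  environment string
)
PARTITIONED BY (country string)
STORED AS PARQUET
LOCATION 's3://your-s3-bucket/topics/pagila.query/';

MSCK REPAIR TABLE pagila_kafka_connect.pagila_query;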

Performing a typical SQL SELECT query in Athena will return all of the original records as well as the changes we made earlier as duplicate records (same address_id
primary key).

SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM "pagila_kafka_connect"."pagila_query"
WHERE address_id BETWEEN 100 AND 105
ORDER BY address_id;
Note the original records for address_id
100–103 as well as each change we made earlier. The last_update
field reflects the date and time the record was created or updated. Also, note the record with address_id
104 in the query results. This is the record we deleted from the Pagila database.
To view only the most current data, we can use Athena’s ROW_NUMBER()
function:
SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1
AND address_id BETWEEN 100 AND 105
ORDER BY address_id;
Now, we only see the latest records. Unfortunately, the record we deleted with address_id 104 is still present in the query results.
Using log-based CDC with Debezium, as opposed to query-based CDC, we would have received a record in S3 indicating the delete. Debezium emits a delete event record, shown below, followed by a null-value message, referred to as a tombstone in Kafka. Note the 'before' block in the delete record, as opposed to the 'after' block we observed earlier with the update record.
{
"before": {
"address": "",
"address2": null,
"phone": "",
"district": "",
"last_update": "1970-01-01T00:00:00Z",
"address_id": 104,
"postal_code": null,
"city_id": 0
},
"source": {
"schema": "public",
"sequence": "[\"1101256482032\",\"1101256482032\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1101256483936,
"name": "pagila",
"txId": 17137,
"version": "1.6.1.Final",
"ts_ms": 1628864251512,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "d",
"ts_ms": 1628864251671
}
An inefficient solution to duplicates and deletes with query-based CDC would be to bulk ingest the entire query result set from the Pagila database each time, instead of only the changes based on the last_update field. Performing an unbounded query repeatedly on a huge dataset would negatively impact database performance. Even then, you would still end up with duplicates in the data lake unless you first purged the data in S3 before re-importing the new query results.
Data Movement
Using Amazon Athena, we can easily write the results of our ROW_NUMBER() query back to the data lake for further enrichment or analysis. Athena's CREATE TABLE AS SELECT (CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT statement in the subquery. Athena stores the data files created by the CTAS statement in a specified location in Amazon S3 and creates a new AWS Glue Data Catalog table to store the result set's schema and metadata information. CTAS supports several file formats and storage options.

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by country
, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.
CREATE TABLE pagila_kafka_connect.pagila_query_processed
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['country'],
external_location='s3://your-s3-bucket/processed/pagila_query'
) AS
SELECT address_id, last_update, address, address2, city,
state_province, postal_code, country
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1 AND address_id BETWEEN 0 and 100
ORDER BY address_id;
Examining the Amazon S3 bucket one last time, you should see a new set of S3 objects within the /processed/pagila_query/ key path. The Parquet-format files, partitioned by country, are the result of the CTAS query.

We should now see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS query. We can perform additional queries on the processed data.

ACID Transactions with a Data Lake
To fully take advantage of CDC and maximize the freshness of data in the data lake, we would also need to adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level updates and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.
Conclusion
This post explored how CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon EKS, Amazon MSK, and Apache Kafka Connect. We learned about query-based CDC for capturing ongoing changes to the source data. In a subsequent post, we will explore log-based CDC using Debezium with Apache Avro and a schema registry, and see how data lake file formats like Apache Hudi, Apache Iceberg, and Delta Lake can help us manage the data in our data lake.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.