The Art of Building Open Data Lakes with Apache Hudi, Kafka, Hive, and Debezium

Build near real-time, open-source data lakes on AWS using a combination of Apache Kafka, Hudi, Spark, Hive, and Debezium

Introduction

In the following post, we will learn how to build a data lake on AWS using a combination of open-source software (OSS), including Red Hat’s Debezium, Apache Kafka, Kafka Connect, Apache Hive, Apache Spark, Apache Hudi, and Hudi DeltaStreamer. We will use fully-managed AWS services to host the datasource, the data lake, and the open-source tools. These services include Amazon RDS, MKS, EKS, EMR, and S3.

The architecture and workflow demonstrated in this post

This post is an in-depth follow-up to the video demonstration, Building Open Data Lakes on AWS with Debezium and Apache Hudi.

Workflow

As shown in the architectural diagram above, these are the high-level steps in the demonstration’s workflow:

  1. Changes (inserts, updates, and deletes) are made to the datasource, a PostgreSQL database running on Amazon RDS;
  2. Kafka Connect Source Connector, utilizing Debezium and running on Amazon EKS (Kubernetes), continuously reads data from PostgreSQL WAL using Debezium;
  3. Source Connector creates and stores message schemas in Apicurio Registry, also running on Amazon EKS, in Avro format;
  4. Source Connector transforms and writes data in Apache Avro format to Apache Kafka, running on Amazon MSK;
  5. Kafka Connect Sink Connector, using Confluent S3 Sink Connector, reads messages from Kafka topics using schemas from Apicurio Registry;
  6. Sink Connector writes data to Amazon S3 in Apache Avro format;
  7. Apache Spark, using Hudi DeltaStreamer and running on Amazon EMR, reads message schemas from Apicurio Registry;
  8. DeltaStreamer reads raw Avro-format data from Amazon S3;
  9. DeltaStreamer writes data to Amazon S3 as both Copy on Write (CoW) and Merge on Read (MoR) table types;
  10. DeltaStreamer syncs Hudi tables and partitions to Apache Hive running on Amazon EMR;
  11. Queries are executed against Apache Hive Metastore or directly against Hudi tables using Apache Spark, with data returned from Hudi tables in Amazon S3;

The workflow described above actually contains two independent processes running simultaneously. Steps 2–6 represent the first process, the change data capture (CDC) process. Kafka Connect is used to continuously move changes from the database to Amazon S3. Steps 7–10 represent the second process, the data lake ingestion process. Hudi’s DeltaStreamer reads raw CDC data from Amazon S3 and writes the data back to another location in S3 (the data lake) in Apache Hudi table format. When combined, these processes can give us near real-time, incremental data ingestion of changes from the datasource to the Hudi-managed data lake.

Alternatives

This demonstration’s workflow is only one of many possible workflows to achieve similar outcomes. Alternatives include:

Source Code

All source code for this post and the previous posts in this series are open-sourced and located on GitHub. The specific resources used in this post are found in the debezium_hudi_demo directory of the GitHub repository. There are also two copies of the Museum of Modern Art (MoMA) Collection dataset from Kaggle, specifically prepared for this post, located in the moma_data directory. One copy is a nearly full dataset, and the other is a smaller, cost-effective dev/test version.

Kafka Connect

In this demonstration, Kafka Connect runs on Kubernetes, hosted on the fully-managed Amazon Elastic Kubernetes Service (Amazon EKS). Kafka Connect runs the Source and Sink Connectors.

Source Connector

The Kafka Connect Source Connector, source_connector_moma_postgres_kafka.json, used in steps 2–4 of the workflow, utilizes Debezium to continuously read changes to an Amazon RDS for PostgreSQL database. The PostgreSQL database hosts the MoMA Collection in two tables: artists and artworks.

{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<your_database_hostname>",
"database.port": "5432",
"database.user": "<your_username>",
"database.password": "<your_password>",
"database.dbname": "moma",
"database.server.name": "moma",
"table.include.list": "public.artists,public.artworks",
"plugin.name": "pgoutput",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2&quot;,
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2&quot;,
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
}

The Debezium Connector for PostgreSQL reads record-level insert, update, and delete entries from PostgreSQL’s write-ahead log (WAL). According to the PostgreSQL documentation, changes to data files must be written only after log records describing the changes have been flushed to permanent storage, thus the name, write-ahead log. The Source Connector then creates and stores Apache Avro message schemas in Apicurio Registry also running on Amazon EKS.

Apicurio Registry UI showing Avro-format Kafka message schemas
Apicurio Registry UI showing part of Avro-format Kafka message value schema for artists

Finally, the Source Connector transforms and writes Avro format messages to Apache Kafka running on the fully-managed Amazon Managed Streaming for Apache Kafka (Amazon MSK). Assuming Kafka’s topic.creation.enable property is set to true, Kafka Connect will create any necessary Kafka topics, one per database table.

Below, we see an example of a Kafka message representing an insert of a record with the artist_id 1 in the MoMA Collection database’s artists table. The record was read from the PostgreSQL WAL, transformed, and written to a corresponding Kafka topic, using the Debezium Connector for PostgreSQL. The first version represents the raw data before being transformed by Debezium. Note that the type of operation (_op) indicates a read (r). Possible values include c for create (or insert), u for update, d for delete, and r for read (applies to snapshots).

{
"payload": {
"before": null,
"after": {
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992
},
"source": {
"version": "1.7.0.Final",
"connector": "postgresql",
"name": "moma",
"ts_ms": 1640703877051,
"snapshot": "true",
"db": "moma",
"sequence": "[null,\"3668170506336\"]",
"schema": "public",
"table": "artists",
"txId": 217094,
"lsn": 3668170506336,
"xmin": null
},
"op": "r",
"ts_ms": 1640703877051,
"transaction": null
}
}

The next version represents the same record after being transformed by Debezium using the event flattening single message transformation (unwrap SMT). The final message structure represents the schema stored in Apicurio Registry. The message structure is identical to the structure of the data written to Amazon S3 by the Sink Connector.

{
"payload": {
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992,
"__op": "r",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3668438941792,
"__source_ts_ms": 1640705109121,
"__deleted": "false"
}
}

Sink Connector

The Kafka Connect Sink Connector, sink_connector_moma_kafka_s3.json, used in steps 5–6 of the workflow, implements the Confluent S3 Sink Connector. The Sink Connector reads the Avro-format messages from Kafka using the schemas stored in Apicurio Registry. It then writes the data to Amazon S3, also in Apache Avro format, based on the same schemas.

{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.regex": "moma.public.(.*)",
"table.name.format": "${topic}",
"s3.region": "us-east-1",
"s3.bucket.name": "<your_data_lake_bucket>",
"s3.part.size": 5242880,
"flush.size": 10000,
"rotate.schedule.interval.ms": 60000,
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"behavior.on.null.values": "ignore",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true"
}

Running Kafka Connect

We first start Kafka Connect in the background to be the CDC process.

bin/connect-distributed.sh \
config/connect-distributed.properties \
> /dev/null 2>&1 &
tail -f logs/connect.log

Then, deploy the Kafka Connect Source and Sink Connectors using Kafka Connect’s RESTful API. Using the API, we can also confirm the status of the Connectors.

curl -s -d @"config/source_connector_moma_postgres_kafka.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/source_connector_moma_postgres_kafka/config | jq
curl -s -d @"config/sink_connector_moma_kafka_s3.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/sink_connector_moma_kafka_s3/config | jq
curl -s -X GET http://localhost:8083/connectors | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/source_connector_moma_postgres_kafka/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/sink_connector_moma_kafka_s3/status | jq

To confirm the two Kafka topics, moma.public.artists and moma.public.artworks, were created and contain Avro messages, we can use Kafka’s command-line tools.

# list kafka topics
bin/kafka-topics.sh –list \
–bootstrap-server $BBROKERS \
–command-config config/client-iam.properties
# read first 5 avro-format (binary) messages from topic
bin/kafka-console-consumer.sh \
–topic moma.public.artists \
–from-beginning \
–max-messages 5 \
–property print.value=true \
–property print.offset=true \
–bootstrap-server $BBROKERS \
–consumer.config config/client-iam.properties

In the short video-only clip below, we see the process of deploying the Kafka Connect Source and Sink Connectors and confirming they are working as expected.

Deploying and starting the Kafka Connect Source and Sink Connectors

The Sink Connector writes data to Amazon S3 in batches of 10k messages or every 60 seconds (one-minute intervals). These settings are configurable and highly dependent on your requirements, including message volume, message velocity, real-time analytics requirements, and available compute resources.

Amazon S3 objects containing MoMA Collection artwork records from PostgreSQL

Since we will not be querying this raw Avro-format CDC data in Amazon S3 directly, there is no need to catalog this data in Apache Hive or AWS Glue Data Catalog, a fully-managed Hive-compatible metastore.

Apache Hudi

According to the overview, Apache Hudi (pronounced “hoodie”) is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality to data lakes. Hudi provides tables, transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering, compaction optimizations, and concurrency, all while keeping data in open source file formats.

Without Hudi or an equivalent open-source data lake table format such as Apache Iceberg or Databrick’s Delta Lake, most data lakes are just of bunch of unmanaged flat files. Amazon S3 cannot natively maintain the latest view of the data, to the surprise of many who are more familiar with OLTP-style databases or OLAP-style data warehouses.

DeltaStreamer

DeltaStreamer, aka the HoodieDeltaStreamer utility (part of the hudi-utilities-bundle), used in steps 7–10 of the workflow, provides the way to perform streaming ingestion of data from different sources such as Distributed File System (DFS) and Apache Kafka.

Optionally, HoodieMultiTableDeltaStreamer, a wrapper on top of HoodieDeltaStreamer, ingests multiple tables in a single Spark job, into Hudi datasets. Currently, it only supports sequential processing of tables to be ingested and Copy on Write table type.

We are using HoodieDeltaStreamer to write to both Merge on Read (MoR) and Copy on Write (CoW) table types for demonstration purposes only. The MoR table type is a superset of the CoW table type, which stores data using a combination of columnar-based (e.g., Apache Parquet) plus row-based (e.g., Apache Avro) file formats. Updates are logged to delta files and later compacted to produce new versions of columnar files synchronously or asynchronously. Again, the choice of table types depends on your requirements.

Trade-offs between Hudi table types (table courtesy Apache Hudi documentation)
Trade-offs between Hudi table types (table courtesy Apache Hudi documentation)

Amazon EMR

For this demonstration, I’ve used the recently released Amazon EMR version 6.5.0 configured with Apache Spark 3.1.2 and Apache Hive 3.1.2. EMR 6.5.0 runs Scala version 2.12.10, Python 3.7.10, and OpenJDK Corretto-8.312. I have included the AWS CloudFormation template and parameters file used to create the EMR cluster, on GitHub.

When choosing Apache Spark, Apache Hive, or Presto on EMR 6.5.0, Apache Hudi release 0.9.0 is automatically installed.

Amazon EMR Master Node showing Apache Hudi related resources

DeltaStreamer Configuration

Below, we see the DeltaStreamer properties file, deltastreamer_artists_apicurio_mor.properties. This properties file is referenced by the Spark job that runs DeltaStreamer, shown next. The file contains properties related to the datasource, the data sink, and Apache Hive. The source of the data for DeltaStreamer is the CDC data written to Amazon S3. In this case, the datasource is the objects located in the /topics/moma.public.artworks/partition=0/ S3 object prefix. The data sink is a Hudi MoR table type in Amazon S3. DeltaStreamer will write Parquet data, partitioned by the artist’s nationality, to the /moma_mor/artists/ S3 object prefix. Lastly, DeltaStreamer will sync all tables and table partitions to Apache Hive, including creating the Hive databases and tables if they do not already exist.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=nationality
hoodie.datasource.hive_sync.table=artists
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=nationality
hoodie.datasource.write.recordkey.field=artist_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:post>/apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artists/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artists data using MoR table type

Below, we see the equivalent DeltaStreamer properties file for the MoMA artworks, deltastreamer_artworks_apicurio_mor.properties. There are also comparable DeltaStreamer property files for the Hudi CoW tables on GitHub.

# Built for demo of Apache Hudi 0.9.0 (EMR 6.5.0) with Apache Hive and SchemaRegistryProvider
include=base.properties
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=moma_mor
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields=classification
hoodie.datasource.hive_sync.table=artworks
hoodie.datasource.write.drop.partition.columns=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.partitionpath.field=classification
hoodie.datasource.write.recordkey.field=artwork_id
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_url:post>/apis/ccompat/v6/subjects/moma.public.artworks-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_bucket>/topics/moma.public.artworks/partition=0/
# 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
hoodie.parquet.small.file.limit=134217728
# https://dacort.dev/posts/updating-partition-values-with-apache-hudi/
# This is required if we want to ensure we upsert a record, even if the partition changes
hoodie.index.type=GLOBAL_BLOOM
# This is required to write the data into the new partition
# defaults to false in Apache Hudi 0.8.0 (EMR 6.4.0), true in Hudi 0.9.0 (EMR 6.5.0)
# hoodie.bloom.index.update.partition.path=true
DeltaStreamer properties file for artworks data using MoR table type

All DeltaStreamer property files reference Apicurio Registry for the location of the Avro schemas. The schemas are used by both the Kafka Avro-format messages and the CDC-created Avro-format files in Amazon S3. Due to DeltaStreamer’s coupling with Confluent Schema Registry, as opposed to other registries, we must use Apicurio Registry’s Confluent Schema Registry API (Version 6) compatibility API endpoints (e.g., /apis/ccompat/v6/subjects/moma.public.artists-value/versions/latest) when using the org.apache.hudi.utilities.schema.SchemaRegistryProvider datasource option with DeltaStreamer. According to Apicurio, to provide compatibility with Confluent SerDes (Serializer/Deserializer) and other clients, Apicurio Registry implements the API defined by the Confluent Schema Registry.

Apicurio Registry exposes multiple APIs

Running DeltaStreamer

The properties files are loaded by Spark jobs that call the DeltaStreamer library, using spark-submit. Below, we see an example Spark job that calls the DeltaStreamer class. DeltaStreamer reads the raw Avro-format CDC data from S3 and writes the data using the Hudi MoR table type into the /moma_mor/artists/ S3 object prefix. In this Spark particular job, we are using the continuous option. DeltaStreamer runs in continuous mode using this option, running source-fetch, transform, and write in a loop. We are also using the UPSERT write operation (op). Operation options include UPSERT, INSERT, and BULK_INSERT. This set of options is ideal for inserting ongoing changes to CDC data into Hudi tables. You can run jobs in the foreground or background on EMR’s Master Node or as EMR Steps from the Amazon EMR console.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artists data, MoR table type, continuous upserts
spark-submit \
–jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
–conf spark.sql.catalogImplementation=hive \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
–table-type MERGE_ON_READ \
–source-ordering-field __source_ts_ms \
–props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artists_apicurio_mor.properties" \
–source-class org.apache.hudi.utilities.sources.AvroDFSSource \
–target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artists_mor/" \
–target-table moma_mor.artists \
–schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
–enable-sync \
–continuous \
–op UPSERT

Below, we see another example DeltaStreamer Spark job that reads the raw Avro-format CDC data from S3 and writes the data using the MoR table type into the /moma_mor/artworks/ S3 object prefix. This example uses the BULK_INSERT write operation (op) and the filter-dupes option. The filter-dupes option ensures that should duplicate records from the source are dropped/filtered out before INSERT or BULK_INSERT. This set of options is ideal for the initial bulk inserting of existing data into Hudi tables. The job runs one time and completes, unlike the previous example that ran continuously.

export DATA_LAKE_BUCKET="<your_data_lake_bucket_name>"
# artworks data, MoR table type, 1x bulk insert
spark-submit \
–jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
–conf spark.sql.catalogImplementation=hive \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
–table-type MERGE_ON_READ \
–source-ordering-field __source_ts_ms \
–props "s3://${DATA_LAKE_BUCKET}/hudi/deltastreamer_artworks_apicurio_mor.properties" \
–source-class org.apache.hudi.utilities.sources.AvroDFSSource \
–target-base-path "s3://${DATA_LAKE_BUCKET}/moma/artworks_mor/" \
–target-table moma_mor.artworks \
–schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
–enable-sync \
–op BULK_INSERT \
–filter-dupes

Syncing with Hive

The following abridged, video-only clip demonstrates the differences between the Hudi CoW and MoR table types with respect to Apache Hive. In the video, we run the deltastreamer_jobs_bulk_bkgd.sh script, included on GitHub. This script runs four different Apache Spark jobs, using Hudi DeltaStreamer to bulk-ingest all the artists and artworks CDC data from Amazon S3 into both Hudi CoW and MoR table types. Once the four Spark jobs are complete, the script queries Apache Hive and displays the new Hive databases and database tables created by DeltaStreamer.

Hudi DeltaStreamer Spark jobs running on the Amazon EMR

In both the video above and terminal screengrab below, note the difference in the tables created within the two Hive databases, the Hudi CoW table type (moma_cow) and the MoR table type (moma_mor). The MoR table type creates both a read-optimized table (_ro) as well as a real-time table (_rt) for each datasource (e.g., artists_ro and artists_rt).

View of the Apache Hive CoW and MoR database tables

According to documentation, Hudi creates two tables in the Hive metastore for the MoR table type. The first, a table which is a read-optimized view appended with _ro and the second, a table with the same name appended with _rt which is a real-time view. According to Hudi, the read-optimized view exposes columnar Parquet while the real-time view exposes columnar Parquet and/or row-based logs; you can query both tables. The CoW table type creates a single table without a suffix for each datasource (e.g., artists). Below, we see the Hive table structure for the artists_rt table, created by DeltaStreamer, using SHOW CREATE TABLE moma_mor.artists_rt;.

CREATE EXTERNAL TABLE `moma_mor.artists_rt`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`artist_id` int,
`name` string,
`gender` string,
`birth_year` int,
`death_year` int,
`__op` string,
`__db` string,
`__table` string,
`__schema` string,
`__lsn` bigint,
`__source_ts_ms` bigint,
`__deleted` string)
PARTITIONED BY (
`nationality` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'hoodie.query.as.ro.table'='false',
'path'='s3://<your_data_lake_bucket>/moma/artists_mor')
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://<your_data_lake_bucket>/moma/artists_mor'
TBLPROPERTIES (
'bucketing_version'='2',
'last_commit_time_sync'='20211230180429',
'spark.sql.partitionProvider'='catalog',
'spark.sql.sources.provider'='hudi',
'spark.sql.sources.schema.numPartCols'='1',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"artist_id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"gender","type":"string","nullable":true,"metadata":{}},{"name":"birth_year","type":"integer","nullable":true,"metadata":{}},{"name":"death_year","type":"integer","nullable":true,"metadata":{}},{"name":"__op","type":"string","nullable":true,"metadata":{}},{"name":"__db","type":"string","nullable":true,"metadata":{}},{"name":"__table","type":"string","nullable":true,"metadata":{}},{"name":"__schema","type":"string","nullable":true,"metadata":{}},{"name":"__lsn","type":"long","nullable":true,"metadata":{}},{"name":"__source_ts_ms","type":"long","nullable":true,"metadata":{}},{"name":"__deleted","type":"string","nullable":true,"metadata":{}},{"name":"nationality","type":"string","nullable":true,"metadata":{}}]}',
'spark.sql.sources.schema.partCol.0'='nationality',
'transient_lastDdlTime'='1640919578')

Having run the demonstration’s deltastreamer_jobs_bulk_bkgd.sh script, the resulting object structure in the Hudi-managed section of the Amazon S3 bucket looks as follows.

S3 object structure in Hudi-managed Amazon S3 bucket

Below is an example of Hudi files created in the /moma/artists_cow/ S3 object prefix. When using data lake table formats like Hudi, given its specialized directory structure and the high number of objects, interactions with the data should be abstracted through Hudi’s programming interfaces. Generally speaking, you do not interact directly with the objects in a data lake.

"moma/artists_cow/.hoodie/.aux/.bootstrap/.fileids_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap/.partitions_$folder$",
"moma/artists_cow/.hoodie/.aux/.bootstrap_$folder$",
"moma/artists_cow/.hoodie/.aux_$folder$",
"moma/artists_cow/.hoodie/.temp_$folder$",
"moma/artists_cow/.hoodie/20211231203737.commit",
"moma/artists_cow/.hoodie/20211231203737.commit.requested",
"moma/artists_cow/.hoodie/20211231203737.inflight",
"moma/artists_cow/.hoodie/20211231203738.rollback",
"moma/artists_cow/.hoodie/20211231203738.rollback.inflight",
"moma/artists_cow/.hoodie/archived_$folder$",
"moma/artists_cow/.hoodie/hoodie.properties",
"moma/artists_cow/.hoodie_$folder$",
"moma/artists_cow/nationality=Afghan/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Afghan/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-0_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Afghan_$folder$",
"moma/artists_cow/nationality=Albanian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Albanian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-1_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Albanian_$folder$",
"moma/artists_cow/nationality=Algerian/.hoodie_partition_metadata",
"moma/artists_cow/nationality=Algerian/4f39e019-e3d7-4a6a-a7bd-6d07eddd495a-2_0-28-3352_20211231203737.parquet",
"moma/artists_cow/nationality=Algerian_$folder$",
"moma/artists_cow/nationality=American/.hoodie_partition_metadata",
"moma/artists_cow/nationality=American/0065ed77-4a6c-4755-b133-45126310936d-0_502-28-3854_20211231203737.parquet",
"moma/artists_cow/nationality=American/011d5c57-c918-40d8-8518-c3cb56747133-0_15-28-3367_20211231203737.parquet"
Hudi CLI commands used in the next video

Hudi CLI

Optionally, we can inspect the Hudi tables using the Hudi CLI (hudi-cli). The CLI offers an extensive list of available commands. Using the CLI, we can inspect the Hudi tables and their schemas, and review operational statistics like write amplification (the number of bytes written for 1 byte of incoming data), commits, and compactions.

> hudi-cli
help
connect –path s3://<your_data_lake_bucket>/moma/artworks_mor/
connect –path s3://<your_data_lake_bucket>/moma/artworks_cow/
desc
fetch table schema
commits show
stats wa
compactions show all
Using the Hudi CLI from the Amazon EMR Master Node

The following short video-only clip shows the use of the Hudi CLI, running on the Amazon EMR Master Node, to inspect the Hudi tables in S3.

Using the Hudi CLI from the Amazon EMR Master Node

Hudi Data Structure

Recall the sample Kafka message we saw earlier in the post representing an insert of an artist record with the artist_id 1. Below, we see what the same record looks like after being ingested by Hudi DeltaStreamer. Note the five additional fields added by Hudi with the _hoodie_ prefix.

{
"_hoodie_commit_time": "20211227215352",
"_hoodie_commit_seqno": "20211227215352_63_7301",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "nationality=American",
"_hoodie_file_name": "0e91bb5b-aa93-42a9-933d-242f5fda1b8f-0_63-24-4710_20211227215352.parquet",
"artist_id": 1,
"name": "Robert Arneson",
"nationality": "American",
"gender": "Male",
"birth_year": 1930,
"death_year": 1992,
"__op": "r",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3637434647944,
"__source_ts_ms": 1640566580452,
"__deleted": "false"
}

Querying Hudi-managed Data

With the initial data ingestion complete and the CDC and DeltaStreamer processes monitoring for future changes, we can query the resulting data stored in Hudi tables. First, we will make some changes to the PostgreSQL MoMA Collection database to see how Hudi manages the data mutations. We could also make changes directly to the Hudi tables using Hive, Spark, or Presto. However, that would cause our datasource to be out of sync with the Hudi tables, potentially negating the entire CDC process. When developing a data lake, this is a critically important consideration — how changes are introduced to Hudi tables, especially when CDC is involved, and whether data continuity between datasources and the data lake is essential.

For the demonstration, I have made a series of arbitrary updates to a piece of artwork in the MoMA Collection database, ‘Picador (La Pique)’ by Pablo Picasso.

'Picador (La Pique)', by Pablo Picasso
SELECT *
FROM artworks
WHERE artwork_id = 128447 AND classification = 'Print';
firts update (creation date)
UPDATE artworks
SET date = 1959
WHERE artwork_id = 128447;
second update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-15'
WHERE artwork_id = 128447;
third update (in vs. '')
UPDATE artworks
SET dimensions = 'composition: 20 13/16 x 25 3/16 in (52.9 x 64 cm); sheet: 24 7/16 x 29 1/2 in (62.1 x 75 cm)'
WHERE artwork_id = 128447;
fourth update (acquisition date)
UPDATE artworks
SET acquisition_date = '2009-04-19'
WHERE artwork_id = 128447;

Below, note the last four objects shown in S3. Judging by the file names and dates, we can see that the CDC process, using Kafka Connect, has picked up the four updates I made to the record in the database. The Source Connector first wrote the changes to Kafka. The Sink Connector then read those Kafka messages and wrote the data to Amazon S3 in Avro format, as shown below.

Looking again at S3, we can also observe that DeltaStreamer picked up the new CDC objects in Amazon S3 and wrote them to both the Hudi CoW and MoR tables. Note the file types shown below. Given Hudi’s MoR table type structure, Hudi first logged the changes to row-based delta files and later compacted them to produce a new version of the columnar-format Parquet file.

Hudi MoR row-based delta log files and compacted Parquet files

Querying Results from Apache Hive

There are several ways to query Hudi-managed data in S3. In this demonstration, they include against Apache Hive using the hive client from the command line, against Hive using Spark, and against the Hudi tables also using Spark. We could also install Presto on EMR to query the Hudi data directly or via Hive.

Querying the real-time artwork_rt table in Hive after we make each database change, we can observe the data in Hudi reflects the updates. Note that the value of the _hoodie_file_name field for the first three updates is a Hudi delta log file, while the value for the last update is a Parquet file. The Parquet file signifies compaction occurred between the fourth update was made, and the time the Hive query was executed. Lastly, note the type of operation (_op) indicates an update change (u) for all records.

Querying the data in the Hudi MoR real-time table as we make changes to the database

Once all fours database updates are complete and compaction has occurred, we should observe identical results from all Hive tables. Below, note the _hoodie_file_name field for all three tables is a Parquet file. Logically, the Parquet file for the MoR read-optimized and real-time Hive tables is the same.

Querying the same record in all three Hive tables: Hudi MoR _ro and _rt tables and CoW table

Had we queried the data previous to compaction, the results would have differed. Below we have three queries. I further updated the artwork record, changing the date field from 1959 to 1960. The read-optimized MoR table, artworks_ro, still reflects the original date value, 1959, before the update and prior to compaction. The real-time table,artworks_rt , reflects the latest update to the date field, 1960. Note that the value of the _hoodie_file_name field for the read-optimized table is a Parquet file, while the value for the real-time table (artworks_rt), the third and final query, is a delta log file. The delta log allows the real-time table to display the most current state of the data in Hudi.

Querying the same record in all three Hive tables

Below are a few useful Hive commands to query the changes in Hudi.

beeline or hive
beeline connect
!connect jdbc:hive2://localhost:10000/default
SHOW DATABASES;
DESCRIBE DATABASE moma_mor;
USE moma_cow;SHOW TABLES;
USE moma_mor;SHOW TABLES;
USE moma_mor;DESCRIBE artworks_ro;
MSCK REPAIR TABLE moma_mor.artworks_ro;
SHOW PARTITIONS moma_mor.artworks_ro;
ANALYZE TABLE moma_mor.artists_rt COMPUTE STATISTICS;
DESCRIBE EXTENDED moma_mor.artists_rt;
test query performance without caching
set hive.query.results.cache.enabled=false;
100 rows selected (1.394 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE department='Prints & Illustrated Books' LIMIT 100;
100 rows selected (2.371 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE department='Prints & Illustrated Books' LIMIT 100;
10 rows selected (0.719 seconds) <- read-optimized vs. real-time table, classification is partitioned
SELECT * FROM moma_mor.artworks_ro WHERE classification='Print' LIMIT 10;
10 rows selected (1.482 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE classification='Print' LIMIT 10;
EXPLAIN EXTENDED SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
1 row selected (14.126 seconds) <- read-optimized vs. real-time table
SELECT * FROM moma_mor.artworks_ro WHERE artwork_id=128447;
1 row selected (32.877 seconds)
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447;
1 row selected (1.491 seconds) <- classification is partitioned
SELECT * FROM moma_mor.artworks_rt WHERE artwork_id=128447 AND classification='Print';
84 rows selected (8.618 seconds)
SELECT artworks.title AS title,
artworks.`date` AS created,
artworks.name AS artist,
artists.nationality AS nationality,
artworks.classification AS classification
FROM moma_cow.artworks artworks
JOIN moma_cow.artists artists ON (artworks.artist_id = artists.artist_id)
WHERE artworks.artist_id = 4609
AND nationality = 'Spanish'
AND classification = 'Print'
AND artworks.`date` IS NOT NULL
ORDER BY created, title;

Deletes with Hudi

In addition to inserts and updates (upserts), Apache Hudi can manage deletes. Hudi supports implementing two types of deletes on data stored in Hudi tables: soft deletes and hard deletes. Given this demonstration’s specific configuration for CDC and DeltaStreamer, we will use soft deletes. Soft deletes retain the record key and nullify the other field’s values. Hard deletes, a stronger form of deletion, physically remove any record trace from the Hudi table.

Below, we see the CDC record for the artist with artist_id 441. The event flattening single message transformation (SMT), used by the Debezium-based Kafka Connect Source Connector, adds the __deleted field with a value of true and nullifies all fields except the record’s key, artist_id, which is required.

{
"artist_id" : 441,
"name" : null,
"nationality" : null,
"gender" : null,
"birth_year" : null,
"death_year" : null,
"__op" : {
"string" : "d"
},
"__db" : {
"string" : "moma"
},
"__table" : {
"string" : "artists"
},
"__schema" : {
"string" : "public"
},
"__lsn" : {
"long" : 3692866569488
},
"__source_ts_ms" : {
"long" : 1640814436010
},
"__deleted" : {
"string" : "true"
}
}

Below, we see the same delete record for the artist with artist_id 441 in the Hudi MoR table. All the null fields have been removed.

{
"_hoodie_commit_time": "20211229225047",
"_hoodie_commit_seqno": "20211229225047_1_1",
"_hoodie_record_key": "441",
"_hoodie_partition_path": "nationality=default",
"_hoodie_file_name": "2a98931a-6015-438e-be78-1eff80a75f83-2_1-24-15431_20211229225047.parquet",
"artist_id": 441,
"__op": "d",
"__db": "moma",
"__table": "artists",
"__schema": "public",
"__lsn": 3692866569488,
"__source_ts_ms": 1640814436010,
"__deleted": "true"
}

Below, we see how the deleted record appears in the three Hive CoW and MoR artwork tables. Note the query results from the read-optimized MoR table, artworks_ro, contains two records — the original record (r) and the deleted record (d). The data is partitioned by nationality, and since the record was deleted, the nationality field is changed to null. In S3, Hudi represents this partition as nationality=default. The record now exists in two different Parquet files, within two separate partitions, something to be aware of when querying the read-optimized MoR table.

Results of a database delete as shown in Hive CoW and MoR tables

Time Travel

According to the documentation, Hudi has supported time travel queries since version 0.9.0. With time travel, you can query the previous state of your data. Time travel is particularly useful for use cases, including rollbacks, debugging, and audit history.

To demonstrate time travel queries in Hudi, we start by making some additional changes to the source database. For this demonstration, I made a series of five updates and finally a delete to the artist record with artist_id 299 in the PostgreSQL database over a few-hour period.

first update (birth)
UPDATE public.artists
SET birth_year = 1907
WHERE artist_id = 299;
second update (death)
UPDATE public.artists
SET death_year = 1989
WHERE artist_id = 299;
third update (middle initial)
UPDATE public.artists
SET name = 'Gerhard M. Bakker'
WHERE artist_id = 299;
fourth update (nationality – impacts partitions)
UPDATE public.artists
SET nationality = 'German'
WHERE artist_id = 299;
fifth update (birth)
UPDATE public.artists
SET birth_year = 1905
WHERE artist_id = 299;
delete
DELETE
FROM public.artists
WHERE artist_id = 299;

Once the CDC and DeltaStreamer ingestion processes are complete, we can use Hudi’s time travel query capability to view the state of data in Hudi at different points in time (instants). To do so, we need to provide an as.an.instant date/time value to Spark (see line 21 below).

Based on the time period in which I made the five updates and the delete, I have chosen six instants during that period where I want to examine the state of the record. Below is an example of the PySpark code from a Jupyter Notebook used to perform the six time travel queries against the Hudi MoR artist’s table.

from datetime import timedelta
from dateutil import parser
base_path = "s3://open-data-lake-demo-us-east-1/moma/artists_mor"
instances = [ # times in EST
"2021-12-30 08:00:00", # reflects original record (r)
"2021-12-30 09:00:00", # refects updates 1 and 2 (u)
"2021-12-30 09:30:00", # refects updates 3 (u)
"2021-12-30 11:00:00", # refects updates 4 (u)
"2021-12-30 12:30:00", # refects updates 5 (u)
"2021-12-30 14:00:00", # refects delete (d)
]
for instant in instants:
as_of_instant = parser.parse(instant) + timedelta(hours=5) # adjust EST for UTC
print(f"Record state as of: {as_of_instant}")
artistsSnapshotDF = (
spark.read.format("hudi").option("as.of.instant", as_of_instant).load(base_path)
)
artistsSnapshotDF.createOrReplaceTempView("hudi_artists_snapshot")
spark.sql(
"""
SELECT _hoodie_commit_time, __op, _hoodie_partition_path, name, nationality, gender, birth_year, death_year
FROM hudi_artists_snapshot
WHERE artist_id=299;
"""
).show()

Below, we see the results of the time travel queries. At each instant, we can observe the mutating state of the data in the Hudi MoR Artist’s table, including the initial bulk insert of the existing snapshot of data (r) and the delete record (d). Since the delete made in the PostgreSQL database was recorded as a soft delete in Hudi, as opposed to a hard delete, we are still able to retrieve the record at any instant.

Record state as of: 2021-12-30 13:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230034812| r| nationality=American|Gerhard H. Bakker| American| Male| 1906| 1988|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 14:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230132628| u| nationality=American|Gerhard H. Bakker| American| Male| 1907| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 14:30:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230142035| u| nationality=American|Gerhard M. Bakker| American| Male| 1907| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 16:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230144237| u| nationality=German|Gerhard M. Bakker| German| Male| 1907| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 17:30:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230171925| u| nationality=German|Gerhard M. Bakker| German| Male| 1905| 1989|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Record state as of: 2021-12-30 19:00:00
+——————-+—-+———————-+—————–+———–+——+———-+———-+
|_hoodie_commit_time|__op|_hoodie_partition_path| name|nationality|gender|birth_year|death_year|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
| 20211230180429| d| nationality=default| null| null| null| null| null|
+——————-+—-+———————-+—————–+———–+——+———-+———-+
Results of the time travel queries, ordered by commit time

In addition to time travel queries, Hudi also offers incremental queries and point in time queries.

Conclusion

Although this post only scratches the surface of the capabilities of Debezium and Hudi, you can see the power of CDC using Kafka Connect and Debezium, combined with Hudi, to build and manage open data lakes on AWS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

  1. Leave a comment

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: