Archive for category Kubernetes
Building and Deploying Cloud-Native Quarkus-based Java Applications to Kubernetes
Posted by Gary A. Stafford in AWS, Cloud, DevOps, Java Development, Kubernetes, Software Development on June 1, 2022
Developing, testing, building, and deploying Native Quarkus-based Java microservices to Kubernetes on AWS, using GitOps
Introduction
Although it may no longer be the undisputed programming language leader, according to many developer surveys, Java still ranks right up there with Go, Python, C/C++, and JavaScript. Given Java’s continued popularity, especially amongst enterprises, and the simultaneous rise of cloud-native software development, vendors have focused on creating purpose-built, modern JVM-based frameworks, tooling, and standards for developing applications — specifically, microservices.
Leading JVM-based microservice application frameworks typically provide features such as native support for a Reactive programming model, MicroProfile, GraalVM Native Image, OpenAPI and Swagger definition generation, GraphQL, CORS (Cross-Origin Resource Sharing), gRPC (gRPC Remote Procedure Calls), CDI (Contexts and Dependency Injection), service discovery, and distributed tracing.
Leading JVM-based Microservices Frameworks
Review lists of the most popular cloud-native microservices frameworks for Java, and you are sure to find Spring Boot with Spring Cloud, Micronaut, Helidon, and Quarkus at or near the top.
Spring Boot with Spring Cloud
According to their website, Spring makes programming Java quicker, easier, and safer for everybody. Spring’s focus on speed, simplicity, and productivity has made it the world’s most popular Java framework. Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can just run. Spring Boot’s many purpose-built features make it easy to build and run your microservices in production at scale. However, the distributed nature of microservices brings challenges. Spring Cloud can help with service discovery, load-balancing, circuit-breaking, distributed tracing, and monitoring with several ready-to-run cloud patterns. It can even act as an API gateway.
Helidon
Oracle’s Helidon is a cloud-native, open‑source set of Java libraries for writing microservices that run on a fast web core powered by Netty. Helidon supports MicroProfile, a reactive programming model, and, similar to Micronaut, Spring, and Quarkus, it supports GraalVM Native Image.
Micronaut
According to their website, the Micronaut framework is a modern, open-source, JVM-based, full-stack toolkit for building modular, easily testable microservice and serverless applications. Micronaut supports a polyglot programming model, discovery services, distributed tracing, and aspect-oriented programming (AOP). In addition, Micronaut offers quick startup time, blazing-fast throughput, and a minimal memory footprint.
Quarkus
Quarkus, developed and sponsored by RedHat, is self-described as the ‘Supersonic Subatomic Java.’ Quarkus is a cloud-native, Kubernetes-native, [Linux] container first, microservices first framework for writing Java applications. Quarkus is a Kubernetes Native Java stack tailored for OpenJDK HotSpot and GraalVM, crafted from over fifty best-of-breed Java libraries and standards.
Developing Native Quarkus Microservices
In the following post, we will develop, build, test, deploy, and monitor a native Quarkus microservice application on Kubernetes. The RESTful service will expose a rich Application Programming Interface (API) and interact with a PostgreSQL database on the backend.

Some of the features of the Quarkus application in this post include:
- Hibernate Object Relational Mapper (ORM), the de facto Jakarta Persistence API (formerly Java Persistence API) implementation
- Hibernate Reactive with Panache, a Quarkus-specific library that simplifies the development of Hibernate Reactive entities
- RESTEasy Reactive, a new JAX-RS implementation, works with the common Vert.x layer and is thus fully reactive
- Advanced RESTEasy Reactive Jackson support for JSON serialization
- Reactive PostgreSQL client
- Built on Mandrel, a downstream distribution of the GraalVM community edition
- Built with Gradle, the modern, open-source build automation tool focused on flexibility and performance
TL;DR
Do you want to explore the source code for this post’s Quarkus microservice application or deploy it to Kubernetes before reading the full article? All the source code and Kubernetes resources are open-source and available on GitHub:
git clone --depth 1 -b main \
https://github.com/garystafford/tickit-srv.git
The latest Docker Image is available on docker.io:
docker pull garystafford/tickit-srv:<latest-tag>
Quarkus Projects with IntelliJ IDE
Although not a requirement, I used JetBrains IntelliJ IDEA 2022 (Ultimate Edition) to develop and test the post’s Quarkus application. Bootstrapping Quarkus projects with IntelliJ is easy. Using the Quarkus plugin bundled with the Ultimate edition, developers can quickly create a Quarkus project.

The Quarkus plugin's project creation wizard is based on code.quarkus.io. If you have ever bootstrapped a Spring Initializr project, code.quarkus.io works very similarly to start.spring.io.

Visual Studio Code
RedHat also provides a Quarkus extension for the popular Visual Studio Code IDE.

Gradle
This post uses Gradle instead of Maven to develop, test, build, package, and deploy the Quarkus application to Kubernetes. Based on the packages selected in the new project setup shown above, the Quarkus plugin's project creation wizard creates the following build.gradle file (Lombok added separately).
The wizard also created the following gradle.properties file, which has been updated to the latest release of Quarkus available at the time of this post, 2.9.2.
Gradle and Quarkus
You can use the Quarkus CLI or the Quarkus Maven plugin to scaffold a Gradle project. Taking a dependency on the Quarkus plugin adds several additional Quarkus tasks to Gradle. We will use Gradle to develop, test, build, containerize, and deploy the Quarkus microservice application to Kubernetes. The quarkusDev, quarkusTest, and quarkusBuild tasks will be particularly useful in this post.
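As a quick sketch, assuming the Quarkus Gradle plugin is applied to the project, these tasks can be invoked from the project root with the Gradle wrapper (the exact flags used later in the CI/CD workflow differ):
./gradlew quarkusDev      # live-coding development mode with Dev Services
./gradlew quarkusTest     # continuous testing mode
./gradlew quarkusBuild    # package the application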

Java Compilation
The Quarkus application in this post is compiled as a native image with the most recent Java 17 version of Mandrel, a downstream distribution of the GraalVM community edition.
GraalVM and Native Image
According to the documentation, GraalVM is a high-performance JDK distribution. It is designed to accelerate the execution of applications written in Java and other JVM languages while also providing runtimes for JavaScript, Ruby, Python, and other popular languages.
Further, according to GraalVM, Native Image is a technology to ahead-of-time compile Java code to a stand-alone executable, called a native image. This executable includes the application classes, classes from its dependencies, runtime library classes, and statically linked native code from the JDK. The Native Image builder (native-image) is a utility that processes all classes of an application and their dependencies, including those from the JDK. It statically analyzes data to determine which classes and methods are reachable during the application execution.
Mandrel
Mandrel is a downstream distribution of the GraalVM community edition. Mandrel's main goal is to provide a native-image release specifically to support Quarkus. The aim is to align the native-image capabilities from GraalVM with OpenJDK and Red Hat Enterprise Linux libraries to improve maintainability for native Quarkus applications. Mandrel can best be described as a distribution of a regular OpenJDK with a specially packaged GraalVM Native Image builder (native-image).
Docker Image
Once compiled, the native Quarkus executable will run within the quarkus-micro-image:1.0 base runtime image deployed to Kubernetes. Quarkus provides this base image to ease the containerization of native executables. It has a minimal footprint (10.9 MB compressed / 29.5 MB uncompressed) compared to other images. For example, the latest UBI (Universal Base Image) Quarkus Mandrel image (ubi-quarkus-mandrel:22.1.0.0-Final-java17) is 714 MB uncompressed, while the OpenJDK 17 image (openjdk:17-jdk) is 471 MB uncompressed. Even RedHat's Universal Base Image Minimal image (ubi-minimal:8.6) is 93.4 MB uncompressed.

An even smaller option from Quarkus is the distroless base image (quarkus-distroless-image:1.0), which is only 9.2 MB compressed / 22.7 MB uncompressed. Quarkus is careful to note that distroless image support is experimental and should not be used in production without rigorous testing.
PostgreSQL Database
For the backend data persistence tier of the Quarkus application, we will use PostgreSQL. All DDL (Data Definition Language) and DML (Data Manipulation Language) statements used in the post were tested with the most current version of PostgreSQL 14.
There are many PostgreSQL-compatible sample databases available that could be used for this post. I am using the TICKIT sample database provided by AWS and designed for Amazon Redshift, AWS's cloud data warehousing service. The database consists of seven tables — two fact tables and five dimension tables — in a traditional data warehouse star schema.
For this post, I have remodeled the TICKIT database's star schema into a normalized relational data model optimized for the Quarkus application. The most significant change to the database is splitting the original Users dimension table into two separate tables — buyer and seller. This change will allow for better separation of concerns (SoC), scalability, and increased protection of Personally Identifiable Information (PII).

Source Code
Each of the six tables in the PostgreSQL TICKIT database is represented by an Entity, Repository, and Resource Java class.

Entity Class
Java Persistence is the API for managing persistence and object/relational mapping. The Java Persistence API (JPA) provides Java developers with an object/relational mapping facility for managing relational data in Java applications. Each table in the PostgreSQL TICKIT database is represented by a Java Persistence Entity, as indicated by the Entity annotation on the class declaration. The annotation specifies that the class is an entity.

Each entity class extends the PanacheEntityBase class, part of the io.quarkus.hibernate.orm.panache package. According to the Quarkus documentation, you can specify your own custom ID strategy, which is done in this post's example, by extending PanacheEntityBase instead of PanacheEntity.
If you do not want to bother defining getters/setters for your entities, which we did not in the post's example, extend PanacheEntityBase and Quarkus will generate them for you. Alternately, extend PanacheEntity and take advantage of the default ID it provides if you are not using a custom ID strategy.
The example SaleEntity class shown below is typical of the Quarkus application's entities. The entity class contains several additional JPA annotations in addition to Entity, including Table, NamedQueries, Id, SequenceGenerator, GeneratedValue, and Column. The entity class also leverages Project Lombok annotations. Lombok generates two boilerplate constructors, one that takes no arguments (NoArgsConstructor) and one that takes one argument for every field (AllArgsConstructor).
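A condensed sketch of what such an entity might look like is shown below. It is illustrative only; the actual SaleEntity class in the GitHub repository differs, and the table, column, and sequence names here are assumptions loosely based on the TICKIT sample schema.
import io.quarkus.hibernate.orm.panache.PanacheEntityBase;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.math.BigDecimal;

@Entity
@Table(name = "sale")
@NamedQueries({
        @NamedQuery(name = "SaleEntity.findByBuyerId",
                query = "SELECT s FROM SaleEntity s WHERE s.buyer.id = ?1")
})
@NoArgsConstructor
@AllArgsConstructor
public class SaleEntity extends PanacheEntityBase {

    @Id
    @SequenceGenerator(name = "saleSeq", sequenceName = "sale_id_seq", allocationSize = 1)
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "saleSeq")
    @Column(name = "saleid")
    public Long saleId;

    // Many sales reference a single listing and a single buyer
    @ManyToOne
    @JoinColumn(name = "listingid")
    public ListingEntity listing;

    @ManyToOne
    @JoinColumn(name = "buyerid")
    public BuyerEntity buyer;

    @Column(name = "pricepaid")
    public BigDecimal pricePaid;
}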
The SaleEntity class also defines two many-to-one relationships, with the ListingEntity and BuyerEntity entity classes. This relationship mirrors the database's data model, as reflected in the schema diagram above. The relationships are defined using the ManyToOne and JoinColumn JPA annotations.
Given the relationships between the entities, a saleEntity object, represented as a nested JSON object, would look as follows:
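The following is a hedged illustration of such a nested object; the field names and values are placeholders loosely based on the TICKIT schema, not actual output from the application.
{
  "saleId": 1,
  "quantitySold": 2,
  "pricePaid": 472.00,
  "commission": 70.80,
  "saleTime": "2022-05-30T19:21:00Z",
  "listing": {
    "listingId": 1893,
    "numTickets": 4,
    "pricePerTicket": 236.00
  },
  "buyer": {
    "buyerId": 512,
    "firstName": "Alex",
    "lastName": "Smith",
    "city": "Seattle"
  }
}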
Repository Class
Each table in the PostgreSQL TICKIT database also has a corresponding repository class, often referred to as the 'repository pattern.' The repository class implements the PanacheRepositoryBase interface, part of the io.quarkus.hibernate.orm.panache package. The PanacheRepositoryBase Java interface represents a Repository for a specific type of Entity. According to the documentation, if you are using repositories and have a custom ID strategy, then you will want to extend PanacheRepositoryBase instead of PanacheRepository and specify your ID type as an extra type parameter. Implementing PanacheRepositoryBase will give you the same methods that are available on PanacheEntityBase.

The repository class allows us to leverage the methods already available through PanacheEntityBase and add additional custom methods. For example, the repository class contains a custom method, listWithPaging. This method retrieves (GET) a list of SaleEntity objects with the added benefit of being able to indicate the page number, page size, sort-by field, and sort direction.
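A minimal sketch of such a paged query is shown below. It assumes the reactive variant of Panache (io.quarkus.hibernate.reactive.panache), since the methods return a Uni; the method and parameter names are illustrative and may differ from the repository class in the GitHub project.
import io.quarkus.hibernate.reactive.panache.PanacheRepositoryBase;
import io.quarkus.panache.common.Page;
import io.quarkus.panache.common.Sort;
import io.smallrye.mutiny.Uni;

import javax.enterprise.context.ApplicationScoped;
import java.util.List;

@ApplicationScoped
public class SaleRepository implements PanacheRepositoryBase<SaleEntity, Long> {

    // Returns one page of sales, sorted by the requested field and direction
    public Uni<List<SaleEntity>> listWithPaging(int pageNum, int pageSize,
                                                String sortBy, Sort.Direction direction) {
        return findAll(Sort.by(sortBy, direction))
                .page(Page.of(pageNum, pageSize))
                .list();
    }
}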
Since there is a many-to-one relationship between the SaleEntity class and the ListingEntity and BuyerEntity entity classes, we also have two custom methods that retrieve all SaleEntity objects by either the BuyerEntity ID or the EventEntity ID. These two methods call the SQL queries in the SaleEntity, annotated with the JPA NamedQueries/NamedQuery annotations on the class declaration.
SmallRye Mutiny
Each method defined in the repository class returns a SmallRye Mutiny Uni<T>. According to the website, Mutiny is an intuitive, event-driven Reactive programming library for Java. Mutiny provides a simple but powerful asynchronous development model that lets you build reactive applications. Mutiny can be used in any Java application exhibiting asynchrony, including reactive microservices, data streaming, event processing, API gateways, and network utilities.
Uni
Again, according to Mutiny's documentation, a Uni represents a stream that can only emit either an item or a failure event. A Uni<T> is a specialized stream that emits only an item or a failure. Typically, a Uni<T> is great for representing asynchronous actions such as a remote procedure call, an HTTP request, or an operation producing a single result. A Uni represents a lazy asynchronous action. It follows the subscription pattern, meaning that the action is only triggered once a UniSubscriber subscribes to the Uni.
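A tiny, generic example of this lazy behavior (unrelated to the post's code) is shown below; nothing happens until a subscription occurs.
import io.smallrye.mutiny.Uni;

public class UniExample {
    public static void main(String[] args) {
        Uni<String> greeting = Uni.createFrom().item("hello")
                .onItem().transform(String::toUpperCase); // no work performed yet

        // The pipeline only runs once a subscriber is attached
        greeting.subscribe().with(
                item -> System.out.println(item),        // prints HELLO
                failure -> System.err.println(failure));
    }
}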
Resource Class
Lastly, each table in the PostgreSQL TICKIT database has a corresponding resource class. According to the Quarkus documentation, all the operations defined within PanacheEntityBase are available on your repository, so using it is exactly the same as using the active record pattern, except you need to inject it. We inject the corresponding repository class into the resource class, exposing all the available methods of the repository and PanacheRepositoryBase. For example, note the custom listWithPaging method below, which was declared in the SaleRepository class.

Similar to the repository class, each method defined in the resource class also returns a SmallRye Mutiny (io.smallrye.mutiny) Uni<T>.
The resource class defines HTTP methods (POST, GET, PUT, and DELETE) corresponding to CRUD operations on the database (Create, Read, Update, and Delete). The methods are annotated with the corresponding javax.ws.rs annotation, indicating the type of HTTP request they respond to. The javax.ws.rs package contains high-level interfaces and annotations used to create RESTful service resources, such as our Quarkus application.
The POST, PUT, and DELETE annotated methods all have the io.quarkus.hibernate.reactive.panache.common.runtime package's ReactiveTransactional annotation associated with them. We use this annotation on methods to run them in a reactive Mutiny.Session.Transaction. If the annotated method returns a Uni, which they all do, this has precisely the same behavior as if the method were enclosed in a call to Mutiny.Session.withTransaction(java.util.function.Function). If the method call fails, the complete transaction is rolled back.
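A pared-down sketch of such a resource class is shown below; the path, method names, and injected types are assumptions and not the exact code from the repository.
import io.quarkus.hibernate.reactive.panache.common.runtime.ReactiveTransactional;
import io.smallrye.mutiny.Uni;

import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;

@Path("/sales")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class SaleResource {

    @Inject
    SaleRepository saleRepository;

    @GET
    @Path("/{id}")
    public Uni<SaleEntity> getById(@PathParam("id") Long id) {
        return saleRepository.findById(id);
    }

    @POST
    @ReactiveTransactional // runs within a reactive Mutiny session/transaction
    public Uni<SaleEntity> create(SaleEntity sale) {
        return saleRepository.persist(sale);
    }
}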
Developer Experience
Quarkus has several features to enhance the developer experience. Features include Dev Services, Dev UI, live reload of code without requiring a rebuild and restart of the application, continuous testing where tests run immediately after code changes have been saved, configuration profiles, Hibernate ORM, JUnit, and REST Assured integrations. Using these Quarkus features, it’s easy to develop and test Quarkus applications.
Configuration Profiles
Similar to Spring, Quarkus works with configuration profiles. According to RedHat, you can use different configuration profiles depending on your environment. Configuration profiles enable you to have multiple configurations in the same application.properties file and select between them using a profile name. Quarkus recognizes three default profiles:
- dev: Activated in development mode
- test: Activated when running tests
- prod: The default profile when not running in development or test mode
In the application.properties file, the profile is prefixed using the %environment. format. For example, when defining Quarkus' log level as INFO, you add the common quarkus.log.level=INFO property. However, to change only the test environment's log level to DEBUG, corresponding to the test profile, you would add a property with the %test. prefix, such as %test.quarkus.log.level=DEBUG.
Dev Services
Quarkus supports the automatic provisioning of unconfigured services in development and test mode, referred to as Dev Services. If you include an extension and do not configure it, then Quarkus will automatically start the relevant service using Testcontainers behind the scenes and wire up your application to use this service.
When developing your Quarkus application, you could create your own local PostgreSQL database, for example, with Docker:
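A minimal example might look like the following (container name, credentials, and database name are placeholder assumptions):
docker run --name postgres-tickit \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres1234 \
  -e POSTGRES_DB=tickit \
  -p 5432:5432 -d postgres:14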
And the corresponding application configuration properties:
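Assuming the reactive PostgreSQL client used in this post, the datasource properties might resemble the following sketch (values are placeholders):
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=postgres
quarkus.datasource.password=postgres1234
quarkus.datasource.reactive.url=postgresql://localhost:5432/tickit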
Zero-Config Database
Alternately, we can rely on Dev Services, using a feature referred to as zero config setup. Quarkus provides you with a zero-config database out of the box; no database configuration is required. Quarkus takes care of provisioning the database, running your DDL and DML statements to create database objects and populate the database with test data, and finally, de-provisioning the database container when the development or test session is completed. The database Dev Services will be enabled when a reactive or JDBC datasource extension is present in the application and the database URL has not been configured.
Using the quarkusDev Gradle task, we can start the application running, as shown in the video below. Note the two new Docker containers that are created. Also, note the project's import.sql SQL script is run automatically, executing all DDL and DML statements to prepare and populate the database.
Bootstrapping the TICKIT Database
When using Hibernate ORM with Quarkus, we have several options regarding how the database is handled when the Quarkus application starts. These are defined in the application.properties file. The quarkus.hibernate-orm.database.generation property determines whether the database schema is generated or not. drop-and-create is ideal in development mode, as shown above. This property defaults to none; however, if Dev Services is in use and no other extensions that manage the schema are present, it will default to drop-and-create. Accepted values are none, create, drop-and-create, drop, update, and validate. For development and testing modes, we are using Dev Services with the default value of drop-and-create. For this post, we assume the database and schema already exist in production.
A second property, quarkus.hibernate-orm.sql-load-script, provides the path to a file containing the SQL statements to execute when Hibernate ORM starts. In dev and test modes, it defaults to import.sql. Simply add an import.sql file in the root of your resources directory, and Hibernate will pick it up without you having to set this property. The project contains an import.sql script to create all database objects and a small amount of test data. You can also explicitly set different files for different profiles by prefixing the property with the profile name (e.g., %dev. or %test.).
%dev.quarkus.hibernate-orm.database.generation=drop-and-create
%dev.quarkus.hibernate-orm.sql-load-script=import.sql
Another option is Flyway, the popular database migration tool commonly used in JVM environments. Quarkus provides first-class support for using Flyway.
Dev UI
According to the documentation, Quarkus now ships with a new experimental Dev UI, which is available in dev mode (when you start Quarkus with Gradle's quarkusDev task) at /q/dev by default. It allows you to quickly visualize all the extensions currently loaded, see their status, and go directly to their documentation. In addition to access to loaded extensions, you can review logs and run tests in the Dev UI.

Configuration
From the Dev UI, you can access and modify the Quarkus application's configuration.

You can also view the configuration of Dev Services, including the running containers and the zero-config database configuration.

Quarkus REST Score Console
With the RESTEasy Reactive extension loaded, you can access the Quarkus REST Score Console from the Dev UI. The REST Score Console shows endpoint performance through scores and color-coding: green, yellow, or red. RedHat published a recent blog that describes the scoring process and how to optimize the performance of endpoints. Three measurements show whether a REST reactive application can be optimized further.

Application Testing
Quarkus enables robust JVM-based and Native continuous testing by providing integrations with common test frameworks, such as JUnit, Mockito, and REST Assured. Many of Quarkus' testing features are enabled through annotations, such as QuarkusTestResource, QuarkusTest, QuarkusIntegrationTest, and TransactionalQuarkusTest.
Quarkus supports the use of mock objects using two different approaches. You can either use CDI alternatives to mock out a bean for all test classes or use QuarkusMock to mock out beans on a per-test basis. This includes integration with Mockito.
The REST Assured integration is particularly useful for testing the Quarkus microservice API application. According to their website, REST Assured is a Java DSL for simplifying testing of REST-based services. It supports the most common HTTP request methods and can be used to validate and verify the response of these requests. REST Assured uses the given(), when(), then() methods of testing made popular as part of Behavior-Driven Development (BDD).
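A skeletal example of this style of test is shown below; the endpoint path and expected status are assumptions, not the actual tests in the repository.
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;

import static io.restassured.RestAssured.given;

@QuarkusTest
public class SaleResourceTest {

    @Test
    public void testGetSaleReturnsOk() {
        given()
                .when().get("/sales/1")   // hypothetical endpoint
                .then()
                .statusCode(200);
    }
}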
The tests can be run using the quarkusTest Gradle task. The application contains a small number of integration tests to demonstrate this feature.

Swagger and OpenAPI
Quarkus provides the SmallRye OpenAPI extension, compliant with the MicroProfile OpenAPI specification, which allows you to generate an OpenAPI v3 specification and expose the Swagger UI. The /q/swagger-ui resource exposes the Swagger UI, allowing you to visualize and interact with the Quarkus API's resources without having any implementation logic in place.

Resources can be tested using the Swagger UI without writing any code.

OpenAPI Specification (formerly Swagger Specification) is an API description format for REST APIs. The /q/openapi resource allows you to generate an OpenAPI v3 specification file. An OpenAPI file allows you to describe your entire API.

The OpenAPI v3 specification can be saved as a file and imported into applications like Postman, the API platform for building and using APIs.


GitOps with GitHub Actions
For this post, GitOps is used to continuously test, build, package, and deploy the Quarkus microservice application to Kubernetes. Specifically, the post uses GitHub Actions. GitHub Actions is a continuous integration and continuous delivery (CI/CD) platform that allows you to automate your build, test, and deployment pipelines. Workflows are defined in the .github/workflows directory in a repository, and a repository can have multiple workflows, each of which can perform a different set of tasks.

Two GitHub Actions are associated with this post's GitHub repository. The first action, build-test.yml, natively builds and tests the source code in a native Mandrel container on each push to GitHub. The second action (shown below), docker-build-push.yml, builds and containerizes the natively-built executable, pushes it to Docker's Container Registry (docker.io), and finally deploys the application to Kubernetes. This action is triggered by pushing a new Git Tag to GitHub.

There are several Quarkus configuration properties included in the action's build step. Alternately, these properties could be defined in the application.properties file. However, I have decided to include them as part of the Gradle build task since they are specific to the type of build, the container registry, and the Kubernetes platform to which I am pushing artifacts.
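An approximation of that build step is sketched below; the exact flags in the workflow may differ, and the image tag value is illustrative.
./gradlew build \
  -Dquarkus.package.type=native \
  -Dquarkus.native.builder-image=quay.io/quarkus/ubi-quarkus-mandrel:22.1.0.0-Final-java17 \
  -Dquarkus.native.container-build=true \
  -Dquarkus.container-image.build=true \
  -Dquarkus.container-image.push=true \
  -Dquarkus.container-image.group=garystafford \
  -Dquarkus.container-image.name=tickit-srv \
  -Dquarkus.container-image.tag=1.1.0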
Kubernetes Resources
The Kubernetes resources YAML file, created by the Quarkus build, is also uploaded and saved as an artifact in GitHub by the final step in the GitHub Action.

Quarkus automatically generates ServiceAccount, Role, RoleBinding, Service, and Deployment resources.
Choosing a Kubernetes Platform
The only cloud provider-specific code is in the second GitHub action.
In this case, the application is being deployed to an existing Amazon Elastic Kubernetes Service (Amazon EKS), a fully managed, certified Kubernetes conformant service from AWS. These steps can be easily replaced with steps to deploy to other Cloud platforms, such as Microsoft’s Azure Kubernetes Service (AKS) or Google Cloud’s Google Kubernetes Engine (GKE).
GitHub Secrets
Some of the properties use GitHub environment variables, and others use secure GitHub repository encrypted secrets. Secrets are used to secure the Docker credentials used to push the Quarkus application image to Docker's image repository, the AWS IAM credentials, and the base64-encoded contents of the kubeconfig file required to deploy to Kubernetes on AWS when using the kodermax/kubectl-aws-eks@master GitHub action.

Docker
Reviewing the configuration properties included in the action's build step, note the Mandrel container used to build the native Quarkus application, quay.io/quarkus/ubi-quarkus-mandrel:22.1.0.0-Final-java17. Also, note the project's Dockerfile is used to build the final Docker image, pushed to the image repository, and then used to provision containers on Kubernetes, src/main/docker/Dockerfile.native-micro. This Dockerfile uses the quay.io/quarkus/quarkus-micro-image:1.0 base image to containerize the native Quarkus application.
The properties also define the image's repository name and tag (e.g., garystafford/tickit-srv:1.1.0).


Kubernetes
In addition to creating the tickit Namespace in advance, a Kubernetes Secret is pre-deployed to the tickit Namespace. The GitHub Action also requires a Role and RoleBinding to deploy the workload to the Kubernetes cluster. Lastly, a HorizontalPodAutoscaler (HPA) is used to automatically scale the workload.
export NAMESPACE=tickit

# Namespace
kubectl create namespace ${NAMESPACE}

# Role and RoleBinding for GitHub Actions to deploy to Amazon EKS
kubectl apply -f kubernetes/github_actions_role.yml -n ${NAMESPACE}

# Secret
kubectl apply -f kubernetes/secret.yml -n ${NAMESPACE}

# HorizontalPodAutoscaler (HPA)
kubectl apply -f kubernetes/tickit-srv-hpa.yml -n ${NAMESPACE}
As part of the configuration properties included in the action’s build step, note the use of Kubernetes secrets.
-Dquarkus.kubernetes-config.secrets=tickit
-Dquarkus.kubernetes-config.secrets.enabled=true
This Secret contains base64-encoded sensitive credentials and connection values to connect to the production PostgreSQL database. For this post, I have pre-built an Amazon RDS for PostgreSQL database instance, created the tickit database and required database objects, and lastly, imported the sample data included in the GitHub repository, garystafford/tickit-srv-data.
The five keys seen in the Secret are used in the application.properties file to provide access to the production PostgreSQL database from the Quarkus application.
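A hedged sketch of such a Secret is shown below; the key names are hypothetical stand-ins for the five actual keys, and the values would be base64-encoded.
apiVersion: v1
kind: Secret
metadata:
  name: tickit
type: Opaque
data:
  DB_HOST: <base64-encoded value>
  DB_PORT: <base64-encoded value>
  DB_NAME: <base64-encoded value>
  DB_USERNAME: <base64-encoded value>
  DB_PASSWORD: <base64-encoded value>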
An even better alternative to using Kubernetes secrets on Amazon EKS is the AWS Secrets and Configuration Provider (ASCP) for the Kubernetes Secrets Store CSI Driver, which allows secrets stored in AWS Secrets Manager to be mounted as files in Amazon EKS pods.
AWS Architecture
The GitHub Action pushes the application’s image to Docker’s Container Registry (docker.io), then deploys the application to Kubernetes. Alternately, you could use AWS’s Amazon Elastic Container Registry (Amazon ECR). Amazon EKS pulls the image from Docker as it creates the Kubernetes Pod containers.
There are many ways to route traffic from a requestor to the Quarkus application running on Kubernetes. For this post, the Quarkus application is exposed as a Kubernetes Service on a NodePort. I have registered a domain, example-api.com, with Amazon Route 53 and a corresponding TLS certificate with AWS Certificate Manager. Inbound requests to the Quarkus application are directed to a subdomain, ticket.example-api.com, using HTTPS on port 443. Amazon Route 53 routes those requests to a Layer 7 Application Load Balancer (ALB). The ALB then routes those requests to the Amazon EKS Kubernetes cluster on the NodePort using simple round-robin load balancing. Requests are routed automatically by Kubernetes to the appropriate worker node and Kubernetes Pod. The response then traverses a similar path back to the requestor.

Results
If the GitHub action is successful, any push of code changes to GitHub results in the deployment of the application to Kubernetes.

We can also view the deployed Quarkus application resources using the Kubernetes Dashboard.

Metrics
The post's Quarkus application implements the micrometer-registry-prometheus extension. The Micrometer metrics library exposes runtime and application metrics. Micrometer defines a core library, providing a registration mechanism for metrics and core metric types.

Using the Micrometer extension, a metrics resource is exposed at /q/metrics, which can be scraped and visualized by tools such as Prometheus. AWS offers its fully-managed Amazon Managed Service for Prometheus (AMP), which easily integrates with Amazon EKS.

Using Prometheus as a datasource, we can build dashboards in Grafana to observe the Quarkus Application metrics. Similar to AMP, AWS offers its fully managed Amazon Managed Grafana (AMG).

Centralized Log Management
According to the Quarkus documentation, internally, Quarkus uses JBoss Log Manager and the JBoss Logging facade. You can use the JBoss Logging facade inside your code or any of the supported Logging APIs, including JDK java.util.logging (aka JUL), JBoss Logging, SLF4J, and Apache Commons Logging. Quarkus will send them to JBoss Log Manager.
There are many ways to centralize logs. For example, you can send these logs to open-source centralized log management systems like Graylog, Elastic Stack, fka ELK (Elasticsearch, Logstash, Kibana), EFK (Elasticsearch, Fluentd, Kibana), and OpenSearch with Fluent Bit.
If you are using Kubernetes, the simplest way is to send logs to the console and integrate a central log manager inside your cluster. Since the Quarkus application in this post is running on Amazon EKS, I have chosen Amazon OpenSearch Service with Fluent Bit, an open-source and multi-platform Log Processor and Forwarder. Fluent Bit is fully compatible with Docker and Kubernetes environments. Amazon provides an excellent workshop on installing and configuring Amazon OpenSearch Service with Fluent Bit.


Conclusion
As we learned in this post, Quarkus, the ‘Supersonic Subatomic Java’ framework, is a cloud-native, Kubernetes-native, container first, microservices first framework for writing Java applications. We observed how to build, test, and deploy a RESTful Quarkus Native application to Kubernetes.
Quarkus has capabilities and features well beyond this post’s scope. In a future post, we will explore other abilities of Quarkus, including observability, GraphQL integration, caching, database proxying, tracing and debugging, message queues, data pipelines, and streaming analytics.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author.
End-to-End Data Discovery, Observability, and Governance on AWS with LinkedIn’s Open-source DataHub
Posted by Gary A. Stafford in Analytics, AWS, Azure, Bash Scripting, Build Automation, Cloud, DevOps, GCP, Kubernetes, Python, Software Development, SQL, Technology Consulting on March 26, 2022
Use DataHub’s data catalog capabilities to collect, organize, enrich, and search for metadata across multiple platforms
Introduction
According to Shirshanka Das, Founder of LinkedIn DataHub, Apache Gobblin, and Acryl Data, one of the simplest definitions for a data catalog can be found on the Oracle website: “Simply put, a data catalog is an organized inventory of data assets in the organization. It uses metadata to help organizations manage their data. It also helps data professionals collect, organize, access, and enrich metadata to support data discovery and governance.”
Another succinct description of a data catalog’s purpose comes from Alation: “a collection of metadata, combined with data management and search tools, that helps analysts and other data users to find the data that they need, serves as an inventory of available data, and provides information to evaluate the fitness of data for intended uses.”
In my work with many organizations in the area of analytics, one of the more common requests I receive concerns choosing and implementing a data catalog. Organizations have datasources hosted in corporate data centers, on AWS, by SaaS providers, and with other Cloud Service Providers. Several of these organizations have recently gravitated to DataHub, the open-source metadata platform for the modern data stack, originally developed by LinkedIn.

In this post, we will explore the capabilities of DataHub to build a centralized data catalog on AWS for datasources hosted in multiple AWS accounts, SaaS providers, cloud service providers, and corporate data centers. I will demonstrate how to build a DataHub data catalog using out-of-the-box data source plugins for automated metadata ingestion.

Data Catalog Competitors
Data catalogs are not new; technologies such as data dictionaries have been around as far back as the 1980s. Gartner publishes their Metadata Management (EMM) Solutions Reviews and Ratings and Metadata Management Magic Quadrant. These reports contain a comprehensive list of traditional commercial enterprise players, modern cloud-native SaaS vendors, and Cloud Service Provider (CSP) offerings. DBMS Tools also hosts a comprehensive list of 30 data catalogs. A sampling of current data catalogs includes:
Open Source Software
Commercial
- Acryl Data (based on LinkedIn’s DataHub)
- Atlan
- Stemma (based on Lyft’s Amundsen)
- Talend
- Alation
- Collibra
- data.world
Cloud Service Providers
Data Catalog Features
DataHub describes itself as "a modern data catalog built to enable end-to-end data discovery, data observability, and data governance." Sorting through vendors' marketing jargon and hype, standard features of leading data catalogs include:
- Metadata ingestion
- Data discovery
- Data governance
- Data observability
- Data lineage
- Data dictionary
- Data classification
- Usage/popularity statistics
- Sensitive data handling
- Data fitness (aka data quality or data profiling)
- Manage both technical and business metadata
- Business glossary
- Tagging
- Natively supported datasource integrations
- Advanced metadata search
- Fine-grain authentication and authorization
- UI- and API-based interaction
Datasources
When considering a data catalog solution, in my experience, the most common datasources that customers want to discover, inventory, and search include:
- Relational databases and other OLTP datasources such as PostgreSQL, MySQL, Microsoft SQL Server, and Oracle
- Cloud Data Warehouses and other OLAP datasources such as Amazon Redshift, Snowflake, and Google BigQuery
- NoSQL datasources such as MongoDB, MongoDB Atlas, and Azure Cosmos DB
- Persistent event-streaming platforms such as Apache Kafka (Amazon MSK and Confluent)
- Distributed storage datasets (e.g., Data Lakes) such as Amazon S3, Apache Hive, and AWS Glue Data Catalogs
- Business Intelligence (BI), dashboards, and data visualization sources such as Looker, Tableau, and Microsoft Power BI
- ETL sources, such as Apache Spark, Apache Airflow, Apache NiFi, and dbt
DataHub on AWS
DataHub’s convenient AWS setup guide covers options to deploy DataHub to AWS. For this post, I have hosted DataHub on Kubernetes, using Amazon Elastic Kubernetes Service (Amazon EKS). Alternately, you could choose Google Kubernetes Engine (GKE) on Google Cloud or Azure Kubernetes Service (AKS) on Microsoft Azure.
Conveniently, DataHub offers a Helm chart, making deployment to Kubernetes straightforward. Furthermore, Helm charts are easily integrated with popular CI/CD tools. For this post, I’ve used ArgoCD, the declarative GitOps continuous delivery tool for Kubernetes, to deploy the DataHub Helm charts to Amazon EKS.

According to the documentation, DataHub consists of four main components: GMS, MAE Consumer (optional), MCE Consumer (optional), and Frontend. Kubernetes deployment for each of the components is defined as sub-charts under the main DataHub Helm chart.
External Storage Layer Dependencies
Four external storage layer dependencies power the main DataHub components: Kafka, Local DB (MySQL, Postgres, or MariaDB), Search Index (Elasticsearch), and Graph Index (Neo4j or Elasticsearch). DataHub has provided a separate DataHub Prerequisites Helm chart for the dependencies. The dependencies must be deployed before deploying DataHub.
Alternately, you can substitute AWS managed services for the external storage layer dependencies, which is also detailed in the Deploying to AWS documentation. AWS managed service dependency substitutions include Amazon RDS for MySQL, Amazon OpenSearch (fka Amazon Elasticsearch), and Amazon Managed Streaming for Apache Kafka (Amazon MSK). According to DataHub, support for using AWS Neptune as the Graph Index is coming soon.
DataHub CLI and Plug-ins
DataHub comes with the datahub CLI, allowing you to perform many common operations on the command line. You can install and use the DataHub CLI within your development environment or integrate it with your CI/CD tooling.

DataHub uses a plugin architecture. Plugins allow you to install only the datasource dependencies you need. For example, if you want to ingest metadata from Amazon Athena, just install the Athena plugin: pip install 'acryl-datahub[athena]'. DataHub Source, Sink, and Transformer plugins can be displayed using the datahub check plugins CLI command.
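Once a plugin and a recipe are in place, an ingestion job can be kicked off from the CLI; for example (the recipe filename here is illustrative):
datahub ingest -c postgres_recipe.yml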


Secure Metadata Ingestion
Often, datasources are not externally accessible for security reasons. Further, many datasources may not be accessible to individual users, especially in higher environments like UAT, Staging, and Production. They are only accessible to applications or CI/CD tooling. To overcome these limitations when extracting metadata with DataHub, I prefer to perform my DataHub-related development and testing locally but execute all DataHub ingestion securely on AWS.
In my local development environment, I use JetBrains PyCharm to author the Python and YAML-based DataHub configuration files and ingestion pipeline recipes, then commit those files to git and push them to a private GitHub repository. Finally, I use GitHub Actions to test DataHub files.
To run DataHub ingestion jobs and push the results to DataHub running in Kubernetes on Amazon EKS, I have built a custom Python-based Docker container. The container runs the DataHub CLI, required DataHub plugins, and any additional Python dependencies. The container’s pod has the appropriate AWS IAM permissions, using IAM Roles for Service Accounts (IRSA), to securely access datasources to ingest and the DataHub application.
Schedule and Monitor Pipelines
Scheduling and managing multiple metadata ingestion jobs on AWS is best handled with Apache Airflow with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Ingestion jobs run as Airflow DAG tasks, which call the EKS-based DataHub CLI container. With MWAA, datasource connections, credentials, and other sensitive configurations can be kept secure and not be exposed externally or in plain text.
When running the ingestion pipelines on AWS with DataHub, all communications between AWS-based datasources, ingestion jobs running in Airflow, and DataHub, should use secure private IP addressing and DNS resolution instead of transferring metadata over the Internet. Make sure to create all the necessary VPC peering connections, network route table configurations, and VPC endpoints to connect all relevant services.
SaaS services such as Snowflake or MongoDB Atlas, services provided by other Cloud Service Providers such as Google Cloud and Microsoft Azure, and datasources in corporate datasources require alternate networking and security strategies to access metadata securely.

Markup or Code?
According to the documentation, a DataHub recipe is a configuration file that tells ingestion scripts where to pull data from (source) and where to put it (sink). Recipes normally contain a source, sink, and transformers configuration section. Markup-language-based job automation written in YAML, JSON, or Domain Specific Languages (DSLs) is often an alternative to writing code. DataHub recipes can be written in YAML. The example recipe shown below is used to ingest metadata from an Amazon RDS for PostgreSQL database, running on AWS.
YAML-based recipes can also use automatic environment variable expansion for convenience, automation, and security. It is considered best practice to secure sensitive configuration values, such as database credentials, in a secure location and reference them as environment variables. For example, note the server: ${DATAHUB_REST_ENDPOINT} entry in the sink section below. The DATAHUB_REST_ENDPOINT environment variable is set ahead of time and re-used for all ingestion jobs. Sensitive database connection information has also been variablized and stored separately.
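A minimal sketch of a comparable PostgreSQL-to-DataHub recipe follows; host, database, and credential values are environment-variable placeholders and may differ from the actual recipe used for the post.
source:
  type: postgres
  config:
    host_port: ${DB_HOST_PORT}
    database: ${DB_NAME}
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    include_tables: true
    include_views: true
sink:
  type: datahub-rest
  config:
    server: ${DATAHUB_REST_ENDPOINT}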
Using Python
You can configure and run a pipeline entirely from within a custom Python script using DataHub's Python API as an alternative to YAML. Below, we see two ingestion pipelines, written in Python, that are nearly identical to the YAML recipe above. Writing ingestion pipeline logic programmatically gives you increased flexibility for automation, error checking, unit-testing, and notification. Below is a basic pipeline written in Python. The code is functional, but not very Pythonic, secure, scalable, or Production ready.
The second version of the same pipeline is more Production ready. The code is more Pythonic in nature and makes use of error checking, logging, and the AWS Systems Manager (SSM) Parameter Store. Like recipes written in YAML, environment variables can be used for convenience and security. In this example, commonly reused and sensitive connection configuration items have been extracted and placed in the SSM Parameter Store. Additional configuration is pulled from the environment, such as AWS Account ID and AWS Region. The script loads these values at runtime.
Sinking to DataHub
When sinking metadata to DataHub, you have two choices: the GMS REST API or Kafka. According to DataHub, the advantage of the REST-based interface is that any errors can immediately be reported. On the other hand, the advantage of the Kafka-based interface is that it is asynchronous and can handle higher throughput. For this post, I am using DataHub's REST API.


Column-level Metadata
In addition to column names and data types, it is possible to extract column descriptions and key types from certain datasources. Column descriptions, tags, and glossary terms can also be input through the DataHub UI. Below, we see an example of an Amazon Redshift fact table, whose table and column descriptions were ingested as part of the metadata.

Business Glossary
DataHub can assign business glossary terms to entities. The DataHub Business Glossary plugin pulls business glossary metadata from a YAML-based configuration file.
Business glossary terms can be reviewed in the Glossary Terms tab of the DataHub UI. Below, we see the three terms associated with the Classification glossary node: Confidential, HighlyConfidential, and Sensitive.
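A hedged sketch of the kind of YAML file the Business Glossary plugin consumes is shown below, using the terms above; the descriptions and ownership values are placeholders.
version: 1
source: DataHub
owners:
  users:
    - datahub
nodes:
  - name: Classification
    description: A set of terms related to data classification
    terms:
      - name: Sensitive
        description: Sensitive data
      - name: Confidential
        description: Confidential data
      - name: HighlyConfidential
        description: Highly confidential data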

We can search for entities inventoried in DataHub using their assigned business glossary terms.

Finally, we see an example of an AWS Athena data catalog table with business glossary terms applied to columns within the table’s schema.

SQL-based Profiler
DataHub also can extract statistics about entities in DataHub using the SQL-based Profiler. According to the DataHub documentation, the Profiler can extract the following:
- Row and column counts for each table
- Column null counts and proportions
- Column distinct counts and proportions
- Column min, max, mean, median, standard deviation, quantile values
- Column histograms or frequencies of unique values
We can also track the historical stats for each profiled entity each time metadata is ingested.


Data Lineage
DataHub’s data lineage features allow us to view upstream and downstream relationships between different types of entities. DataHub can trace lineage across multiple platforms, datasets, pipelines, charts, and dashboards.
Below, we see a simple example of dataset entity-to-entity lineage in Amazon Redshift and then Apache Spark on Amazon EMR. The fact table has a downstream relationship to four database views. The views are based on SQL queries that include the upstream table as a datasource.


DataHub Analytics
DataHub provides basic metadata quality and usage analytics in the DataHub UI: user activity, counts of datasource types, business glossary terms, environments, and actions.


Conclusion
In this post, we explored the features of a data catalog and learned about some of the leading commercial and open-source data catalogs. Next, we learned how DataHub could collect, organize, enrich, and search metadata across multiple datasources. Lastly, we discovered how easy it is to catalog metadata from datasources spread across multiple CSPs, SaaS providers, and corporate data centers, and centralize those results in DataHub.
In addition to the basic features reviewed in this post, DataHub offers a growing number of additional capabilities, including GraphQL and Timeline APIs, robust authentication and authorization, application monitoring observability, and Great Expectations integration. All these qualities make DataHub an excellent choice for a data catalog.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Hydrating a Data Lake using Log-based Change Data Capture (CDC) with Debezium, Apicurio, and Kafka Connect on AWS
Posted by Gary A. Stafford in Analytics, AWS, Big Data, Cloud, Kubernetes on August 21, 2021
Import data from Amazon RDS into Amazon S3 using Amazon MSK, Apache Kafka Connect, Debezium, Apicurio Registry, and Amazon EKS
Introduction
In the last post, Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS, we utilized Kafka Connect to export data from an Amazon RDS for PostgreSQL relational database and import the data into a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 was converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect. To improve data freshness, as data was added or updated in the PostgreSQL database, Kafka Connect automatically detected those changes and streamed them into the data lake using query-based Change Data Capture (CDC).
This follow-up post will examine log-based CDC as a marked improvement over query-based CDC to continuously stream changes from the PostgreSQL database to the data lake. We will perform log-based CDC using Debezium’s Kafka Connect Source Connector for PostgreSQL rather than Confluent’s Kafka Connect JDBC Source connector, which was used in the previous post for query-based CDC. We will store messages as Apache Avro in Kafka running on Amazon Managed Streaming for Apache Kafka (Amazon MSK). Avro message schemas will be stored in Apicurio Registry. The schema registry will run alongside Kafka Connect on Amazon Elastic Kubernetes Service (Amazon EKS).

Change Data Capture
According to Gunnar Morling, Principal Software Engineer at Red Hat, who works on the Debezium and Hibernate projects and is a well-known industry speaker, there are two types of Change Data Capture — Query-based and Log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

You can find another excellent explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.
Query-based vs. Log-based CDC
To demonstrate the high-level differences between query-based and log-based CDC, let’s examine the results of a simple SQL UPDATE statement captured with both CDC methods.
UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;
Here is how that change is represented as a JSON message payload using the query-based CDC method described in the previous post.
{
  "address_id": 105,
  "address": "733 Mandaluyong Place",
  "address2": "Apartment #1234",
  "district": "Asir",
  "city_id": 2,
  "postal_code": "77459",
  "phone": "196568435814",
  "last_update": "2021-08-13T00:43:38.508Z"
}
Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.
{
  "after": {
    "address": "733 Mandaluyong Place",
    "address2": "Apartment #1234",
    "phone": "196568435814",
    "district": "Asir",
    "last_update": "2021-08-13T00:43:38.508453Z",
    "address_id": 105,
    "postal_code": "77459",
    "city_id": 2
  },
  "source": {
    "schema": "public",
    "sequence": "[\"1090317720392\",\"1090317720392\"]",
    "xmin": null,
    "connector": "postgresql",
    "lsn": 1090317720624,
    "name": "pagila",
    "txId": 16973,
    "version": "1.6.1.Final",
    "ts_ms": 1628815418508,
    "snapshot": "false",
    "db": "pagila",
    "table": "address"
  },
  "op": "u",
  "ts_ms": 1628815418815
}
Avro and Schema Registry
Apache Avro is a compact, fast, binary data format, according to the documentation. Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic scripting languages since data, together with its schema, is fully self-describing.
We can decouple the data from its schema by using schema registries like the Confluent Schema Registry or Apicurio Registry. According to Apicurio, in a messaging and event streaming architecture, data published to topics and queues must often be serialized or validated using a schema (e.g., Apache Avro, JSON Schema, or Google Protocol Buffers). Of course, schemas can be packaged in each application. Still, it is often a better architectural pattern to register schemas in an external system [schema registry] and then reference them from each application.
Using Debezium’s PostgreSQL source connector, we will store changes from the PostgreSQL database’s write-ahead log (WAL) as Avro in Kafka, running on Amazon MSK. The message’s schema will be stored separately in Apicurio Registry as opposed to with the message, thus reducing the size of the messages in Kafka and allowing for schema validation and schema evolution.

pagila.public.film schema
Debezium
Debezium, according to their website, continuously monitors your databases and lets any of your applications stream every row-level change in the same order they were committed to the database. Event streams can be used to purge caches, update search indexes, generate derived views and data, and keep other data sources in sync. Debezium is a set of distributed services that capture row-level changes in your databases. Debezium records all row-level changes committed to each database table in a transaction log. Then, each application reads the transaction logs they are interested in, and they see all of the events in the same order in which they occurred. Debezium is built on top of Apache Kafka and integrates with Kafka Connect.
The latest version of Debezium includes support for monitoring MySQL database servers, MongoDB replica sets or sharded clusters, PostgreSQL servers, and SQL Server databases. We will be using Debezium’s PostgreSQL connector to capture row-level changes in the Pagila PostgreSQL database. According to Debezium’s documentation, the first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content committed to the database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.
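A hedged sketch of a Debezium PostgreSQL source connector configuration, using the Apicurio Avro converters, is shown below; the hostnames, credentials, and registry URL are placeholders, and the actual connector configuration files are in the post's GitHub repository.
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "<rds-endpoint>",
  "database.port": "5432",
  "database.user": "<username>",
  "database.password": "<password>",
  "database.dbname": "pagila",
  "database.server.name": "pagila",
  "plugin.name": "pgoutput",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "<apicurio-registry-url>",
  "key.converter.apicurio.registry.auto-register": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "<apicurio-registry-url>",
  "value.converter.apicurio.registry.auto-register": "true"
}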
Prerequisites
Similar to the previous post, this post will focus on data movement, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:
- Amazon RDS for PostgreSQL instance (data source);
- Amazon S3 bucket (data sink);
- Amazon MSK cluster;
- Amazon EKS cluster;
- Connectivity between the Amazon RDS instance and Amazon MSK cluster;
- Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
- Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
- IAM Role associated with Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below);
As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.
Source Code
All source code for this post and the previous post, including the Kafka Connect and connector configuration files and the Helm charts, is open-sourced and located on GitHub in the garystafford/kafka-connect-msk-demo repository.
Authentication and Authorization
Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK to securely decouple Go-based microservices on Amazon EKS using Amazon MSK with IRSA, SASL/SCRAM, and data encryption.
Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (known as IRSA) allows EKS to access MSK and S3 using IAM (see more details below).
Sample PostgreSQL Database
For this post, we will continue to use PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytical query costs.

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.
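As a sketch, assuming the psql client and one of the Pagila script sets mentioned above, creating and loading the database might look like the following. The hostname, user, and script file names are placeholders; adjust them to the script set you choose.
# create the database on the RDS instance
psql -h your-pagila-database-url.us-east-1.rds.amazonaws.com -U your_master_user -d postgres \
  -c "CREATE DATABASE pagila;"
# load the schema and sample data
psql -h your-pagila-database-url.us-east-1.rds.amazonaws.com -U your_master_user -d pagila \
  -f pagila-schema.sql
psql -h your-pagila-database-url.us-east-1.rds.amazonaws.com -U your_master_user -d pagila \
  -f pagila-data.sql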
Last Updated Trigger
Each table in the Pagila database has a last_update
field. A simplistic way to detect changes in the Pagila database is to use the last_update
field. This is a common technique to determine if and when changes were made to data using query-based CDC, as demonstrated in the previous post. As changes are made to records in these tables, an existing database function and a trigger to each table will ensure the last_update
field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.
CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
Kafka Connect and Schema Registry
There are several options for deploying and managing Kafka Connect, the Kafka management APIs and command-line tools, and the Apicurio Registry. I prefer deploying a containerized solution to Kubernetes on Amazon EKS. Some popular containerized Kafka options include Strimzi, Confluent for Kubernetes (CFK), and Debezium. Another option is building your own Docker Image using the official Apache Kafka binaries. I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries for this post. I then installed the necessary Confluent and Debezium connectors and their associated Java dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf container, building your own image will teach you how Kafka, Kafka Connect, and Debezium work, in my opinion.
Regarding the schema registry, both Confluent and Apicurio offer containerized solutions. Apicurio has three versions of their registry, each with a different storage mechanism: in-memory, SQL, and Kafka. Since we already have an existing Amazon RDS PostgreSQL instance as part of the demonstration, I chose the Apicurio SQL-based registry Docker Image for this post, apicurio/apicurio-registry-sql:2.0.1.Final.
If you choose to use the same Kafka Connect and Apicurio solution I used in this post, a Helm Chart is included in the post’s GitHub repository, kafka-connect-msk-v2
. The Helm chart will deploy a single Kubernetes pod to the kafka
Namespace on Amazon EKS. The pod comprises both the Kafka Connect and Apicurio Registry containers. The deployment is intended for demonstration purposes and is not designed for use in Production.
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-msk
spec:
  type: NodePort
  selector:
    app: kafka-connect-msk
  ports:
    - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.1.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent
        - image: apicurio/apicurio-registry-sql:2.0.1.Final
          name: apicurio-registry-mem
          imagePullPolicy: IfNotPresent
          env:
            - name: REGISTRY_DATASOURCE_URL
              value: jdbc:postgresql://your-pagila-database-url.us-east-1.rds.amazonaws.com:5432/apicurio-registry
            - name: REGISTRY_DATASOURCE_USERNAME
              value: apicurio_registry
            - name: REGISTRY_DATASOURCE_PASSWORD
              value: 1L0v3Kafka!
Before deploying the chart, create a new PostgreSQL database, user, and grants on your RDS instance for the Apicurio Registry to use for storage:
CREATE DATABASE "apicurio-registry";
CREATE USER apicurio_registry WITH PASSWORD '1L0v3Kafka!';
GRANT CONNECT, CREATE ON DATABASE "apicurio-registry" to apicurio_registry;
Update the Helm chart’s values.yaml
file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName
) and your RDS URL (registryDatasourceUrl
). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on Amazon MSK and to your S3 bucket from Kafka Connect running on Amazon EKS.
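The exact policy document is not reproduced here, but a rough, deliberately permissive placeholder could be created along these lines. The action names and the wildcard resource are assumptions; scope the policy down to your specific MSK cluster and S3 bucket for anything beyond a demonstration.
# write an overly broad placeholder policy document
cat > kafka-connect-demo-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:*", "s3:*"],
      "Resource": "*"
    }
  ]
}
EOF
# create the IAM policy to attach to the IRSA role
aws iam create-policy \
  --policy-name kafka-connect-demo-policy \
  --policy-document file://kafka-connect-demo-policy.json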
Once the variables are updated, use the following command to deploy the Helm chart:
helm install kafka-connect-msk-v2 ./kafka-connect-msk-v2 \
--namespace $NAMESPACE --create-namespace
Confirm the chart was installed successfully by checking the pod’s status:
kubectl get pods -n kafka -l app=kafka-connect-msk

If you have any issues with either container while deploying, review the individual container’s logs:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl logs $KAFKA_CONTAINER -n kafka kafka-connect-msk
kubectl logs $KAFKA_CONTAINER -n kafka apicurio-registry-mem
Kafka Connect
Get a shell to the running Kafka Connect container using the kubectl exec
command:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -c kafka-connect-msk -- bash

Confirm Access to Registry from Kafka Connect
If the Helm Chart was deployed successfully, you should now observe 11 new tables in the public
schema of the new apicurio-registry
database. Below, we see the new database and tables, as shown in pgAdmin.

Confirm the registry is running and accessible from the Kafka Connect container by calling the registry’s system/info
REST API endpoint:
curl -s http://localhost:8080/apis/registry/v2/system/info | jq

The Apicurio Registry’s Service targets TCP port 8080. The Service is exposed on the Kubernetes worker node’s external IP address at a static port, the NodePort
. To get the NodePort
of the service, use the following command:
kubectl describe services kafka-connect-msk -n kafka
To access the Apicurio Registry’s web-based UI, add the NodePort
to the Security Group of the EKS nodes with the source being your IP address, a /32
CIDR block.
To get the external IP address (EXTERNAL-IP
) of any Amazon EKS worker nodes, use the following command:
kubectl get nodes -o wide
Use the <NodeIP>:<NodePort>
combination to access the UI from your web browser, for example, http://54.237.41.128:30433
. The registry will be empty at this point in the demonstration.
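If you prefer to script this, the node's external IP address and the service's NodePort can likely be captured with kubectl's jsonpath output and combined, for example:
# assumes the Service name from the Helm chart shown above
export NODE_PORT=$(kubectl get service kafka-connect-msk -n kafka \
  -o jsonpath='{.spec.ports[0].nodePort}')
export NODE_IP=$(kubectl get nodes \
  -o jsonpath='{.items[0].status.addresses[?(@.type=="ExternalIP")].address}')
echo "http://${NODE_IP}:${NODE_PORT}"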

Configure Bootstrap Brokers
Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone or distributed modes. Since we will be using Kafka Connect’s distributed mode, modify the config/connect-distributed.properties
file. A complete sample of the configuration file I used in this post is shown below.
Kafka Connect and the schema registry will run on Amazon EKS, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers
property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:
# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')
Update the config/connect-distributed.properties
file.
# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
For convenience when executing Kafka commands, set the BBROKERS
environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:
export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
Confirm Access to Amazon MSK from Kafka Connect
To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
You can also try listing the existing Kafka consumer groups:
bin/kafka-consumer-groups.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
If either of these fails, you likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.
Once configured, start Kafka Connect as a background process.
bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
To confirm Kafka Connect starts properly, immediately tail the connect.log
file. The log will capture any startup errors for troubleshooting.
tail -f logs/connect.log

You can also examine the background process with the ps
command to confirm Kafka Connect is running. Note the process with PID 4915, shown below. Use the kill
command along with the PID to stop Kafka Connect if necessary.
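For example, a quick way to check for and stop the background process might look like this; the PID shown is only the example value referenced above and will differ on your system.
# confirm the Kafka Connect worker is running
ps -ef | grep connect-distributed.properties | grep -v grep
# stop Kafka Connect, if necessary (replace 4915 with your PID)
kill 4915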

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, when Kafka Connect starts up. The topics are defined in the config/connect-distributed.properties
file: connect-configs
, connect-offsets
, and connect-status
. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The Internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-
Kafka Connect Connectors
This post demonstrates the use of a set of Kafka Connect source and sink connectors. The source connector is based on the Debezium Source Connector for PostgreSQL and the Apicurio Registry. The sink connector is based on the Confluent Amazon S3 Sink connector and the Apicurio Registry.
Connector Source
Create or modify the file, config/debezium_avro_source_connector_postgresql_05.json
. Update lines 3–6, as shown below, to reflect your RDS instance connection details.
The source connector exports existing data and ongoing changes from six related tables within the Pagila database’s public
schema: actor
, film
, film_actor
, category
, film_category
, and language
. Data will be imported into a corresponding set of six new Kafka topics: pagila.public.actor
, pagila.public.film
, and so forth (see line 9 of the source connector configuration file).

Data from the tables is stored in Apache Avro format in Kafka, and the schemas are stored separately in the Apicurio Registry (lines 11–18, above).
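The connector configuration file itself lives in the post's GitHub repository, and the line numbers referenced above correspond to that file. As a rough sketch only, based on the Debezium and Apicurio Registry documentation rather than the repository's exact contents, a configuration of this shape might resemble the following; connection details and credentials are placeholders.
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "your-pagila-database-url.us-east-1.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "your_username",
  "database.password": "your_password",
  "database.dbname": "pagila",
  "database.server.name": "pagila",
  "plugin.name": "pgoutput",
  "table.include.list": "public.actor,public.film,public.film_actor,public.category,public.film_category,public.language",
  "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "key.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "key.converter.apicurio.registry.auto-register": "true",
  "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
  "value.converter.apicurio.registry.url": "http://localhost:8080/apis/registry/v2",
  "value.converter.apicurio.registry.auto-register": "true"
}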
Connector Sink
Create or modify the file, config/s3_sink_connector_05_debezium_avro.json
. Update line 7, as shown below to reflect your Amazon S3 bucket’s name.
The sink connector flushes new data to S3 every 300 records or 60 seconds from the six Kafka topics (lines 4–5, 9–10, above). The schema for the data being written to S3 is extracted from the Apicurio Registry (lines 17–24, above).
The sink connector optimizes the raw data imported into S3 for downstream processing by writing GZIP-compressed Apache Parquet files to Amazon S3. Using Parquet’s columnar file format and file compression should help optimize ELT against the raw data once in S3 (lines 12–13, above).
Deploy Connectors
Deploy the source and sink connectors using the Kafka Connect REST Interface:
curl -s -d @"config/debezium_avro_source_connector_postgresql_05.json
" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/debezium_avro_source_connector_postgresql_05
/config | jq
curl -s -d @"config/s3_sink_connector_05_debezium_avro
.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_05_debezium_avro
/config | jq
Confirming the Deployment
Use the following commands to confirm the new set of connectors are deployed and running correctly.
curl -s -X GET http://localhost:8083/connectors | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/debezium_avro_source_connector_postgresql_05
/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_05_debezium_avro
/status | jq

The items stored in Apicurio Registry, such as event schemas and API designs, are known as registry artifacts. If we re-visit the Apicurio Registry’s UI, we should observe 12 artifacts — a ‘key’ and ‘value’ artifact for each of the six tables we exported from the Pagila database.

Examining the Amazon S3 bucket, you should note six sets of S3 objects within the /topics/
object key prefix organized by topic name.

Within each topic name key, there should be a set of GZIP-compressed Parquet files.

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternately, you can use the AWS CLI with the s3
API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.film/partition=0/pagila.public.film+0+0000000000.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the metadata-rich structure of the log-based CDC messages as compared to the query-based messages we observed in the previous post:
{
"after": {
"special_features": [
"Deleted Scenes",
"Behind the Scenes"
],
"rental_duration": 6,
"rental_rate": 0.99,
"release_year": 2006,
"length": 86,
"replacement_cost": 20.99,
"rating": "PG",
"description": "A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies",
"language_id": 1,
"title": "ACADEMY DINOSAUR",
"original_language_id": null,
"last_update": "2017-09-10T17:46:03.905795Z",
"film_id": 1
},
"source": {
"schema": "public",
"sequence": "[null,\"1177089474560\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177089474560,
"name": "pagila",
"txId": 18422,
"version": "1.6.1.Final",
"ts_ms": 1629340334432,
"snapshot": "true",
"db": "pagila",
"table": "film"
},
"op": "r",
"ts_ms": 1629340334434
}
Database Changes with Log-based CDC
What happens when we change data within the tables that Debezium and Kafka Connect are monitoring? To answer this question, let’s make a few DML changes to the Pagila database: inserts, updates, and deletes:
INSERT INTO public.category (name)
VALUES ('Techno Thriller');
UPDATE public.film
SET release_year = 2021,
rental_rate = 2.99
WHERE film_id = 1;
UPDATE public.film
SET rental_duration = 3
WHERE film_id = 2;
UPDATE public.film_category
SET category_id = (
SELECT DISTINCT category_id
FROM public.category
WHERE name = 'Techno Thriller')
WHERE film_id = 3;
UPDATE public.actor
SET first_name = upper('Kate'),
last_name = upper('Winslet')
WHERE actor_id = 6;
DELETE
FROM public.film_actor
WHERE film_id = 375;
To see how these changes propagate, first, examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The Kafka Connect source connector detects changes, which are then exported from PostgreSQL to Kafka. The sink connector then writes these changes to Amazon S3.

We can view the S3 bucket, which should now have new Parquet files corresponding to our changes. For example, consider the two updates we made to the film record with a film_id of 1. Note the operation is an update ("op": "u") and the presence of the updated data in the after block.
{
"after": {
"special_features": [
"Deleted Scenes",
"Behind the Scenes"
],
"rental_duration": 6,
"rental_rate": 2.99,
"release_year": 2021,
"length": 86,
"replacement_cost": 20.99,
"rating": "PG",
"description": "A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies",
"language_id": 1,
"title": "ACADEMY DINOSAUR",
"original_language_id": null,
"last_update": "2021-08-19T03:19:57.073053Z",
"film_id": 1
},
"source": {
"schema": "public",
"sequence": "[\"1177693455424\",\"1177693455424\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177693471392,
"name": "pagila",
"txId": 18445,
"version": "1.6.1.Final",
"ts_ms": 1629343197100,
"snapshot": "false",
"db": "pagila",
"table": "film"
},
"op": "u",
"ts_ms": 1629343197389
}
In another example, we see the delete made in the film_actor
table, to the record with the film_id
of 375. Note the operation is a delete ("op": "d"
) and the presence of the before
block but no after
block.
{
"before": {
"last_update": "1970-01-01T00:00:00Z",
"actor_id": 5,
"film_id": 375
},
"source": {
"schema": "public",
"sequence": "[\"1177693516520\",\"1177693516520\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1177693516520,
"name": "pagila",
"txId": 18449,
"version": "1.6.1.Final",
"ts_ms": 1629343198400,
"snapshot": "false",
"db": "pagila",
"table": "film_actor"
},
"op": "d",
"ts_ms": 1629343198426
}
Debezium Event Flattening SMT
The challenge with the Debezium message structure shown above in S3 is the verbosity of the payload and the nested nature of the data. As a result, developing SQL queries against such records would be difficult. For example, given the message structure shown above, even the simplest query in Amazon Athena becomes significantly more complex:
SELECT after.actor_id, after.first_name, after.last_name, after.last_update
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY after.actor_id
ORDER BY after.last_UPDATE DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_actor") AS x
WHERE x.row_num = 1
ORDER BY after.actor_id;
To specifically address the needs of different consumers, Debezium offers the event flattening single message transformation (SMT). The event flattening transformation is a Kafka Connect SMT. We covered Kafka Connect SMTs in the previous post. Using the event flattening SMT, we can shape the message received by Kafka to be more attuned to the specific consumers of our data lake. To implement the event flattening SMT, modify and redeploy the source connector, adding additional configuration (lines 19–23, below).
We will include the op
, db
, schema
, lsn
, and source.ts_ms
metadata fields, along with the actual record data (table
) in the transformed message. This means we have chosen to exclude all other fields from the messages. The transform will flatten the message’s nested structure.
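The additional configuration referenced above (lines 19–23 of the repository's file) corresponds to Debezium's ExtractNewRecordState transformation. A hedged sketch of those settings, with the field list inferred from the flattened examples shown below rather than copied from the repository, looks roughly like this:
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,db,schema,lsn,table,source.ts_ms"
}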
Making this change to the message structure by adding the transformation results in new versions of the message’s schemas automatically being added to the Apicurio Registry by the source connector:

pagila.public.film schema
As a result of the event flattening SMT applied by the source connector, our message structure is significantly simplified:
{
"actor_id": 7,
"first_name": "BOB",
"last_name": "MOSTEL",
"last_update": "2021-08-19T21:01:55.090858Z",
"__op": "u",
"__db": "pagila",
"__schema": "public",
"__table": "actor",
"__lsn": 1191920555344,
"__source_ts_ms": 1629406915091,
"__deleted": "false"
}
Note the new __deleted
field, which results from lines 21–22 of the source connector configuration, shown above. Debezium keeps tombstone records for DELETE operations in the event stream and adds __deleted
, set to true
or false
. Below, we see an example of two DELETE operations on the film_actor
table.
{
"actor_id": 52,
"film_id": 376,
"last_update": "1970-01-01T00:00:00Z",
"__op": "d",
"__db": "pagila",
"__schema": "public",
"__table": "film_actor",
"__lsn": 1192390296016,
"__source_ts_ms": 1629408869556,
"__deleted": "true"
}
{
"actor_id": 60,
"film_id": 376,
"last_update": "1970-01-01T00:00:00Z",
"__op": "d",
"__db": "pagila",
"__schema": "public",
"__table": "film_actor",
"__lsn": 1192390298976,
"__source_ts_ms": 1629408869556,
"__deleted": "true"
}
Viewing Data in the Data Lake
A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.
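The crawler itself can be created in the AWS Management Console or, as a rough sketch, with the AWS CLI. The crawler name, IAM role, and S3 path below are placeholders; the Glue database name mirrors the one used in the Athena queries that follow.
# create a crawler over the raw /topics/ prefix in the data lake bucket
aws glue create-crawler \
  --name pagila-kafka-connect-crawler \
  --role arn:aws:iam::111111111111:role/your-glue-service-role \
  --database-name pagila_kafka_connect \
  --targets '{"S3Targets": [{"Path": "s3://your-s3-bucket/topics/"}]}'
# run the crawler to build or refresh the Data Catalog tables
aws glue start-crawler --name pagila-kafka-connect-crawler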

With the data crawled and cataloged in Glue, let’s perform some additional changes to the Pagila database’s film
table.
UPDATE public.film
SET release_year = 2019,
rental_rate = 3.99
WHERE film_id = 1;
UPDATE public.film
SET rental_duration = 4
WHERE film_id = 2;
UPDATE public.film
SET rental_duration = 7
WHERE film_id = 2;
INSERT INTO public.category (name)
VALUES ('Steampunk');
UPDATE public.film_category
SET category_id = (
SELECT DISTINCT category_id
FROM public.category
WHERE name = 'Steampunk')
WHERE film_id = 3;
UPDATE public.film
SET release_year = 2017,
rental_rate = 3.99
WHERE film_id = 4;
UPDATE public.film_actor
SET film_id = 100
WHERE film_id = 5;
UPDATE public.film_category
SET film_id = 100
WHERE film_id = 5;
UPDATE public.inventory
SET film_id = 100
WHERE film_id = 5;
DELETE
FROM public.film
WHERE film_id = 5;
We should be able to almost immediately observe these database changes by executing a query with Amazon Athena. The changes are propagated from PostgreSQL to Kafka to S3 within seconds or less by Kafka Connect, based on the connector configurations. Performing a typical query in Athena will return all of the original records as well as any updates or deletes we made as duplicate records (records with identical film_id primary keys).
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp
FROM "pagila_kafka_connect"."pagila_public_film"
ORDER BY film_id, timestamp

Note the original records as well as each change we made earlier. The timestamp
field, derived from the __source_ts_ms
metadata field, represents the server time at which the transaction was committed, according to Debezium. Also, note the records with a film_id
of 5 in the query results — the record we deleted from the film
table. The field values are (mostly) null in the latest record, except for any fields with default values in the Pagila table definition. If there are default values (e.g., rental_duration smallint default 3 not null
or rental_rate numeric(4,2) default 4.99 not null
) set on a field, those values end up in the deleted record when using the event flattening SMT. It doesn’t negatively impact anything except adding additional size to a tombstone record (unclear if this is expected behavior with Debezium or an artifact of the WAL entry).
To view only the most current data and ignore deleted records, we can use the ROW_NUMBER()
function and add a predicate to check the value of the __deleted
field:
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY film_id
ORDER BY __source_ts_ms DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_film") AS x
WHERE x.row_num = 1
AND __deleted != 'true'
ORDER BY film_id

Now we only see the latest records, including the removal of any deleted records. Although this method is effective for a single set of records, the query is far too intricate to apply to complex joins and aggregations, in my opinion.
Data Movement
Using Amazon Athena, we can easily write the results of our ROW_NUMBER()
query back to the data lake for further enrichment or analysis. Athena’s CREATE TABLE AS SELECT
(CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT
statement in the subquery. Athena stores data files created by the CTAS statement in a specified location in Amazon S3 and creates a new AWS Glue Data Catalog table to store the result set’s schema and metadata information. CTAS supports several file formats and storage options.

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by the movie rating
, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.
CREATE TABLE pagila_kafka_connect.pagila_public_film_refined
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['rating'],
external_location='s3://my-s3-table/refined/film/'
) AS
SELECT film_id, title, release_year, rental_rate, rental_duration,
date_format(from_unixtime(__source_ts_ms/1000), '%Y-%m-%d %h:%i:%s') AS timestamp, rating
FROM
(SELECT *,
ROW_NUMBER()
OVER ( PARTITION BY film_id
ORDER BY __source_ts_ms DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_public_film") AS x
WHERE x.row_num = 1
AND __deleted = 'false'
ORDER BY film_id

Examining the Amazon S3 bucket again, you should observe a new set of S3 objects within the /refined/film/
key path, partitioned by rating
.

We should also see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS statement. We can perform additional queries on the refined dataset.
SELECT *
FROM "pagila_kafka_connect"."pagila_public_film_refined"
ORDER BY film_id

CRUD Operations in the Data Lake
To fully take advantage of CDC and maximize the freshness of data in the data lake, we would need to also adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level upserts and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.
Conclusion
This post explored how log-based CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon MSK, Amazon EKS, Apache Kafka Connect, Debezium, Apache Avro, and Apicurio Registry. In a subsequent post, we will learn how data lake file formats like Apache Hudi, Apache Iceberg, and Delta Lake, along with Apache Spark Structured Streaming, can help us actively manage the data in our data lake.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Hydrating a Data Lake using Query-based CDC with Apache Kafka Connect and Kubernetes on AWS
Posted by Gary A. Stafford in AWS, Cloud, Enterprise Software Development, Kubernetes, Serverless on August 11, 2021
Import data from an Amazon RDS database into an Amazon S3-based data lake using Amazon EKS, Amazon MSK, and Apache Kafka Connect
Introduction
A data lake, according to AWS, is a centralized repository that allows you to store all your structured and unstructured data at any scale. Data is collected from multiple sources and moved into the data lake. Once in the data lake, data is organized, cataloged, transformed, enriched, and converted to common file formats, optimized for analytics and machine learning.
One of an organization’s first challenges when building a data lake is how to continually import data from different data sources, such as relational and non-relational database engines, enterprise ERP, SCM, CRM, and SIEM software, flat-files, messaging platforms, IoT devices, and logging and metrics collection systems. Each data source will have its own unique method of connectivity, security, data storage format, and data export capabilities. There are many closed- and open-source tools available to help extract data from different data sources.
A popular open-source tool is Kafka Connect, part of the Apache Kafka ecosystem. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Kafka Connect makes it simple to quickly define connectors that move large collections of data into and out of Kafka.
In the following post, we will learn how to use Kafka Connect to export data from our data source, an Amazon RDS for PostgreSQL relational database, into Kafka. We will then export that data from Kafka into our data sink — a data lake built on Amazon Simple Storage Service (Amazon S3). The data imported into S3 will be converted to Apache Parquet columnar storage file format, compressed, and partitioned for optimal analytics performance, all using Kafka Connect.
Best of all, to maintain data freshness of the data lake, as data is added or updated in PostgreSQL, Kafka Connect will automatically detect those changes and stream those changes into the data lake. This process is commonly referred to as Change Data Capture (CDC).

Change Data Capture
According to Gunnar Morling, Principal Software Engineer at Red Hat who works on the Debezium and Hibernate projects and well-known industry speaker, there are two types of Change Data Capture — Query-based and Log-based CDC. Gunnar detailed the differences between the two types of CDC in his talk at the Joker International Java Conference in February 2021, Change data capture pipelines with Debezium and Kafka Streams.

You can find another good explanation of CDC in the recent post by Lewis Gavin of Rockset, Change Data Capture: What It Is and How to Use It.
Query-based vs. Log-based CDC
To effectively demonstrate the difference between query-based and log-based CDC, examine the results of a SQL UPDATE statement, captured with both methods.
UPDATE public.address
SET address2 = 'Apartment #1234'
WHERE address_id = 105;
Here is how the change is represented as a JSON message payload using the query-based CDC method described in this post.
{
"address_id": 105,
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"district": "Asir",
"city_id": 2,
"postal_code": "77459",
"phone": "196568435814",
"last_update": "2021-08-13T00:43:38.508Z"
}
Here is how the same change is represented as a JSON message payload using log-based CDC with Debezium. Note the metadata-rich structure of the log-based CDC message as compared to the query-based message.
{
"after": {
"address": "733 Mandaluyong Place",
"address2": "Apartment #1234",
"phone": "196568435814",
"district": "Asir",
"last_update": "2021-08-13T00:43:38.508453Z",
"address_id": 105,
"postal_code": "77459",
"city_id": 2
},
"source": {
"schema": "public",
"sequence": "[\"1090317720392\",\"1090317720392\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1090317720624,
"name": "pagila",
"txId": 16973,
"version": "1.6.1.Final",
"ts_ms": 1628815418508,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "u",
"ts_ms": 1628815418815
}
In an upcoming post, we will explore Debezium along with Apache Avro and a schema registry to build a log-based CDC solution using PostgreSQL’s write-ahead log (WAL). In this post, we will examine query-based CDC using the ‘update timestamp’ technique.
Kafka Connect Connectors
In this post, we will use source and sink connectors from Confluent. Confluent is the undisputed leader in providing enterprise-grade managed Kafka through their Confluent Cloud and Confluent Platform products. Confluent offers dozens of source and sink connectors that cover the most popular data sources and sinks. Connectors used in this post will include:
- Confluent’s Kafka Connect JDBC Source connector imports data from any relational database with a JDBC driver into an Apache Kafka topic. The Kafka Connect JDBC Sink connector exports data from Kafka topics to any relational database with a JDBC driver.
- Confluent’s Kafka Connect Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, Parquet, JSON, or Raw Bytes.
Prerequisites
This post will focus on data movement with Kafka Connect, not how to deploy the required AWS resources. To follow along with the post, you will need the following resources already deployed and configured on AWS:
- Amazon RDS for PostgreSQL instance (data source);
- Amazon S3 bucket (data sink);
- Amazon MSK cluster;
- Amazon EKS cluster;
- Connectivity between the Amazon RDS instance and Amazon MSK cluster;
- Connectivity between the Amazon EKS cluster and Amazon MSK cluster;
- Ensure the Amazon MSK Configuration has auto.create.topics.enable=true. This setting is false by default;
- IAM Role associated with a Kubernetes service account (known as IRSA) that will allow access from EKS to MSK and S3 (see details below).
As shown in the architectural diagram above, I am using three separate VPCs within the same AWS account and AWS Region, us-east-1
, for Amazon RDS, Amazon EKS, and Amazon MSK. The three VPCs are connected using VPC Peering. Ensure you expose the correct ingress ports, and the corresponding CIDR ranges on your Amazon RDS, Amazon EKS, and Amazon MSK Security Groups. For additional security and cost savings, use a VPC endpoint to ensure private communications between Amazon EKS and Amazon S3.
Source Code
All source code for this post, including the Kafka Connect configuration files and the Helm chart, is open-sourced and located on GitHub.
Authentication and Authorization
Amazon MSK provides multiple authentication and authorization methods to interact with the Apache Kafka APIs. For example, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients and Apache Kafka ACLs to allow or deny actions. In my last post, I demonstrated the use of SASL/SCRAM and Kafka ACLs with Amazon MSK, Securely Decoupling Applications on Amazon EKS using Kafka with SASL/SCRAM.
Any MSK authentication and authorization should work with Kafka Connect, assuming you correctly configure Amazon MSK, Amazon EKS, and Kafka Connect. For this post, we are using IAM Access Control. An IAM Role associated with a Kubernetes service account (IRSA) allows EKS to access MSK and S3 using IAM (see more details below).
Sample PostgreSQL Database
There are many sample PostgreSQL databases we could use to explore Kafka Connect. One of my favorites, albeit a bit dated, is PostgreSQL’s Pagila database. The database contains simulated movie rental data. The dataset is fairly small, making it less ideal for ‘big data’ use cases but small enough to quickly install and minimize data storage and analytics costs.

Before continuing, create a new database on the Amazon RDS PostgreSQL instance and populate it with the Pagila sample data. A few people have posted updated versions of this database with easy-to-install SQL scripts. Check out the Pagila scripts provided by Devrim Gündüz on GitHub and also by Robert Treat on GitHub.
Last Updated Trigger
Each table in the Pagila database has a last_update
field. A convenient way to detect changes in the Pagila database, and ensure those changes make it from RDS to S3, is to have Kafka Connect use the last_update
field. This is a common technique to determine if and when changes were made to data using query-based CDC.
As changes are made to records in these tables, an existing database function and a trigger to each table will ensure the last_update
field is automatically updated to the current date and time. You can find further information on how the database function and triggers work with Kafka Connect in this post, kafka connect in action, part 3, by Dominick Lombardo.
CREATE OR REPLACE FUNCTION update_last_update_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_last_update_column_address
BEFORE UPDATE
ON address
FOR EACH ROW
EXECUTE PROCEDURE update_last_update_column();
Kubernetes-based Kafka Connect
There are several options for deploying and managing Kafka Connect and other required Kafka management tools to Kubernetes on Amazon EKS. Popular solutions include Strimzi and Confluent for Kubernetes (CFK) or building your own Docker Image using the official Apache Kafka binaries. For this post, I chose to build my own Kafka Connect Docker Image using the latest Kafka binaries. I then installed Confluent’s connectors and their dependencies into the Kafka installation. Although not as efficient as using an off-the-shelf OSS container, building your own image can really teach you how Kafka and Kafka Connect work, in my opinion.
If you chose to use the same Kafka Connect Image used in this post, a Helm Chart is included in the post’s GitHub repository. The Helm chart will deploy a single Kubernetes pod to the kafka
Namespace on Amazon EKS.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-msk
  labels:
    app: kafka-connect-msk
    component: service
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-connect-msk
      component: service
  template:
    metadata:
      labels:
        app: kafka-connect-msk
        component: service
    spec:
      serviceAccountName: kafka-connect-msk-iam-serviceaccount
      containers:
        - image: garystafford/kafka-connect-msk:1.0.0
          name: kafka-connect-msk
          imagePullPolicy: IfNotPresent
Before deploying the chart, update the values.yaml
file with the name of your Kubernetes Service Account associated with the Kafka Connect pod (serviceAccountName
). The IAM Policy attached to the IAM Role associated with the pod’s Service Account should provide sufficient access to Kafka running on the Amazon MSK cluster from EKS. The policy should also provide access to your S3 bucket, as detailed here by Confluent. Below is an example of an (overly broad) IAM Policy that would allow full access to any Kafka clusters running on MSK and to S3 from Kafka Connect running on EKS.
Once the Service Account variable is updated, use the following command to deploy the Helm chart:
helm install kafka-connect-msk ./kafka-connect-msk \
--namespace $NAMESPACE --create-namespace
To get a shell to the running Kafka Connect container, use the following kubectl exec
command:
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-connect-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Configure Bootstrap Brokers
Before starting Kafka Connect, you will need to modify Kafka Connect’s configuration file. Kafka Connect is capable of running workers in standalone and distributed modes. Since we will use Kafka Connect’s distributed mode, modify the config/connect-distributed.properties
file. A complete sample of the configuration file I used in this post is shown below.
Kafka Connect will run within the pod’s container, while Kafka and Apache ZooKeeper run on Amazon MSK. Update the bootstrap.servers
property to reflect your own comma-delimited list of Amazon MSK Kafka Bootstrap Brokers. To get the list of the Bootstrap Brokers for your Amazon MSK cluster, use the AWS Management Console, or the following AWS CLI commands:
# get the msk cluster's arn
aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn'
# use msk arn to get the brokers
aws kafka get-bootstrap-brokers --cluster-arn your-msk-cluster-arn
# alternately, if you only have one cluster, then
aws kafka get-bootstrap-brokers --cluster-arn $(
aws kafka list-clusters | jq -r '.ClusterInfoList[0].ClusterArn')
Update the config/connect-distributed.properties
file.
# ***** CHANGE ME! *****
bootstrap.servers=b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
# kafka connect auth using iam
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect producer auth using iam
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# kafka connect consumer auth using iam
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
For convenience when executing Kafka commands, set the BBROKERS
environment variable to the same comma-delimited list of Kafka Bootstrap Brokers, for example:
export BBROKERS="b-1.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-2.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098,b-3.your-cluster.123abc.c2.kafka.us-east-1.amazonaws.com:9098"
Confirm Access to Amazon MSK from Kafka Connect
To confirm you have access to Kafka running on Amazon MSK, from the Kafka Connect container running on Amazon EKS, try listing the existing Kafka topics:
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties
You can also try listing the existing Kafka consumer groups:
bin/kafka-consumer-groups.sh --list \
  --bootstrap-server $BBROKERS \
  --command-config config/client-iam.properties
If either of these fails, you will likely have networking or security issues blocking access from Amazon EKS to Amazon MSK. Check your VPC Peering, Route Tables, IAM/IRSA, and Security Group ingress settings. Any one of these items can cause communications issues between the container and Kafka running on Amazon MSK.
Kafka Connect
I recommend starting Kafka Connect as a background process using either method shown below.
bin/connect-distributed.sh \
config/connect-distributed.properties > /dev/null 2>&1 &
# alternately use nohup
nohup bin/connect-distributed.sh \
config/connect-distributed.properties &
To confirm Kafka Connect started properly, immediately tail the connect.log
file. The log will capture any startup errors for troubleshooting.
tail -f logs/connect.log

You can also examine the background process with the ps
command to confirm Kafka Connect is running. Note the process with PID 4915, below. Use the kill
command along with the PID to stop Kafka Connect if necessary.

If configured properly, Kafka Connect will create three new topics, referred to as Kafka Connect internal topics, the first time it starts up, as defined in the config/connect-distributed.properties
file: connect-configs
, connect-offsets
, and connect-status
. According to Confluent, Connect stores connector and task configurations, offsets, and status in these topics. The Internal topics must have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions. These new topics can be confirmed using the following command.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep connect-
Kafka Connect Connectors
This post demonstrates three progressively more complex Kafka Connect source and sink connectors. Each will demonstrate different connector capabilities to import/export and transform data between Amazon RDS for PostgreSQL and Amazon S3.
Connector Source #1
Create a new file (or modify the existing file if using my Kafka Connect container) named config/jdbc_source_connector_postgresql_00.json
. Modify lines 3–5, as shown below, to reflect your RDS instance’s JDBC connection details.
This first Kafka Connect source connector uses Confluent’s Kafka Connect JDBC Source connector (io.confluent.connect.jdbc.JdbcSourceConnector
) to export data from RDS with a JDBC driver and import that data into a series of Kafka topics. We will be exporting data from three tables in Pagila’s public
schema: address
, city
, and country
. We will write that data to a series of topics, arbitrarily prefixed with database name and schema, pagila.public.
. The source connector will create the three new topics automatically: pagila.public.address
, pagila.public.city
, and pagila.public.country
.
Note the connector’s mode
property value is set to timestamp
, and the last_update
field is referenced in the timestamp.column.name
property. Recall we added the database function and triggers to these three tables earlier in the post, which will update the last_update
field whenever a record is created or updated in the Pagila database. In addition to an initial export of the entire table, the source connector will poll the database every 5 seconds (poll.interval.ms
property), looking for changes that are newer than the most recently exported last_update
date. This is accomplished by the source connector, using a parameterized query, such as:
SELECT *
FROM "public"."address"
WHERE "public"."address"."last_update" > ?
AND "public"."address"."last_update" < ?
ORDER BY "public"."address"."last_update" ASC
Connector Sink #1
Next, create and configure the first Kafka Connect sink connector. Create a new file or modify config/s3_sink_connector_00.json
. Modify line 7, as shown below to reflect your Amazon S3 bucket name.
This first Kafka Connect sink connector uses Confluent’s Kafka Connect Amazon S3 Sink connector (io.confluent.connect.s3.S3SinkConnector
) to export data from Kafka topics to Amazon S3 objects in JSON format.
Deploy Connectors #1
Deploy the source and sink connectors using the Kafka Connect REST Interface. Many tutorials demonstrate a POST
method against the /connectors
endpoint. However, this then requires a DELETE
and an additional POST
to update the connector. Using a PUT
against the /config
endpoint, you can update the connector without first issuing a DELETE
.
curl -s -d @"config/jdbc_source_connector_postgresql_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/config | jq
curl -s -d @"config/s3_sink_connector_00.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_00/config | jq

You can confirm the source and sink connectors are deployed and running using the following commands:
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_00/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_00/status | jq

Errors preventing the connector from starting correctly will be displayed using the /status
endpoint, as shown in the example below. In this case, the Kubernetes Service Account associated with the pod lacked the proper IAM permissions to the Amazon S3 target bucket.

Confirming Success of Connectors #1
The entire contents of the three tables will be exported from RDS to Kafka by the source connector, then exported from Kafka to S3 by the sink connector. To confirm the source connector worked, verify the existence of three new Kafka topics that should have been created: pagila.public.address
, pagila.public.city
, and pagila.public.country
.
bin/kafka-topics.sh --list \
--bootstrap-server $BBROKERS \
--command-config config/client-iam.properties \
| grep pagila.public.
To confirm the sink connector worked, verify the new S3 objects have been created in the data lake’s S3 bucket. Using the AWS CLI v2’s s3api commands, we can view the contents of our target S3 bucket:
aws s3api list-objects \
--bucket your-s3-bucket \
--query 'Contents[].{Key: Key}' \
--output text
You should see approximately 15 new S3 objects (JSON files) in the S3 bucket, whose keys are organized by their topic names. The sink connector flushes new data to S3 every 100 records, or 60 seconds.
You could also use the AWS Management Console to view the S3 bucket’s contents.

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
For example, the address
table’s data will look similar to the following using the ‘Query with S3 Select’ feature via the console or API:
{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000
}
Congratulations, you have successfully imported data from a relational database into your data lake using Kafka Connect!
Connector Source #2
Create a new file or modify config/jdbc_source_connector_postgresql_01.json
. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.
This second Kafka Connect source connector also uses Confluent’s Kafka Connect JDBC Source connector to export data from just the address table with a JDBC driver and import that data into a new Kafka topic, pagila.public.alt.address. The difference with this source connector is its use of transforms, known as Single Message Transformations (SMTs). SMTs are applied to messages as they flow through Connect from RDS to Kafka.
In this connector, there are four transforms, which perform the following common functions (see the sketch after this list):
- Extract the address_id integer field as the Kafka message key, as detailed in this blog post by Confluent (see ‘Setting the Kafka message key’);
- Append the Kafka topic name into the message as a new static field;
- Append the database name into the message as a new static field.
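A hedged sketch of the transforms portion of such a configuration, using Kafka's built-in SMT classes, might look like the following. The transform aliases are arbitrary, the field names mirror the sample output shown later, and the exact configuration is in the post's GitHub repository.
{
  "transforms": "createKey,extractKey,addTopic,addSource",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "address_id",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "address_id",
  "transforms.addTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTopic.topic.field": "message_topic",
  "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addSource.static.field": "message_source",
  "transforms.addSource.static.value": "pagila"
}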
Connector Sink #2
Create a new file or modify config/s3_sink_connector_01.json
. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.
This second sink connector is nearly identical to the first sink connector, except it only exports data from a single Kafka topic, pagila.public.alt.address
, into S3.
Deploy Connectors #2
Deploy the second set of source and sink connectors using the Kafka Connect REST Interface, exactly like the first pair.
curl -s -d @"config/jdbc_source_connector_postgresql_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/config | jq
curl -s -d @"config/s3_sink_connector_01.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_01/config | jq
Confirming Success of Connectors #2
Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first set, for a total of four connectors.
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_01/status | jq

To view the results of the first transform, extracting the address_id
integer field as the Kafka message key, we can use a Kafka command-line consumer:
bin/kafka-console-consumer.sh \
--topic pagila.public.alt.address \
--offset 102 --partition 0 --max-messages 5 \
--property print.key=true --property print.value=true \
--property print.offset=true --property print.partition=true \
--property print.headers=false --property print.timestamp=false \
--bootstrap-server $BBROKERS \
--consumer.config config/client-iam.properties
In the output below, note the beginning of each message, which displays the Kafka message key, identical to the address_id
. For example, {"type":"int32","optional":false},"payload":100}
.

Examining the Amazon S3 bucket using the AWS Management Console or the CLI, you should note the fourth set of S3 objects within the /topics/pagila.public.alt.address/
object key prefix.

Use the Amazon S3 console’s ‘Query with S3 Select’ to view the data contained in the JSON-format files. Alternately, you can use the s3 API:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.public.alt.address/partition=0/pagila.public.address+0+0000000100.json"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"JSON": {"Type": "DOCUMENT"}, "CompressionType": "NONE"}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the two new fields that have been appended to each record, a result of the Kafka Connect transforms:
{
"address_id": 100,
"address": "1308 Arecibo Way",
"address2": "",
"district": "Georgia",
"city_id": 41,
"postal_code": "30695",
"phone": "6171054059",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 101,
"address": "1599 Plock Drive",
"address2": "",
"district": "Tete",
"city_id": 534,
"postal_code": "71986",
"phone": "817248913162",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
{
"address_id": 102,
"address": "669 Firozabad Loop",
"address2": "",
"district": "Abu Dhabi",
"city_id": 12,
"postal_code": "92265",
"phone": "412903167998",
"last_update": 1487151930000,
"message_topic": "pagila.public.alt.address",
"message_source": "pagila"
}
Congratulations, you have successfully imported more data from a relational database into your data lake, including performing a simple series of transforms using Kafka Connect!
Connector Source #3
Create or modify config/jdbc_source_connector_postgresql_02.json
. Modify lines 3–5, as shown below, to reflect your RDS instance connection details.
Unlike the first two source connectors that export data from tables, this connector uses a SELECT query to export data from the Pagila database’s address, city, and country tables and import the results of that SQL query into a new Kafka topic, pagila.query. The SQL query in the source connector’s configuration is as follows:
SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
The final parameterized query executed by the source connector, which allows it to detect changes based on the last_update field, is as follows:
SELECT *
FROM (SELECT a.address_id,
a.address,
a.address2,
city.city,
a.district,
a.postal_code,
country.country,
a.phone,
a.last_update
FROM address AS a
INNER JOIN city ON a.city_id = city.city_id
INNER JOIN country ON country.country_id = city.country_id
ORDER BY address_id) AS addresses
WHERE "last_update" > ?
AND "last_update" < ?
ORDER BY "last_update" ASC
Connector Sink #3
Create or modify config/s3_sink_connector_02.json
. Modify line 6, as shown below, to reflect your Amazon S3 bucket name.
This sink connector is significantly different from the previous two sink connectors. In addition to leveraging SMTs in the corresponding source connector, we are also using them in this sink connector. The sink connector uses the InsertField transform to append three arbitrary static fields (message_source, message_source_engine, and environment) to each record as it is written to Amazon S3. The sink connector also renames the district field to state_province using the ReplaceField transform.
The first two sink connectors wrote uncompressed JSON-format files to Amazon S3. This third sink connector optimizes the data imported into S3 for downstream data analytics. The sink connector writes GZIP-compressed Apache Parquet files to Amazon S3. In addition, the compressed Parquet files are partitioned by the country
field. Using a columnar file format, compression, and partitioning, queries against the data should be faster and more efficient.
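The project’s config/s3_sink_connector_02.json is again the source of truth, but the Parquet, compression, partitioning, and SMT behavior described above maps to a handful of properties in Confluent’s S3 Sink connector and Kafka’s bundled SMTs. A hedged sketch of that subset is below; the transform aliases are illustrative assumptions, and each additional static field (message_source, environment) would need its own InsertField transform.
# Illustrative subset of the sink connector configuration (pretty-printed with jq):
jq . <<'EOF'
{
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "gzip",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "country",
  "transforms": "insertEngine,renameDistrict",
  "transforms.insertEngine.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertEngine.static.field": "message_source_engine",
  "transforms.insertEngine.static.value": "postgresql",
  "transforms.renameDistrict.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameDistrict.renames": "district:state_province"
}
EOF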
Deploy Connectors #3
Deploy the final source and sink connectors using the Kafka Connect REST Interface, exactly like the first two pairs.
curl -s -d @"config/jdbc_source_connector_postgresql_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/config | jq
curl -s -d @"config/s3_sink_connector_02.json" \
-H "Content-Type: application/json" \
-X PUT http://localhost:8083/connectors/s3_sink_connector_02/config | jq
Confirming Success of Connectors #3
Use the same commands as before to confirm the new set of connectors are deployed and running, alongside the first two sets, for a total of six connectors.
curl -s -X GET http://localhost:8083/connectors | \
jq '. | sort_by(.)'
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/jdbc_source_connector_postgresql_02/status | jq
curl -s -H "Content-Type: application/json" \
-X GET http://localhost:8083/connectors/s3_sink_connector_02/status | jq

Reviewing the messages within the new pagila.query topic, note the message_topic field has been appended to each message by the source connector, but not the message_source, message_source_engine, and environment fields. The sink connector appends these fields as it writes the messages to S3. Also, note the district field has yet to be renamed to state_province by the sink connector.

Examining the Amazon S3 bucket again, you should note the fifth set of S3 objects within the /topics/pagila.query/
object key prefix. The Parquet-format files within are partitioned by country
.

Within each country
partition, there are Parquet files whose records contain addresses within those countries.

Use the Amazon S3 console’s ‘Query with S3 Select’ again to view the data contained in the Parquet-format files. Alternatively, you can use the s3api CLI:
export SINK_BUCKET="your-s3-bucket"
export KEY="topics/pagila.query/country=United States/pagila.query+0+0000000003.gz.parquet"
aws s3api select-object-content \
--bucket $SINK_BUCKET \
--key $KEY \
--expression "select * from s3object limit 5" \
--expression-type "SQL" \
--input-serialization '{"Parquet": {}}' \
--output-serialization '{"JSON": {}}' "output.json" \
&& cat output.json | jq \
&& rm output.json
In the sample data below, note the four new fields that have been appended to each record, a result of the source and sink connector SMTs. Also, note the renamed district field:
{
"address_id": 599,
"address": "1895 Zhezqazghan Drive",
"address2": "",
"city": "Garden Grove",
"state_province": "California",
"postal_code": "36693",
"country": "United States",
"phone": "137809746111",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 6,
"address": "1121 Loja Avenue",
"address2": "",
"city": "San Bernardino",
"state_province": "California",
"postal_code": "17886",
"country": "United States",
"phone": "838635286649",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
{
"address_id": 18,
"address": "770 Bydgoszcz Avenue",
"address2": "",
"city": "Citrus Heights",
"state_province": "California",
"postal_code": "16266",
"country": "United States",
"phone": "517338314235",
"last_update": "2017-02-15T09:45:30.000Z",
"message_topic": "pagila.query",
"message_source": "pagila",
"message_source_engine": "postgresql",
"environment": "development"
}
Record Updates and Query-based CDC
What happens when we change data within the tables that Kafka Connect is polling every 5 seconds? To answer this question, let’s make a few DML changes:
-- update address field
UPDATE public.address
SET address = '123 CDC Test Lane'
WHERE address_id = 100;
-- update address2 field
UPDATE public.address
SET address2 = 'Apartment #2201'
WHERE address_id = 101;
-- second update to same record
UPDATE public.address
SET address2 = 'Apartment #2202'
WHERE address_id = 101;
-- insert new country
INSERT INTO public.country (country)
values ('Wakanda');
-- should be 110
SELECT country_id FROM country WHERE country='Wakanda';
-- insert new city
INSERT INTO public.city (city, country_id)
VALUES ('Birnin Zana', 110);
-- should be 601
SELECT city_id FROM public.city WHERE country_id=110;
-- update city_id to new city_id
UPDATE public.address
SET city_id = 601
WHERE address_id = 102;
-- second update to same record
UPDATE public.address
SET district = 'Lake Turkana'
WHERE address_id = 102;
-- delete an address record
UPDATE public.customer
SET address_id = 200
WHERE customer_id IN (
SELECT customer_id FROM customer WHERE address_id = 104);
DELETE
FROM public.address
WHERE address_id = 104;
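Before looking at Kafka Connect, you can optionally verify the DML changes directly against the database. The command below is a sketch that assumes the psql client is installed and uses placeholder connection details; substitute your RDS endpoint and credentials.
# Optional sanity check (placeholder endpoint and username shown):
psql -h your-rds-endpoint.us-east-1.rds.amazonaws.com -U your-username -d pagila -c \
  "SELECT address_id, address, address2, district, city_id, last_update
     FROM public.address
    WHERE address_id BETWEEN 100 AND 104
    ORDER BY address_id;"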
To see how these changes propagate, first examine the Kafka Connect logs. Below, we see example log events corresponding to some of the database changes shown above. The three Kafka Connect source connectors detect changes, which are exported from PostgreSQL to Kafka. The three sink connectors then write these changes as new JSON and Parquet files to the target S3 bucket.

Viewing Data in the Data Lake
A convenient way to examine both the existing data and ongoing data changes in our data lake is to crawl and catalog the S3 bucket’s contents with AWS Glue, then query the results with Amazon Athena. AWS Glue’s Data Catalog is an Apache Hive-compatible, fully-managed, persistent metadata store. AWS Glue can store the schema, metadata, and location of our data in S3. Amazon Athena is a serverless Presto-based (PrestoDB) ad-hoc analytics engine, which can query AWS Glue Data Catalog tables and the underlying S3-based data.

When writing Parquet into partitions, one shortcoming of the Kafka Connect S3 sink connector is duplicate column names in AWS Glue. As a result, any columns used as partitions are duplicated in the Glue Data Catalog’s database table schema. The issue will result in an error similar to HIVE_INVALID_METADATA: Hive metadata for table pagila_query is invalid: Table descriptor contains duplicate columns
when performing queries. To remedy this, predefine the table and the table’s schema. Alternatively, edit the Glue Data Catalog table’s schema after crawling and remove the duplicate, non-partition column(s). Below, that would mean removing the duplicate country column (column 7).

Performing a typical SQL SELECT query in Athena will return all of the original records as well as the changes we made earlier as duplicate records (same address_id
primary key).

SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM "pagila_kafka_connect"."pagila_query"
WHERE address_id BETWEEN 100 AND 105
ORDER BY address_id;
Note the original records for address_id
100–103 as well as each change we made earlier. The last_update
field reflects the date and time the record was created or updated. Also, note the record with address_id
104 in the query results. This is the record we deleted from the Pagila database.
To view only the most current data, we can use Athena’s ROW_NUMBER()
function:
SELECT address_id, address, address2, city, state_province,
postal_code, country, last_update
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1
AND address_id BETWEEN 100 AND 105
ORDER BY address_id;
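If you prefer the terminal to the Athena console, the same deduplication query can be submitted with the AWS CLI. This is a hedged example: the s3://your-s3-bucket/athena-results/ results location is a placeholder, and the default Athena workgroup is assumed.
QUERY_ID=$(aws athena start-query-execution \
  --query-string "SELECT address_id, address, address2, city, state_province, postal_code, country, last_update FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY address_id ORDER BY last_update DESC) AS row_num FROM pagila_kafka_connect.pagila_query) AS x WHERE x.row_num = 1 AND address_id BETWEEN 100 AND 105 ORDER BY address_id" \
  --query-execution-context Database=pagila_kafka_connect \
  --result-configuration OutputLocation=s3://your-s3-bucket/athena-results/ \
  --output text --query 'QueryExecutionId')
# Wait a few seconds for the query to finish, then fetch the results
aws athena get-query-results --query-execution-id $QUERY_ID | jq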
Now, we only see the latest records. Unfortunately, the record we deleted with address_id
104 is still present in the query results.
Using log-based CDC with Debezium, as opposed to query-based CDC, we would have received a record in S3 that indicated the delete. Debezium emits a delete event, shown below, followed by a null-valued message referred to as a tombstone in Kafka. Note the ‘before’ block in the delete record, as opposed to the ‘after’ block we observed earlier with the update records.
{
"before": {
"address": "",
"address2": null,
"phone": "",
"district": "",
"last_update": "1970-01-01T00:00:00Z",
"address_id": 104,
"postal_code": null,
"city_id": 0
},
"source": {
"schema": "public",
"sequence": "[\"1101256482032\",\"1101256482032\"]",
"xmin": null,
"connector": "postgresql",
"lsn": 1101256483936,
"name": "pagila",
"txId": 17137,
"version": "1.6.1.Final",
"ts_ms": 1628864251512,
"snapshot": "false",
"db": "pagila",
"table": "address"
},
"op": "d",
"ts_ms": 1628864251671
}
An inefficient solution to duplicates and deletes with query-based CDC would be to bulk ingest the entire query result set from the Pagila database each time instead of only the changes based on the last_update
field. Performing an unbounded query repeatedly on a huge dataset would negatively impact database performance. Even then, you would still end up with duplicates in the data lake unless you first purged the data in S3 before re-importing the new query results.
Data Movement
Using Amazon Athena, we can easily write the results of our ROW_NUMBER()
query back to the data lake for further enrichment or analysis. Athena’s CREATE TABLE AS SELECT
(CTAS) SQL statement creates a new table in Athena (an external table in AWS Glue Data Catalog) from the results of a SELECT
statement in the subquery. Athena stores data files created by the CTAS statement in a specified location in Amazon S3 and created a new AWS Glue Data Catalog table to store the result set’s schema and metadata information. CTAS supports several file formats and storage options.

Wrapping the last query in Athena’s CTAS statement, as shown below, we can write the query results as SNAPPY-compressed Parquet-format files, partitioned by country
, to a new location in the Amazon S3 bucket. Using common data lake terminology, I will refer to the resulting filtered and cleaned dataset as refined or silver instead of the raw ingestion or bronze data originating from our data source, PostgreSQL, via Kafka.
CREATE TABLE pagila_kafka_connect.pagila_query_processed
WITH (
format='PARQUET',
parquet_compression='SNAPPY',
partitioned_by=ARRAY['country'],
external_location='s3://your-s3-bucket/processed/pagila_query'
) AS
SELECT address_id, last_update, address, address2, city,
state_province, postal_code, country
FROM (SELECT *, ROW_NUMBER() OVER (
PARTITION BY address_id
ORDER BY last_update DESC) AS row_num
FROM "pagila_kafka_connect"."pagila_query") AS x
WHERE x.row_num = 1 AND address_id BETWEEN 0 and 100
ORDER BY address_id;
Examining the Amazon S3 bucket one last time, you should note a new set of S3 objects within the /processed/pagila_query/
key path. The Parquet-format files, partitioned by country, are the result of the CTAS query.

We should now see a new table in the same AWS Glue Data Catalog containing metadata, location, and schema information about the data we wrote to S3 using the CTAS query. We can perform additional queries on the processed data.

ACID Transactions with a Data Lake
To fully take advantage of CDC and maximize the freshness of data in the data lake, we would also need to adopt modern data lake file formats like Apache Hudi, Apache Iceberg, or Delta Lake, along with analytics engines such as Apache Spark with Spark Structured Streaming to process the data changes. Using these technologies, it is possible to perform record-level updates and deletes of data in an object store like Amazon S3. Hudi, Iceberg, and Delta Lake offer features including ACID transactions, schema evolution, upserts, deletes, time travel, and incremental data consumption in a data lake. ELT engines like Spark can read streaming Debezium-generated CDC messages from Kafka and process those changes using Hudi, Iceberg, or Delta Lake.
Conclusion
This post explored how CDC could help us hydrate data from an Amazon RDS database into an Amazon S3-based data lake. We leveraged the capabilities of Amazon EKS, Amazon MSK, and Apache Kafka Connect. We learned about query-based CDC for capturing ongoing changes to the source data. In a subsequent post, we will explore log-based CDC using Debezium and see how data lake file formats like Apache Avro, Apache Hudi, Apache Iceberg, and Delta Lake can help us manage the data in our data lake.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Securely Decoupling Kubernetes-based Applications on Amazon EKS using Kafka with SASL/SCRAM
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, DevOps, Go, Kubernetes, Software Development on July 26, 2021
Securely decoupling Go-based microservices on Amazon EKS using Amazon MSK with IRSA, SASL/SCRAM, and data encryption
Introduction
This post will explore a simple Go-based application deployed to Kubernetes using Amazon Elastic Kubernetes Service (Amazon EKS). The microservices that comprise the application communicate asynchronously by producing and consuming events from Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Authentication and Authorization for Apache Kafka
According to AWS, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.
For this post, our Amazon MSK cluster will use SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Mechanism) username and password-based authentication to increase security. Credentials used for SASL/SCRAM authentication will be securely stored in AWS Secrets Manager and encrypted using AWS Key Management Service (KMS).
Data Encryption
Data at rest in the MSK cluster will be encrypted using Amazon MSK’s integration with AWS KMS to provide transparent server-side encryption. Data moving between the brokers of the MSK cluster will be encrypted in transit using Transport Layer Security (TLS 1.2).
Resource Management
AWS resources for Amazon MSK will be created and managed using HashiCorp Terraform, a popular open-source Infrastructure as Code (IaC) software tool. Amazon EKS resources will be created and managed with eksctl
, the official CLI for Amazon EKS sponsored by Weaveworks. Lastly, Kubernetes resources will be created and managed with Helm, the open-source Kubernetes package manager.
Demonstration Application
The Go-based microservices, which compose the demonstration application, will use Segment’s popular kafka-go
client. Segment is a leading customer data platform (CDP). The microservices will access Amazon MSK using Kafka broker connection information stored in AWS Systems Manager (SSM) Parameter Store.
Source Code
All source code for this post, including the demonstration application, Terraform, and Helm resources, is open-sourced and located on GitHub in the garystafford/terraform-msk-demo repository.
Prerequisites
To follow along with this post’s demonstration, you will need recent versions of terraform
, eksctl
, and helm
installed and accessible from your terminal. Optionally, it will be helpful to have git
or gh
, kubectl
, and the AWS CLI v2 (aws
).
Demonstration
To demonstrate the EKS and MSK features described above, we will proceed as follows:
- Deploy the EKS cluster and associated resources using eksctl;
- Deploy the MSK cluster and associated resources using Terraform;
- Update the route tables for both VPCs and associated subnets to route traffic between the peered VPCs;
- Create IAM Roles for Service Accounts (IRSA) allowing access to MSK and associated services from EKS, using eksctl;
- Deploy the Kafka client container to EKS using Helm;
- Create the Kafka topics and ACLs for MSK using the Kafka client;
- Deploy the Go-based application to EKS using Helm;
- Confirm the application’s functionality;
1. Amazon EKS cluster
To begin, create a new Amazon EKS cluster using Weaveworks’ eksctl
. The default cluster.yaml
configuration file included in the project will create a small, development-grade EKS cluster based on Kubernetes 1.20 in us-east-1
. The cluster will contain a managed node group of three t3.medium
Amazon Linux 2 EC2 worker nodes. The EKS cluster will be created in a new VPC.
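For orientation, a minimal eksctl configuration along those lines might look like the sketch below; the project’s eksctl/cluster.yaml is authoritative and contains additional settings (for example, the OIDC setting referenced later in this section). The file path and node group name are hypothetical.
# Hedged sketch of a minimal eksctl ClusterConfig; the project's file may differ.
cat > /tmp/cluster-sketch.yaml <<'EOF'
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: eks-kafka-demo
  region: us-east-1
  version: "1.20"
iam:
  withOIDC: true           # needed later for IAM Roles for Service Accounts (IRSA)
managedNodeGroups:
  - name: managed-ng-1     # illustrative node group name
    instanceType: t3.medium
    desiredCapacity: 3
EOF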
Set the following environment variables and then run the eksctl create cluster
command to create the new EKS cluster and associated infrastructure.
export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
eksctl create cluster -f ./eksctl/cluster.yaml
In my experience, it can take 25 to 40 minutes to fully build and configure the new 3-node EKS cluster.


As part of creating the EKS cluster, eksctl
will automatically deploy three AWS CloudFormation stacks containing the following resources:
- Amazon Virtual Private Cloud (VPC), subnets, route tables, NAT Gateways, security policies, and the EKS control plane;
- EKS managed node group containing three Kubernetes worker nodes;
- IAM Roles for Service Accounts (IRSA) that maps an AWS IAM Role to a Kubernetes Service Account;

Once complete, update your kubeconfig
file so that you can connect to the new Amazon EKS cluster using the following AWS CLI command:
aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}
Review the details of the new EKS cluster using the following eksctl
command:
eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Review the new EKS cluster in the Amazon Container Services console’s Amazon EKS Clusters tab.

Below, note the EKS cluster’s OpenID Connect URL. Support for IAM Roles for Service Accounts (IRSA) on the EKS cluster requires an OpenID Connect issuer URL associated with it. OIDC was configured in the cluster.yaml
file; see line 8 (shown above).

The OpenID Connect identity provider, referenced in the EKS cluster’s console, created by eksctl
, can be observed in the IAM console under Identity providers.

2. Amazon MSK cluster
Next, deploy the Amazon MSK cluster and associated network and security resources using HashiCorp Terraform.

Before creating the AWS infrastructure with Terraform, update the location of the Terraform state. This project’s code uses Amazon S3 as a backend to store the Terraform’s state. Change the Amazon S3 bucket name to one of your existing buckets, located in the main.tf
file.
terraform {
backend "s3" {
bucket = "terrform-us-east-1-your-unique-name"
key = "dev/terraform.tfstate"
region = "us-east-1"
}
}
Also, update the eks_vpc_id
variable in the variables.tf
file with the VPC ID of the EKS VPC created by eksctl
in step 1.
variable "eks_vpc_id" {
default = "vpc-your-id"
}
The quickest way to obtain the ID of the EKS VPC is by using the following AWS CLI v2 command:
aws ec2 describe-vpcs --query 'Vpcs[].VpcId' \
--filters Name=tag:Name,Values=eksctl-eks-kafka-demo-cluster/VPC \
--output text
Next, initialize your Terraform backend in Amazon S3 and initialize the latest hashicorp/aws
provider plugin with terraform init
.

Use terraform plan
to generate an execution plan, showing what actions Terraform would take to apply the current configuration. Terraform will create approximately 25 AWS resources as part of the plan.

Finally, use terraform apply
to create the AWS resources. Terraform will create a small, development-grade MSK cluster based on Kafka 2.8.0 in us-east-1
, containing a set of three kafka.m5.large
broker nodes. Terraform will create the MSK cluster in a new VPC. The broker nodes are spread across three Availability Zones, each in a private subnet, within the new VPC.
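For reference, the Terraform workflow described above boils down to three commands, run from the directory containing the project’s *.tf files (the directory name below is an assumption):
cd terraform/                 # assumed path; use wherever the project's *.tf files live
terraform init                # configure the S3 backend and download the hashicorp/aws provider
terraform plan -out=tfplan    # preview the ~25 AWS resources to be created
terraform apply tfplan        # create the MSK cluster and associated resources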


It could take 30 minutes or more for Terraform to create the new cluster and associated infrastructure. Once complete, you can view the new MSK cluster in the Amazon MSK management console.

Below, note the new cluster’s ‘Access control method’ is SASL/SCRAM authentication. The cluster implements encryption of data in transit with TLS and encrypts data at rest using a customer-managed customer master key (CMK) in AWS KMS.

Below, note the ‘Associated secrets from AWS Secrets Manager.’ The secret, AmazonMSK_credentials
, contains the SASL/SCRAM authentication credentials — username and password. These are the credentials the demonstration application, deployed to EKS, will use to securely access MSK.

The SASL/SCRAM credentials secret shown above can be observed in the AWS Secrets Manager console. Note the customer-managed customer master key (CMK), stored in AWS KMS, which is used to encrypt the secret.

3. Update route tables for VPC Peering
Terraform created a VPC Peering relationship between the new EKS VPC and the MSK VPC. However, we will need to complete the peering configuration by updating the route tables. We want to route all traffic from the EKS cluster destined for MSK, whose VPC CIDR is 10.0.0.0/22
, through the VPC Peering Connection resource. There are four route tables associated with the EKS VPC. Add a new route to the route table whose name ends with ‘PublicRouteTable
’, for example, rtb-0a14e6250558a4abb / eksctl-eks-kafka-demo-cluster/PublicRouteTable
. Manually create the required route in this route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

Similarly, we want to route all traffic from the MSK cluster destined for EKS, whose CIDR is 192.168.0.0/16
, through the same VPC Peering Connection resource. Update the single MSK VPC’s route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).
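If you prefer to script the routing changes rather than click through the console, the AWS CLI can add the same two routes. The route table and peering connection IDs below are placeholders; substitute the IDs from your environment.
# EKS VPC public route table -> MSK VPC CIDR (10.0.0.0/22) via the peering connection
aws ec2 create-route \
  --route-table-id rtb-EKS-PUBLIC-ROUTE-TABLE-ID \
  --destination-cidr-block 10.0.0.0/22 \
  --vpc-peering-connection-id pcx-PEERING-CONNECTION-ID
# MSK VPC route table -> EKS VPC CIDR (192.168.0.0/16) via the same peering connection
aws ec2 create-route \
  --route-table-id rtb-MSK-ROUTE-TABLE-ID \
  --destination-cidr-block 192.168.0.0/16 \
  --vpc-peering-connection-id pcx-PEERING-CONNECTION-ID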

4. Create IAM Roles for Service Accounts (IRSA)
With both the EKS and MSK clusters created and peered, we are ready to start deploying Kubernetes resources. Create a new namespace, kafka
, which will hold the demonstration application and Kafka client pods.
export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
export NAMESPACE="kafka"
kubectl create namespace $NAMESPACE
Then using eksctl
, create two IAM Roles for Service Accounts (IRSA) associated with Kubernetes Service Accounts. The Kafka client’s pod will use one of the roles, and the demonstration application’s pods will use the other role. According to the eksctl documentation, IRSA works via IAM OpenID Connect Provider (OIDC) that EKS exposes, and IAM roles must be constructed with reference to the IAM OIDC Provider described earlier in the post, and a reference to the Kubernetes Service Account it will be bound to. The two IAM policies referenced in the eksctl
commands below were created earlier by Terraform.
# kafka-demo-app role
eksctl create iamserviceaccount \
--name kafka-demo-app-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# kafka-client-msk role
eksctl create iamserviceaccount \
--name kafka-client-msk-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSKafkaClientMSKPolicy" \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# confirm successful creation of accounts
eksctl get iamserviceaccount \
--cluster $CLUSTER_NAME \
--namespace $NAMESPACE
kubectl get serviceaccounts -n $NAMESPACE

Recall eksctl
created three CloudFormation stacks initially. With the addition of the two IAM Roles, we now have a total of five CloudFormation stacks deployed.

5. Kafka client
Next, deploy the Kafka client using the project’s Helm chart, kafka-client-msk
. We will use the Kafka client to create Kafka topics and Apache Kafka ACLs. This particular Kafka client is based on a custom Docker Image that I have built myself using an Alpine Linux base image with Java OpenJDK 17, garystafford/kafka-client-msk
. The image contains the latest Kafka client along with the AWS CLI v2 and a few other useful tools like jq
. If you prefer an alternative, there are multiple Kafka client images available on Docker Hub.
The Kafka client only requires a single pod. Run the following helm
commands to deploy the Kafka client to EKS using the project’s Helm chart, kafka-client-msk
:
cd helm/
# perform dry run to validate chart
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE

Confirm the successful creation of the Kafka client pod with either of the following commands:
kubectl get pods -n kafka
kubectl describe pod -n kafka -l app=kafka-client-msk

The ability of the Kafka client to interact with Amazon MSK, AWS SSM Parameter Store, and AWS Secrets Manager is based on two IAM policies created by Terraform, EKSKafkaClientMSKPolicy
and EKSScramSecretManagerPolicy
. These two policies are associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-client-msk-sasl-scram-serviceaccount
. This service account is associated with the Kafka client pod as part of the Kubernetes Deployment resource in the Helm chart.
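To see this IRSA wiring for yourself, inspect the service account and the pod; the IAM role created by eksctl should appear as an eks.amazonaws.com/role-arn annotation on the service account.
# The IRSA role ARN appears as an annotation on the Kafka client's service account
kubectl describe serviceaccount kafka-client-msk-sasl-scram-serviceaccount -n kafka | \
  grep 'eks.amazonaws.com/role-arn'
# Confirm the Kafka client pod actually uses that service account
kubectl get pod -n kafka -l app=kafka-client-msk \
  -o jsonpath='{.items[0].spec.serviceAccountName}{"\n"}'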
6. Kafka topics and ACLs for Kafka
Use the Kafka client to create Kafka topics and Apache Kafka ACLs. First, use the kubectl exec
command to execute commands from within the Kafka client container.
export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-client-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash
Once successfully attached to the Kafka client container, set the following three environment variables: 1) Apache ZooKeeper connection string, 2) Kafka bootstrap brokers, and 3) ‘Distinguished-Name’ of the Bootstrap Brokers (see AWS documentation). The values for these environment variables will be retrieved from AWS Systems Manager (SSM) Parameter Store. The values were stored in the Parameter store by Terraform during the creation of the MSK cluster. Based on the policy attached to the IAM Role associated with this Pod (IRSA), the client has access to these specific parameters in the SSM Parameter store.
export ZOOKPR=$(\
aws ssm get-parameter --name /msk/scram/zookeeper \
--query 'Parameter.Value' --output text)
export BBROKERS=$(\
aws ssm get-parameter --name /msk/scram/brokers \
--query 'Parameter.Value' --output text)
export DISTINGUISHED_NAME=$(\
echo $BBROKERS | awk -F' ' '{print $1}' | sed 's/b-1/*/g')
Use the env
and grep
commands to verify the environment variables have been retrieved and constructed properly. Your Zookeeper and Kafka bootstrap broker URLs will be uniquely different from the ones shown below.
env | grep 'ZOOKPR\|BBROKERS\|DISTINGUISHED_NAME'

To test the connection between EKS and MSK, list the existing Kafka topics, from the Kafka client container:
bin/kafka-topics.sh --list --zookeeper $ZOOKPR
You should see three default topics, as shown below.

If you did not properly add the new VPC Peering routes to the appropriate route tables in the previous step, establishing peering of the EKS and MSK VPCs, you are likely to see a timeout error while attempting to connect. Go back and confirm that both of the route tables are correctly updated with the new routes.

Kafka Topics, Partitions, and Replicas
The demonstration application produces and consumes messages from two topics, foo-topic
and bar-topic
. Each topic will have three partitions, one for each of the three broker nodes, along with three replicas.

Use the following commands from the client container to create the two new Kafka topics. Once complete, confirm the creation of the topics using the list
option again.
bin/kafka-topics.sh --create --topic foo-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --create --topic bar-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --list --zookeeper $ZOOKPR

Review the details of the topics using the describe
option. Note the three partitions per topic and the three replicas per topic.
bin/kafka-topics.sh --describe --topic foo-topic --zookeeper $ZOOKPR
bin/kafka-topics.sh --describe --topic bar-topic --zookeeper $ZOOKPR

Kafka ACLs
According to Kafka’s documentation, Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses Zookeeper to store all the Access Control Lists (ACLs). Kafka ACLs are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H On Resource R.” You can read more about the ACL structure on KIP-11. To add, remove or list ACLs, you can use the Kafka authorizer CLI.
Authorize access by the Kafka brokers and the demonstration application to the two topics. First, allow access to the topics from the brokers using the DISTINGUISHED_NAME
environment variable (see AWS documentation).
# read auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-B \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-A \
--topic bar-topic
# write auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic bar-topic
The three instances (replicas/pods) of Service A, part of consumer-group-A
, produce messages to the foo-topic
and consume messages from the bar-topic
. Conversely, the three instances of Service B, part of consumer-group-B
, produce messages to the bar-topic
and consume messages from the foo-topic
.

Allow access to the appropriate topics from the demonstration application’s microservices. First, set the USER
environment variable — the MSK cluster’s SASL/SCRAM credential’s username, stored in AWS Secrets Manager by Terraform. We can retrieve the username from Secrets Manager and assign it to the environment variable with the following command.
export USER=$(
aws secretsmanager get-secret-value \
--secret-id AmazonMSK_credentials \
--query SecretString --output text | \
jq .username | sed -e 's/^"//' -e 's/"$//')
Create the appropriate ACLs.
# producers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic bar-topic
# consumers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic foo-topic \
--group consumer-group-B
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic bar-topic \
--group consumer-group-A
To list the ACLs you just created, use the following commands:
# list all ACLs
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list
# list for individual topics, e.g. foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list \
--topic foo-topic

7. Deploy example application
We should finally be ready to deploy our demonstration application to EKS. The application contains two Go-based microservices, Service A and Service B. The demonstration application’s functionality is based on Soham Kamani’s September 2020 blog post, Implementing a Kafka Producer and Consumer In Golang (With Full Examples) For Production. All source Go code for the demonstration application is included in the project.
.
├── Dockerfile
├── README.md
├── consumer.go
├── dialer.go
├── dialer_scram.go
├── go.mod
├── go.sum
├── main.go
├── param_store.go
├── producer.go
└── tls.go
Both microservices use the same Docker image, garystafford/kafka-demo-service
, configured with different environment variables. The configuration makes the two services operate differently. The microservices use Segment’s kafka-go
client, as mentioned earlier, to communicate with the MSK cluster’s broker and topics. Below, we see the demonstration application’s consumer functionality (consumer.go
).
The consumer above and the producer both connect to the MSK cluster using SASL/SCRAM. Below, we see the SASL/SCRAM Dialer functionality. This Dialer
type mirrors the net.Dialer
API but is designed to open Kafka connections instead of raw network connections. Note how the function can access AWS Secrets Manager to retrieve the SASL/SCRAM credentials.
We will deploy three replicas of each microservice (three pods per microservices) using Helm. Below, we see the Kubernetes Deployment
and Service
resources for each microservice.
Run the following helm
commands to deploy the demonstration application to EKS using the project’s Helm chart, kafka-demo-app
:
cd helm/
# perform dry run to validate chart
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE

Confirm the successful creation of the Kafka client pod with either of the following commands:
kubectl get pods -n kafka
kubectl get pods -n kafka -l app=kafka-demo-service-a
kubectl get pods -n kafka -l app=kafka-demo-service-b
You should now have a total of seven pods running in the kafka
namespace. In addition to the previously deployed single Kafka client pod, there should be three new Service A pods and three new Service B pods.

The ability of the demonstration application to interact with AWS SSM Parameter Store and AWS Secrets Manager is based on the IAM policy created by Terraform, EKSScramSecretManagerPolicy
. This policy is associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-demo-app-sasl-scram-serviceaccount
. This service account is associated with the demonstration application’s pods as part of the Kubernetes Deployment resource in the Helm chart.
8. Verify application functionality
Although the pods starting and running successfully is a good sign, to confirm that the demonstration application is operating correctly, examine the logs of Service A and Service B using kubectl
. The logs will confirm that the application has successfully retrieved the SASL/SCRAM credentials from Secrets Manager, connected to MSK, and can produce and consume messages from the appropriate topics.
kubectl logs -l app=kafka-demo-service-a -n kafka
kubectl logs -l app=kafka-demo-service-b -n kafka
The MSG_FREQ
environment variable controls the frequency at which the microservices produce messages. The frequency is 60 seconds by default but overridden and increased to 10 seconds in the Helm chart.
Below, we see the logs generated by the Service A pods. Note one of the messages indicating the Service A producer was successful: writing 1 messages to foo-topic (partition: 0)
. And a message indicating the consumer was successful: kafka-demo-service-a-db76c5d56-gmx4v received message: This is message 68 from host kafka-demo-service-b-57556cdc4c-sdhxc
. Each message contains the name of the host container that produced and consumed it.

Likewise, we see logs generated by the two Service B pods. Note one of the messages indicating the Service B producer was successful: writing 1 messages to bar-topic (partition: 2)
. And a message indicating the consumer was successful: kafka-demo-service-b-57556cdc4c-q8wvz received message: This is message 354 from host kafka-demo-service-a-db76c5d56-r88fk
.

CloudWatch Metrics
We can also examine the available Amazon MSK CloudWatch Metrics to confirm the EKS-based demonstration application is communicating as expected with MSK. There are 132 different metrics available for this cluster. Below, we see the BytesInPerSec
and BytesOutPerSec for each of the two topics, across each topic’s three partitions, which are spread across each of the three Kafka broker nodes. Each metric shows similar volumes of traffic, both inbound and outbound, to each topic. Along with the logs, the metrics appear to show that the multiple instances of Service A and Service B are producing and consuming messages.
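As a hedged, command-line alternative to the CloudWatch console, the same per-topic metric can be pulled with get-metric-statistics. The cluster name below is a placeholder, the GNU date syntax assumes a Linux shell, and MSK publishes these metrics under the AWS/Kafka namespace with Cluster Name, Broker ID, and Topic dimensions.
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name BytesInPerSec \
  --dimensions '[{"Name":"Cluster Name","Value":"your-msk-cluster-name"},{"Name":"Broker ID","Value":"1"},{"Name":"Topic","Value":"foo-topic"}]' \
  --start-time "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" \
  --end-time "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
  --period 300 \
  --statistics Average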

Prometheus
We can also confirm the same results using an open-source observability tool, like Prometheus. The Amazon MSK Developer Guide outlines the steps necessary to monitor Kafka using Prometheus. The Amazon MSK cluster created by Terraform already has open monitoring with Prometheus enabled, with ports 11001 and 11002 added to the necessary MSK security group.

Running Prometheus in a single pod on the EKS cluster, built from an Ubuntu base Docker image or similar, is probably the easiest approach for this particular demonstration.
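A hedged sketch of a Prometheus scrape configuration for MSK open monitoring is shown below, following the pattern in the Amazon MSK Developer Guide: port 11001 exposes the JMX Exporter and port 11002 the Node Exporter on each broker. The broker DNS names are placeholders. The PromQL query that follows the sketch then charts per-topic throughput.
# Hedged sketch of a minimal prometheus.yml for MSK open monitoring (placeholder brokers):
cat > prometheus.yml <<'EOF'
global:
  scrape_interval: 30s
scrape_configs:
  - job_name: 'msk-jmx-exporter'        # Kafka/JMX metrics on port 11001
    static_configs:
      - targets:
          - 'b-1.your-msk-cluster.amazonaws.com:11001'
          - 'b-2.your-msk-cluster.amazonaws.com:11001'
          - 'b-3.your-msk-cluster.amazonaws.com:11001'
  - job_name: 'msk-node-exporter'       # broker host metrics on port 11002
    static_configs:
      - targets:
          - 'b-1.your-msk-cluster.amazonaws.com:11002'
          - 'b-2.your-msk-cluster.amazonaws.com:11002'
          - 'b-3.your-msk-cluster.amazonaws.com:11002'
EOF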
rate(kafka_server_BrokerTopicMetrics_Count{topic=~"foo-topic|bar-topic", name=~"BytesInPerSec|BytesOutPerSec"}[5m])

BytesInPerSec and BytesOutPerSec for the two topics
References
- Amazon Managed Streaming for Apache Kafka Developers Guide
- Segment’s kafka-go project (GitHub)
- Implementing a Kafka Producer and Consumer In Golang (With Full Examples) For Production, by Soham Kamani
- Kafka Golang Example (GitHub/Soham Kamani)
- AWS Amazon MSK Workshop
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Observing gRPC-based Microservices on Amazon EKS running Istio
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, Continuous Delivery, DevOps, Go, JavaScript, Kubernetes, Software Development on July 8, 2021
Observing a gRPC-based Kubernetes application using Jaeger, Zipkin, Prometheus, Grafana, and Kiali on Amazon EKS running Istio service mesh
Introduction
In the previous two-part post, Kubernetes-based Microservice Observability with Istio Service Mesh, we explored a set of popular open source observability tools easily integrated with the Istio service mesh. Tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and aggregation to Amazon CloudWatch Container Insights. We used these tools to observe a distributed, microservices-based, RESTful application deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster. The application platform, running on EKS, used Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.
In this post, we will examine those same observability tools to monitor an alternate set of Go-based microservices t