Securely Decoupling Kubernetes-based Applications on Amazon EKS using Kafka with SASL/SCRAM

Securely decoupling Go-based microservices on Amazon EKS using Amazon MSK with IRSA, SASL/SCRAM, and data encryption

Introduction

This post will explore a simple Go-based application deployed to Kubernetes using Amazon Elastic Kubernetes Service (Amazon EKS). The microservices that comprise the application communicate asynchronously by producing and consuming events from Amazon Managed Streaming for Apache Kafka (Amazon MSK).

High-level application and AWS infrastructure architecture for the post

Authentication and Authorization for Apache Kafka

According to AWS, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

For this post, our Amazon MSK cluster will use SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Mechanism) username and password-based authentication to increase security. Credentials used for SASL/SCRAM authentication will be securely stored in AWS Secrets Manager and encrypted using AWS Key Management Service (KMS).

Data Encryption

Data at rest in the MSK cluster will be encrypted using Amazon MSK’s integration with AWS KMS to provide transparent server-side encryption. Data in transit between the brokers of the MSK cluster will be encrypted using Transport Layer Security (TLS 1.2).

Resource Management

AWS resources for Amazon MSK will be created and managed using HashiCorp Terraform, a popular open-source Infrastructure-as-Code (IaC) tool. Amazon EKS resources will be created and managed with eksctl, the official CLI for Amazon EKS sponsored by Weaveworks. Lastly, Kubernetes resources will be created and managed with Helm, the open-source Kubernetes package manager.

Demonstration Application

The Go-based microservices, which compose the demonstration application, will use Segment’s popular kafka-go client. Segment is a leading customer data platform (CDP). The microservices will access Amazon MSK using Kafka broker connection information stored in AWS Systems Manager (SSM) Parameter Store.
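As a rough sketch of how broker connection information might be consumed, the function below turns a comma-separated bootstrap broker string into the slice of addresses a Kafka client expects. It assumes the Parameter Store value is a comma-separated list of host:port pairs; the function name and the hostnames are hypothetical, and the project's own param_store.go may do this differently.

```go
package main

import (
	"fmt"
	"strings"
)

// parseBrokers splits a comma-separated bootstrap broker string into
// individual host:port addresses, trimming whitespace and skipping
// empty entries.
func parseBrokers(value string) []string {
	var brokers []string
	for _, b := range strings.Split(value, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	// Hypothetical broker hostnames, for illustration only.
	value := "b-1.example.kafka.us-east-1.amazonaws.com:9096, b-2.example.kafka.us-east-1.amazonaws.com:9096"
	for _, b := range parseBrokers(value) {
		fmt.Println(b)
	}
}
```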

Source Code

All source code for this post, including the demonstration application, Terraform, and Helm resources, is open-sourced and located on GitHub in the garystafford/terraform-msk-demo repository.

Prerequisites

To follow along with this post’s demonstration, you will need recent versions of terraform, eksctl, and helm installed and accessible from your terminal. Optionally, it will be helpful to have git or gh, kubectl, and the AWS CLI v2 (aws).

Demonstration

To demonstrate the EKS and MSK features described above, we will proceed as follows:

  1. Deploy the EKS cluster and associated resources using eksctl;
  2. Deploy the MSK cluster and associated resources using Terraform;
  3. Update the route tables for both VPCs and associated subnets to route traffic between the peered VPCs;
  4. Create IAM Roles for Service Accounts (IRSA) allowing access to MSK and associated services from EKS, using eksctl;
  5. Deploy the Kafka client container to EKS using Helm;
  6. Create the Kafka topics and ACLs for MSK using the Kafka client;
  7. Deploy the Go-based application to EKS using Helm;
  8. Confirm the application’s functionality.

1. Amazon EKS cluster

To begin, create a new Amazon EKS cluster using Weaveworks’ eksctl. The default cluster.yaml configuration file included in the project will create a small, development-grade EKS cluster based on Kubernetes 1.20 in us-east-1. The cluster will contain a managed node group of three t3.medium Amazon Linux 2 EC2 worker nodes. The EKS cluster will be created in a new VPC.

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: eks-kafka-demo
  region: us-east-1
  version: "1.20"
iam:
  withOIDC: true
managedNodeGroups:
  - name: managed-ng-1
    amiFamily: AmazonLinux2
    instanceType: t3.medium
    desiredCapacity: 3
    minSize: 2
    maxSize: 5
    volumeSize: 120
    volumeType: gp2
    labels:
      nodegroup-type: demo-app-workloads
    tags:
      nodegroup-name: managed-ng-1
      nodegroup-role: worker
    ssh:
      enableSsm: true # use aws ssm instead of ssh; no need to open port 22
    iam:
      withAddonPolicies:
        albIngress: true
        autoScaler: true
        cloudWatch: true
# cloudWatch:
#   clusterLogging:
#     enableTypes: ["*"]

Set the following environment variables and then run the eksctl create cluster command to create the new EKS cluster and associated infrastructure.

export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
eksctl create cluster -f ./eksctl/cluster.yaml

In my experience, it can take 25 to 40 minutes to fully build and configure the new 3-node EKS cluster.

Start of the Amazon EKS cluster creation using eksctl
Successful completion of the Amazon EKS cluster creation using eksctl

As part of creating the EKS cluster, eksctl will automatically deploy three AWS CloudFormation stacks containing the following resources:

  1. Amazon Virtual Private Cloud (VPC), subnets, route tables, NAT Gateways, security policies, and the EKS control plane;
  2. EKS managed node group containing three Kubernetes worker nodes;
  3. IAM Roles for Service Accounts (IRSA) that maps an AWS IAM Role to a Kubernetes Service Account;

Once complete, update your kubeconfig file so that you can connect to the new Amazon EKS cluster using the following AWS CLI command:

aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}

Review the details of the new EKS cluster using the following eksctl command:

eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Review the new EKS cluster in the Amazon Container Services console’s Amazon EKS Clusters tab.

New Amazon EKS cluster as seen from the Amazon Container Services console

Below, note the EKS cluster’s OpenID Connect URL. Support for IAM Roles for Service Accounts (IRSA) on the EKS cluster requires an OpenID Connect issuer URL associated with it. OIDC was configured in the cluster.yaml file; see line 8 (shown above).

New Amazon EKS cluster as seen from the Amazon Container Services console

The OpenID Connect identity provider, referenced in the EKS cluster’s console, created by eksctl, can be observed in the IAM Identity provider console.

EKS cluster’s OpenID Connect identity provider in the IAM Identity provider console
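The relationship between the cluster's OpenID Connect issuer URL and the IAM identity provider can be sketched in a few lines of Go. The IAM OIDC provider ARN is the issuer URL with its https:// scheme removed, prefixed by the account's oidc-provider path. The function name, account ID, and issuer ID below are hypothetical placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// oidcProviderARN builds the ARN of the IAM OIDC identity provider that
// corresponds to an EKS cluster's OpenID Connect issuer URL.
func oidcProviderARN(accountID, issuerURL string) string {
	issuer := strings.TrimPrefix(issuerURL, "https://")
	return fmt.Sprintf("arn:aws:iam::%s:oidc-provider/%s", accountID, issuer)
}

func main() {
	// Hypothetical account ID and issuer ID, for illustration only.
	fmt.Println(oidcProviderARN(
		"123456789012",
		"https://oidc.eks.us-east-1.amazonaws.com/id/EXAMPLE0123456789",
	))
}
```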

2. Amazon MSK cluster

Next, deploy the Amazon MSK cluster and associated network and security resources using HashiCorp Terraform.

Graphviz open source graph visualization of Terraform’s AWS resources

Before creating the AWS infrastructure with Terraform, update the location of the Terraform state. This project’s code uses Amazon S3 as a backend to store Terraform’s state. In the main.tf file, change the Amazon S3 bucket name to one of your existing buckets.

terraform {
  backend "s3" {
    bucket = "terrform-us-east-1-your-unique-name"
    key    = "dev/terraform.tfstate"
    region = "us-east-1"
  }
}

Also, update the eks_vpc_id variable in the variables.tf file with the VPC ID of the EKS VPC created by eksctl in step 1.

variable "eks_vpc_id" {
  default = "vpc-your-id"
}

The quickest way to obtain the ID of the EKS VPC is by using the following AWS CLI v2 command:

aws ec2 describe-vpcs --query 'Vpcs[].VpcId' \
--filters Name=tag:Name,Values=eksctl-eks-kafka-demo-cluster/VPC \
--output text

Next, initialize your Terraform backend in Amazon S3 and initialize the latest hashicorp/aws provider plugin with terraform init.

Use terraform plan to generate an execution plan, showing what actions Terraform would take to apply the current configuration. Terraform will create approximately 25 AWS resources as part of the plan.

Finally, use terraform apply to create the Amazon resources. Terraform will create a small, development-grade MSK cluster based on Kafka 2.8.0 in us-east-1, containing a set of three kafka.m5.large broker nodes. Terraform will create the MSK cluster in a new VPC. The broker nodes are spread across three Availability Zones, each in a private subnet, within the new VPC.

Start of the process to create the Amazon MSK cluster using Terraform
Successful creation of the Amazon MSK cluster using Terraform

It could take 30 minutes or more for Terraform to create the new cluster and associated infrastructure. Once complete, you can view the new MSK cluster in the Amazon MSK management console.

New Amazon MSK cluster as seen from the Amazon MSK console

Below, note the new cluster’s ‘Access control method’ is SASL/SCRAM authentication. The cluster implements encryption of data in transit with TLS and encrypts data at rest using a customer-managed customer master key (CMK) in AWS KMS.

New Amazon MSK cluster as seen from the Amazon MSK console

Below, note the ‘Associated secrets from AWS Secrets Manager.’ The secret, AmazonMSK_credentials, contains the SASL/SCRAM authentication credentials — username and password. These are the credentials the demonstration application, deployed to EKS, will use to securely access MSK.

New Amazon MSK cluster as seen from the Amazon MSK console

The SASL/SCRAM credentials secret shown above can be observed in the AWS Secrets Manager console. Note the customer-managed customer master key (CMK), stored in AWS KMS, which is used to encrypt the secret.

SASL/SCRAM credentials secret shown in the AWS Secrets Manager console

3. Update route tables for VPC Peering

Terraform created a VPC Peering relationship between the new EKS VPC and the MSK VPC. However, we will need to complete the peering configuration by updating the route tables. We want to route all traffic from the EKS cluster destined for MSK, whose VPC CIDR is 10.0.0.0/22, through the VPC Peering Connection resource. There are four route tables associated with the EKS VPC. Add a new route to the route table whose name ends with ‘PublicRouteTable’, for example, rtb-0a14e6250558a4abb / eksctl-eks-kafka-demo-cluster/PublicRouteTable. Manually create the required route in this route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

The EKS route table with a new route to MSK via the VPC Peering Connection

Similarly, we want to route all traffic from the MSK cluster destined for EKS, whose CIDR is 192.168.0.0/16, through the same VPC Peering Connection resource. Update the single MSK VPC’s route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

The MSK route table with a new route to EKS via the VPC Peering Connection
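To sanity-check which destination addresses the two new routes cover, the following sketch tests an IP address against a route's destination CIDR using Go's net/netip package. The CIDR blocks come from this post; the function name and sample addresses are hypothetical.

```go
package main

import (
	"fmt"
	"net/netip"
)

// matchesRoute reports whether the destination address falls inside the
// destination CIDR block of a route table entry.
func matchesRoute(routeCIDR, dst string) (bool, error) {
	prefix, err := netip.ParsePrefix(routeCIDR)
	if err != nil {
		return false, fmt.Errorf("invalid CIDR %q: %w", routeCIDR, err)
	}
	addr, err := netip.ParseAddr(dst)
	if err != nil {
		return false, fmt.Errorf("invalid address %q: %w", dst, err)
	}
	return prefix.Contains(addr), nil
}

func main() {
	// 10.0.0.0/22 is the MSK VPC CIDR; the sample addresses are made up.
	for _, dst := range []string{"10.0.1.25", "10.0.4.1"} {
		ok, err := matchesRoute("10.0.0.0/22", dst)
		fmt.Println(dst, ok, err)
	}
}
```

A /22 block spans 10.0.0.0 through 10.0.3.255, so only addresses in that range would be sent from the EKS VPC through the peering connection.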

4. Create IAM Roles for Service Accounts (IRSA)

With both the EKS and MSK clusters created and peered, we are ready to start deploying Kubernetes resources. Create a new namespace, kafka, which will hold the demonstration application and Kafka client pods.

export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
export NAMESPACE="kafka"
kubectl create namespace $NAMESPACE

Then using eksctl, create two IAM Roles for Service Accounts (IRSA) associated with Kubernetes Service Accounts. The Kafka client’s pod will use one of the roles, and the demonstration application’s pods will use the other role. According to the eksctl documentation, IRSA works via IAM OpenID Connect Provider (OIDC) that EKS exposes, and IAM roles must be constructed with reference to the IAM OIDC Provider described earlier in the post, and a reference to the Kubernetes Service Account it will be bound to. The two IAM policies referenced in the eksctl commands below were created earlier by Terraform.

# kafka-demo-app role
eksctl create iamserviceaccount \
--name kafka-demo-app-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# kafka-client-msk role
eksctl create iamserviceaccount \
--name kafka-client-msk-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSKafkaClientMSKPolicy" \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# confirm successful creation of accounts
eksctl get iamserviceaccount \
--cluster $CLUSTER_NAME \
--namespace $NAMESPACE
kubectl get serviceaccounts -n $NAMESPACE
Successful creation of the two IAM Roles for Service Accounts (IRSA) using eksctl

Recall eksctl created three CloudFormation stacks initially. With the addition of the two IAM Roles, we now have a total of five CloudFormation stacks deployed.

Amazon EKS-related CloudFormation stacks created by eksctl

5. Kafka client

Next, deploy the Kafka client using the project’s Helm chart, kafka-client-msk. We will use the Kafka client to create Kafka topics and Apache Kafka ACLs. This particular Kafka client is based on a custom Docker image I built using an Alpine Linux base image with Java OpenJDK 17, garystafford/kafka-client-msk. The image contains the latest Kafka client along with the AWS CLI v2 and a few other useful tools like jq. If you prefer an alternative, there are multiple Kafka client images available on Docker Hub.

# purpose: Kafka client for Amazon MSK
# author: Gary A. Stafford
# date: 2021-07-20
FROM openjdk:17-alpine3.14
ENV KAFKA_VERSION="2.8.0"
ENV KAFKA_PACKAGE="kafka_2.13-2.8.0"
ENV AWS_MSK_IAM_AUTH="1.1.0"
ENV GLIBC_VER="2.33-r0"
RUN apk update && apk add --no-cache wget tar bash jq
# install glibc compatibility for alpine (req. for aws cli v2) and aws cli v2
# reference: https://github.com/aws/aws-cli/issues/4685#issuecomment-615872019
RUN apk --no-cache add binutils curl less groff \
    && curl -sL https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub -o /etc/apk/keys/sgerrand.rsa.pub \
    && curl -sLO https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VER}/glibc-${GLIBC_VER}.apk \
    && curl -sLO https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VER}/glibc-bin-${GLIBC_VER}.apk \
    && apk add --no-cache \
        glibc-${GLIBC_VER}.apk \
        glibc-bin-${GLIBC_VER}.apk \
    && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip \
    && unzip awscliv2.zip \
    && aws/install \
    && rm -rf awscliv2.zip aws \
    && apk --no-cache del binutils curl \
    && rm glibc-${GLIBC_VER}.apk \
    && rm glibc-bin-${GLIBC_VER}.apk \
    && rm -rf /var/cache/apk/*
# setup java truststore
RUN cp $JAVA_HOME/lib/security/cacerts /tmp/kafka.client.truststore.jks
# install kafka
RUN wget https://downloads.apache.org/kafka/$KAFKA_VERSION/$KAFKA_PACKAGE.tgz \
    && tar -xzf $KAFKA_PACKAGE.tgz \
    && rm -rf $KAFKA_PACKAGE.tgz
WORKDIR /$KAFKA_PACKAGE
# install aws-msk-iam-auth jar
RUN wget https://github.com/aws/aws-msk-iam-auth/releases/download/$AWS_MSK_IAM_AUTH/aws-msk-iam-auth-$AWS_MSK_IAM_AUTH-all.jar \
    && mv aws-msk-iam-auth-$AWS_MSK_IAM_AUTH-all.jar libs/
CMD ["/bin/sh", "-c", "tail -f /dev/null"]
ENTRYPOINT ["/bin/bash"]

The Kafka client only requires a single pod. Run the following helm commands to deploy the Kafka client to EKS using the project’s Helm chart, kafka-client-msk:

cd helm/
# perform dry run to validate chart
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE
Successful deployment of the Kafka client’s Helm chart

Confirm the successful creation of the Kafka client pod with either of the following commands:

kubectl get pods -n kafka
kubectl describe pod -n kafka -l app=kafka-client-msk
Describing the Kafka client pod using kubectl

The ability of the Kafka client to interact with Amazon MSK, AWS SSM Parameter Store, and AWS Secrets Manager is based on two IAM policies created by Terraform, EKSKafkaClientMSKPolicy and EKSScramSecretManagerPolicy. These two policies are associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-client-msk-sasl-scram-serviceaccount. This service account is associated with the Kafka client pod as part of the Kubernetes Deployment resource in the Helm chart.
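One way to see the IRSA chain from inside a pod: when a pod runs under a service account annotated with an IAM role, EKS injects the AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables, which the AWS SDKs and CLI use to assume the role. The sketch below only checks for their presence; the function name is hypothetical, and the lookup is passed in as a parameter so it can be exercised without a cluster.

```go
package main

import (
	"fmt"
	"os"
)

// irsaConfigured reports whether the two environment variables that EKS
// injects into IRSA-enabled pods are both set.
func irsaConfigured(getenv func(string) string) bool {
	return getenv("AWS_ROLE_ARN") != "" &&
		getenv("AWS_WEB_IDENTITY_TOKEN_FILE") != ""
}

func main() {
	// Outside an IRSA-enabled pod, this is expected to print false.
	fmt.Println("IRSA configured:", irsaConfigured(os.Getenv))
}
```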

6. Kafka topics and ACLs for Kafka

Use the Kafka client to create Kafka topics and Apache Kafka ACLs. First, use the kubectl exec command to execute commands from within the Kafka client container.

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-client-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Once successfully attached to the Kafka client container, set the following three environment variables: 1) Apache ZooKeeper connection string, 2) Kafka bootstrap brokers, and 3) ‘Distinguished-Name’ of the Bootstrap Brokers (see AWS documentation). The values for these environment variables will be retrieved from AWS Systems Manager (SSM) Parameter Store, where Terraform stored them during the creation of the MSK cluster. Based on the policy attached to the IAM Role associated with this Pod (IRSA), the client has access to these specific parameters in the SSM Parameter Store.

export ZOOKPR=$(\
aws ssm get-parameter --name /msk/scram/zookeeper \
--query 'Parameter.Value' --output text)
export BBROKERS=$(\
aws ssm get-parameter --name /msk/scram/brokers \
--query 'Parameter.Value' --output text)
export DISTINGUISHED_NAME=$(\
echo $BBROKERS | awk -F' ' '{print $1}' | sed 's/b-1/*/g')

Use the env and grep commands to verify the environment variables have been retrieved and constructed properly. Your ZooKeeper and Kafka bootstrap broker URLs will differ from the ones shown below.

env | grep 'ZOOKPR\|BBROKERS\|DISTINGUISHED_NAME'
Setting the required environment variables in the Kafka client container

To test the connection between EKS and MSK, list the existing Kafka topics, from the Kafka client container:

bin/kafka-topics.sh --list --zookeeper $ZOOKPR

You should see three default topics, as shown below.

The new MSK cluster’s default Kafka topics

If you did not properly add the new VPC Peering routes to the appropriate route tables in the previous step, establishing peering of the EKS and MSK VPCs, you are likely to see a timeout error while attempting to connect. Go back and confirm that both of the route tables are correctly updated with the new routes.

Connection timeout error due to incorrect configuration of VPC peering-related route tables

Kafka Topics, Partitions, and Replicas

The demonstration application produces and consumes messages from two topics, foo-topic and bar-topic. Each topic will have three partitions, one for each of the three broker nodes, along with three replicas.

Kafka topic’s relationship to partitions, replicas, and brokers
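To illustrate how messages can be spread across a topic's three partitions, here is a minimal sketch of hash-based partition selection using FNV-1a from the Go standard library. It assumes a keyed, hash-based balancer; it is not necessarily the strategy the demonstration application uses, and the function name and keys are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of the topic's partitions by
// hashing the key and taking the remainder modulo the partition count.
func partitionFor(key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	// Hypothetical message keys; the same key always maps to the same partition.
	for _, key := range []string{"order-1", "order-2", "order-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 3))
	}
}
```

Messages that share a key land on the same partition, which preserves their relative order for consumers of that partition.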

Use the following commands from the client container to create the two new Kafka topics. Once complete, confirm the creation of the topics using the list option again.

bin/kafka-topics.sh --create --topic foo-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --create --topic bar-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --list --zookeeper $ZOOKPR
Creating the two new Kafka topics

Review the details of the topics using the describe option. Note the three partitions per topic and the three replicas per topic.

bin/kafka-topics.sh --describe --topic foo-topic --zookeeper $ZOOKPR
bin/kafka-topics.sh --describe --topic bar-topic --zookeeper $ZOOKPR
Describing each of the two new Kafka topics

Kafka ACLs

According to Kafka’s documentation, Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses Zookeeper to store all the Access Control Lists (ACLs). Kafka ACLs are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H On Resource R.” You can read more about the ACL structure on KIP-11. To add, remove or list ACLs, you can use the Kafka authorizer CLI.
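The “Principal P is [Allowed/Denied] Operation O From Host H On Resource R” format can be modeled with a small Go type. This is a simplified sketch, not Kafka's authorizer: it ignores prefixed and wildcard resource patterns, and it assumes requests with no matching ACL are denied (that is, allow.everyone.if.no.acl.found is false). All type and function names are hypothetical.

```go
package main

import "fmt"

// ACL models one rule: Principal P is [Allowed/Denied] Operation O
// From Host H On Resource R.
type ACL struct {
	Principal string // e.g. "User:alice"
	Allow     bool   // true = Allowed, false = Denied
	Operation string // e.g. "Read" or "Write"
	Host      string // "*" matches any host
	Resource  string // e.g. "Topic:foo-topic"
}

// matches reports whether the rule applies to the given request.
func (a ACL) matches(principal, op, host, resource string) bool {
	return a.Principal == principal && a.Operation == op &&
		(a.Host == "*" || a.Host == host) && a.Resource == resource
}

// authorize returns true only if at least one Allow rule matches and no
// Deny rule matches; a matching Deny always wins.
func authorize(acls []ACL, principal, op, host, resource string) bool {
	allowed := false
	for _, a := range acls {
		if !a.matches(principal, op, host, resource) {
			continue
		}
		if !a.Allow {
			return false
		}
		allowed = true
	}
	return allowed
}

func main() {
	// Hypothetical principal, for illustration only.
	acls := []ACL{
		{Principal: "User:alice", Allow: true, Operation: "Read", Host: "*", Resource: "Topic:foo-topic"},
	}
	fmt.Println(authorize(acls, "User:alice", "Read", "10.0.0.5", "Topic:foo-topic"))
	fmt.Println(authorize(acls, "User:alice", "Write", "10.0.0.5", "Topic:foo-topic"))
}
```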

Authorize access by the Kafka brokers and the demonstration application to the two topics. First, allow access to the topics from the brokers using the DISTINGUISHED_NAME environment variable (see AWS documentation).

# read auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-B \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-A \
--topic bar-topic
# write auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic bar-topic

The three instances (replicas/pods) of Service A, part of consumer-group-A, produce messages to the foo-topic and consume messages from the bar-topic. Conversely, the three instances of Service B, part of consumer-group-B, produce messages to the bar-topic and consume messages from the foo-topic.

Message flow from and to microservices to Kafka topics

Allow access to the appropriate topics from the demonstration application’s microservices. First, set the USER environment variable — the MSK cluster’s SASL/SCRAM credential’s username, stored in AWS Secrets Manager by Terraform. We can retrieve the username from Secrets Manager and assign it to the environment variable with the following command.

export USER=$(
aws secretsmanager get-secret-value \
--secret-id AmazonMSK_credentials \
--query SecretString --output text | \
jq .username | sed -e 's/^"//' -e 's/"$//')
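For comparison with the jq pipeline above, this is roughly how the same secret string could be parsed in Go. The secret is a JSON object with username and password keys; the type and function names here are hypothetical, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// scramSecret mirrors the JSON object stored in the secret.
type scramSecret struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

// parseSecret decodes the secret string and rejects incomplete secrets.
func parseSecret(secretString string) (scramSecret, error) {
	var s scramSecret
	if err := json.Unmarshal([]byte(secretString), &s); err != nil {
		return scramSecret{}, fmt.Errorf("parse secret: %w", err)
	}
	if s.Username == "" || s.Password == "" {
		return scramSecret{}, errors.New("secret is missing username or password")
	}
	return s, nil
}

func main() {
	// Hypothetical secret value, for illustration only.
	s, err := parseSecret(`{"username": "example-user", "password": "example-password"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("username:", s.Username)
}
```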

Create the appropriate ACLs.

# producers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic bar-topic
# consumers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic foo-topic \
--group consumer-group-B
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic bar-topic \
--group consumer-group-A

To list the ACLs you just created, use the following commands:

# list all ACLs
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list
# list for individual topics, e.g. foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list \
--topic foo-topic
Kafka ACLs associated with the foo-topic Kafka topic

7. Deploy example application

We should finally be ready to deploy our demonstration application to EKS. The application contains two Go-based microservices, Service A and Service B. The origin of the demonstration application’s functionality is based on Soham Kamani’s September 2020 blog post, Implementing a Kafka Producer and Consumer In Golang (With Full Examples) For Production. All source Go code for the demonstration application is included in the project.

.
├── Dockerfile
├── README.md
├── consumer.go
├── dialer.go
├── dialer_scram.go
├── go.mod
├── go.sum
├── main.go
├── param_store.go
├── producer.go
└── tls.go

Both microservices use the same Docker image, garystafford/kafka-demo-service, configured with different environment variables. The configuration makes the two services operate differently. The microservices use Segment’s kafka-go client, as mentioned earlier, to communicate with the MSK cluster’s broker and topics. Below, we see the demonstration application’s consumer functionality (consumer.go).

package main

import (
	"context"

	"github.com/segmentio/kafka-go"
)

func consume(ctx context.Context) {
	dialer := saslScramDialer()
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		Topic:   topic2,
		GroupID: group,
		Logger:  kafka.LoggerFunc(log.Debugf),
		Dialer:  dialer,
	})
	for {
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			log.Panicf("%v could not read message: %v", getHostname(), err.Error())
		}
		log.Debugf("%v received message: %v", getHostname(), string(msg.Value))
	}
}

The consumer above and the producer both connect to the MSK cluster using SASL/SCRAM. Below, we see the SASL/SCRAM Dialer functionality. This Dialer type mirrors the net.Dialer API but is designed to open Kafka connections instead of raw network connections. Note how the function can access AWS Secrets Manager to retrieve the SASL/SCRAM credentials.

package main

import (
	"encoding/json"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/secretsmanager"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

var (
	secretId     = "AmazonMSK_credentials"
	versionStage = "AWSCURRENT"
)

type credentials struct {
	username string
	password string
}

// getCredentials retrieves the SASL/SCRAM username and password from
// AWS Secrets Manager.
func getCredentials() credentials {
	svc := secretsmanager.New(sess)
	input := &secretsmanager.GetSecretValueInput{
		SecretId:     aws.String(secretId),
		VersionStage: aws.String(versionStage),
	}
	result, err := svc.GetSecretValue(input)
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case secretsmanager.ErrCodeResourceNotFoundException:
				log.Error(secretsmanager.ErrCodeResourceNotFoundException, aerr.Error())
			case secretsmanager.ErrCodeInvalidParameterException:
				log.Error(secretsmanager.ErrCodeInvalidParameterException, aerr.Error())
			case secretsmanager.ErrCodeInvalidRequestException:
				log.Error(secretsmanager.ErrCodeInvalidRequestException, aerr.Error())
			case secretsmanager.ErrCodeDecryptionFailure:
				log.Error(secretsmanager.ErrCodeDecryptionFailure, aerr.Error())
			case secretsmanager.ErrCodeInternalServiceError:
				log.Error(secretsmanager.ErrCodeInternalServiceError, aerr.Error())
			default:
				log.Error(aerr.Error())
			}
		} else {
			// Not an awserr.Error, so no error code is available; log the message.
			log.Error(err.Error())
		}
		// Stop here: without a secret, result.SecretString is nil and
		// dereferencing it below would fail with a less helpful panic.
		log.Panic("could not retrieve SASL/SCRAM credentials from Secrets Manager")
	}
	kmsCredentials := map[string]string{}
	if err := json.Unmarshal([]byte(*result.SecretString), &kmsCredentials); err != nil {
		log.Panic(err.Error())
	}
	return credentials{
		username: kmsCredentials["username"],
		password: kmsCredentials["password"],
	}
}

// saslScramDialer builds a kafka-go Dialer that connects over TLS and
// authenticates with SCRAM-SHA-512.
func saslScramDialer() *kafka.Dialer {
	credentials := getCredentials()
	mechanism, err := scram.Mechanism(
		scram.SHA512,
		credentials.username,
		credentials.password,
	)
	if err != nil {
		log.Fatal(err)
	}
	config := tlsConfig()
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		TLS:           config,
		SASLMechanism: mechanism,
	}
	return dialer
}

We will deploy three replicas of each microservice (three pods per microservice) using Helm. Below, we see the Kubernetes Deployment and Service resources for each microservice.

apiVersion: v1
kind: Service
metadata:
  name: kafka-demo-service-a
  labels:
    app: kafka-demo-service-a
    component: service
spec:
  ports:
    - name: http
      port: 8080
  selector:
    app: kafka-demo-service-a
    component: service
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-demo-service-a
  labels:
    app: kafka-demo-service-a
    component: service
spec:
  replicas: {{ .Values.kafkaDemoService.replicaCount }}
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-demo-service-a
      component: service
  template:
    metadata:
      labels:
        app: kafka-demo-service-a
        component: service
    spec:
      serviceAccountName: {{ .Values.kafkaDemoService.serviceAccountName }}
      containers:
        - image: {{ .Values.kafkaDemoService.image.image }}
          name: kafka-demo-service-a
          ports:
            - containerPort: {{ .Values.kafkaDemoService.image.ports.containerPort }}
          imagePullPolicy: {{ .Values.kafkaDemoService.image.pullPolicy }}
          env:
            - name: LOG_LEVEL
              value: "debug"
            - name: TOPIC1
              value: "foo-topic"
            - name: TOPIC2
              value: "bar-topic"
            - name: GROUP
              value: "consumer-group-A"
            - name: MSG_FREQ
              value: "10"
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-demo-service-b
  labels:
    app: kafka-demo-service-b
    component: service
spec:
  ports:
    - name: http
      port: 8080
  selector:
    app: kafka-demo-service-b
    component: service
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-demo-service-b
  labels:
    app: kafka-demo-service-b
    component: service
spec:
  replicas: {{ .Values.kafkaDemoService.replicaCount }}
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-demo-service-b
      component: service
  template:
    metadata:
      labels:
        app: kafka-demo-service-b
        component: service
    spec:
      serviceAccountName: {{ .Values.kafkaDemoService.serviceAccountName }}
      containers:
        - image: {{ .Values.kafkaDemoService.image.image }}
          name: kafka-demo-service-b
          ports:
            - containerPort: {{ .Values.kafkaDemoService.image.ports.containerPort }}
          imagePullPolicy: {{ .Values.kafkaDemoService.image.pullPolicy }}
          env:
            - name: LOG_LEVEL
              value: "debug"
            - name: TOPIC1
              value: "bar-topic"
            - name: TOPIC2
              value: "foo-topic"
            - name: GROUP
              value: "consumer-group-B"
            - name: MSG_FREQ
              value: "10"

Run the following helm commands to deploy the demonstration application to EKS using the project’s Helm chart, kafka-demo-app:

cd helm/
# perform dry run to validate chart
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE
Successful deployment of the demonstration application’s Helm chart

Confirm the successful creation of the demonstration application’s pods with any of the following commands:

kubectl get pods -n kafka
kubectl get pods -n kafka -l app=kafka-demo-service-a
kubectl get pods -n kafka -l app=kafka-demo-service-b

You should now have a total of seven pods running in the kafka namespace. In addition to the previously deployed single Kafka client pod, there should be three new Service A pods and three new Service B pods.

The kafka namespace showing seven running pods

The ability of the demonstration application to interact with AWS SSM Parameter Store and AWS Secrets Manager is based on the IAM policy created by Terraform, EKSScramSecretManagerPolicy. This policy is associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-demo-app-sasl-scram-serviceaccount. This service account is associated with the demonstration application’s pods as part of the Kubernetes Deployment resource in the Helm chart.

8. Verify application functionality

Although the pods starting and running successfully is a good sign, to confirm that the demonstration application is operating correctly, examine the logs of Service A and Service B using kubectl. The logs will confirm that the application has successfully retrieved the SASL/SCRAM credentials from Secrets Manager, connected to MSK, and can produce and consume messages from the appropriate topics.

kubectl logs -l app=kafka-demo-service-a -n kafka
kubectl logs -l app=kafka-demo-service-b -n kafka

The MSG_FREQ environment variable controls how often the microservices produce messages. The default interval is 60 seconds, but the Helm chart overrides it to 10 seconds, so messages are produced more frequently.
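A minimal sketch of how such a setting might be read in Go is shown below: the value is parsed as a number of seconds, and the 60-second default applies when the variable is unset or invalid. The function name and fallback behavior are assumptions for illustration, not the project's actual implementation.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// messageInterval reads MSG_FREQ (in seconds) through the supplied
// lookup function and falls back to 60 seconds when the variable is
// missing, non-numeric, or not positive.
func messageInterval(getenv func(string) string) time.Duration {
	const defaultSeconds = 60
	seconds, err := strconv.Atoi(getenv("MSG_FREQ"))
	if err != nil || seconds <= 0 {
		seconds = defaultSeconds
	}
	return time.Duration(seconds) * time.Second
}

func main() {
	fmt.Println("producing a message every", messageInterval(os.Getenv))
}
```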

Below, we see the logs generated by the Service A pods. Note one of the messages indicating the Service A producer was successful: writing 1 messages to foo-topic (partition: 0). And a message indicating the consumer was successful: kafka-demo-service-a-db76c5d56-gmx4v received message: This is message 68 from host kafka-demo-service-b-57556cdc4c-sdhxc. Each message contains the name of the host container that produced and consumed it.

Logs generated by the Service A pods

Likewise, we see logs generated by the Service B pods. Note one of the messages indicating the Service B producer was successful: writing 1 messages to bar-topic (partition: 2). And a message indicating the consumer was successful: kafka-demo-service-b-57556cdc4c-q8wvz received message: This is message 354 from host kafka-demo-service-a-db76c5d56-r88fk.

Logs generated by the Service B pods

CloudWatch Metrics

We can also examine the available Amazon MSK CloudWatch Metrics to confirm the EKS-based demonstration application is communicating as expected with MSK. There are 132 different metrics available for this cluster. Below, we see the BytesInPerSec and BytesOutPerSec metrics for each of the two topics, across each topic’s three partitions, which are spread across the three Kafka broker nodes. Each metric shows similar volumes of traffic, both inbound and outbound, to each topic. Along with the logs, the metrics appear to show the multiple instances of Service A and Service B are producing and consuming messages.

Amazon CloudWatch Metrics for the MSK cluster

Prometheus

We can also confirm the same results using an open-source observability tool, like Prometheus. The Amazon MSK Developer Guide outlines the steps necessary to monitor Kafka using Prometheus. The Amazon MSK cluster created by Terraform already has open monitoring with Prometheus enabled, and Terraform added ports 11001 and 11002 to the necessary MSK security group.

Amazon MSK broker targets successfully connected to Prometheus

Running Prometheus in a single pod on the EKS cluster, built from an Ubuntu base Docker image or similar, is probably the easiest approach for this particular demonstration.

rate(kafka_server_BrokerTopicMetrics_Count{topic=~"foo-topic|bar-topic", name=~"BytesInPerSec|BytesOutPerSec"}[5m])
Prometheus graph showing the rate of BytesInPerSec and BytesOutPerSec for the two topics
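To clarify what the rate() query above reports, here is a simplified sketch of a per-second rate over a window of counter samples, including the adjustment for counter resets. Prometheus's real rate() also extrapolates to the window boundaries, which this sketch omits. The type, function name, and sample values are hypothetical.

```go
package main

import "fmt"

// sample is one scrape of a cumulative counter.
type sample struct {
	seconds float64 // scrape timestamp, in seconds
	value   float64 // cumulative counter value at that time
}

// simpleRate returns the average per-second increase across the samples.
// A drop in value is treated as a counter reset, so the new value is
// counted as the increase since the reset.
func simpleRate(samples []sample) float64 {
	if len(samples) < 2 {
		return 0
	}
	increase := 0.0
	for i := 1; i < len(samples); i++ {
		delta := samples[i].value - samples[i-1].value
		if delta < 0 {
			delta = samples[i].value
		}
		increase += delta
	}
	elapsed := samples[len(samples)-1].seconds - samples[0].seconds
	if elapsed <= 0 {
		return 0
	}
	return increase / elapsed
}

func main() {
	// Hypothetical scrapes of a byte counter taken 60 seconds apart.
	window := []sample{{0, 100}, {60, 700}, {120, 1300}}
	fmt.Println("bytes per second:", simpleRate(window))
}
```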


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
