
Securely Decoupling Go-based Microservices on Amazon EKS using Amazon MSK with IRSA, SASL/SCRAM, and Data Encryption

Introduction

This post will explore a simple Go-based application deployed to Kubernetes using Amazon Elastic Kubernetes Service (Amazon EKS). The microservices that comprise the application communicate asynchronously by producing and consuming events from Amazon Managed Streaming for Apache Kafka (Amazon MSK).

High-level application and AWS infrastructure architecture for the post

Authentication and Authorization for Apache Kafka

According to AWS, you can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

For this post, our Amazon MSK cluster will use SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Mechanism) username and password-based authentication to increase security. Credentials used for SASL/SCRAM authentication will be securely stored in AWS Secrets Manager and encrypted using AWS Key Management Service (KMS).

Data Encryption

Data at rest in the MSK cluster will be encrypted using Amazon MSK’s integration with AWS KMS, which provides transparent server-side encryption. Data in transit between the brokers of the MSK cluster will be encrypted using Transport Layer Security (TLS 1.2).

Resource Management

AWS resources for Amazon MSK will be created and managed using HashiCorp Terraform, a popular open-source Infrastructure-as-Code (IaC) software tool. Amazon EKS resources will be created and managed with eksctl, the official CLI for Amazon EKS sponsored by Weaveworks. Lastly, Kubernetes resources will be created and managed with Helm, the open-source Kubernetes package manager.

Demonstration Application

The Go-based microservices, which compose the demonstration application, will use Segment’s popular kafka-go client. Segment is a leading customer data platform (CDP). The microservices will access Amazon MSK using Kafka broker connection information stored in AWS Systems Manager (SSM) Parameter Store.

Source Code

All source code for this post, including the demonstration application, Terraform, and Helm resources, is open-sourced and located on GitHub in the garystafford/terraform-msk-demo repository.

Prerequisites

To follow along with this post’s demonstration, you will need recent versions of terraform, eksctl, and helm installed and accessible from your terminal. Optionally, it will be helpful to have git or gh, kubectl, and the AWS CLI v2 (aws).

Demonstration

To demonstrate the EKS and MSK features described above, we will proceed as follows:

  1. Deploy the EKS cluster and associated resources using eksctl;
  2. Deploy the MSK cluster and associated resources using Terraform;
  3. Update the route tables for both VPCs and associated subnets to route traffic between the peered VPCs;
  4. Create IAM Roles for Service Accounts (IRSA) allowing access to MSK and associated services from EKS, using eksctl;
  5. Deploy the Kafka client container to EKS using Helm;
  6. Create the Kafka topics and ACLs for MSK using the Kafka client;
  7. Deploy the Go-based application to EKS using Helm;
  8. Confirm the application’s functionality;

1. Amazon EKS cluster

To begin, create a new Amazon EKS cluster using Weaveworks’ eksctl. The default cluster.yaml configuration file included in the project will create a small, development-grade EKS cluster based on Kubernetes 1.20 in us-east-1. The cluster will contain a managed node group of three t3.medium Amazon Linux 2 EC2 worker nodes. The EKS cluster will be created in a new VPC.

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: eks-kafka-demo
  region: us-east-1
  version: "1.20"
iam:
  withOIDC: true
managedNodeGroups:
  - name: managed-ng-1
    amiFamily: AmazonLinux2
    instanceType: t3.medium
    desiredCapacity: 3
    minSize: 2
    maxSize: 5
    volumeSize: 120
    volumeType: gp2
    labels:
      nodegroup-type: demo-app-workloads
    tags:
      nodegroup-name: managed-ng-1
      nodegroup-role: worker
    ssh:
      enableSsm: true # use aws ssm instead of ssh - no need to open port 22
    iam:
      withAddonPolicies:
        albIngress: true
        autoScaler: true
        cloudWatch: true
# cloudWatch:
#   clusterLogging:
#     enableTypes: ["*"]

Set the following environment variables and then run the eksctl create cluster command to create the new EKS cluster and associated infrastructure.

export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
eksctl create cluster -f ./eksctl/cluster.yaml

In my experience, it can take 25 to 40 minutes to fully build and configure the new 3-node EKS cluster.

Start of the Amazon EKS cluster creation using eksctl
Successful completion of the Amazon EKS cluster creation using eksctl

As part of creating the EKS cluster, eksctl will automatically deploy three AWS CloudFormation stacks containing the following resources:

  1. Amazon Virtual Private Cloud (VPC), subnets, route tables, NAT Gateways, security policies, and the EKS control plane;
  2. EKS managed node group containing three Kubernetes worker nodes;
  3. IAM Roles for Service Accounts (IRSA) that maps an AWS IAM Role to a Kubernetes Service Account;

Once complete, update your kubeconfig file so that you can connect to the new Amazon EKS cluster using the following AWS CLI command:

aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}

Review the details of the new EKS cluster using the following eksctl command:

eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Review the new EKS cluster in the Amazon Container Services console’s Amazon EKS Clusters tab.

New Amazon EKS cluster as seen from the Amazon Container Services console

Below, note the EKS cluster’s OpenID Connect URL. Support for IAM Roles for Service Accounts (IRSA) on the EKS cluster requires an OpenID Connect issuer URL associated with it. OIDC was configured in the cluster.yaml file; see line 8 (shown above).

New Amazon EKS cluster as seen from the Amazon Container Services console

The OpenID Connect identity provider, referenced in the EKS cluster’s console, created by eksctl, can be observed in the IAM Identity provider console.

EKS cluster’s OpenID Connect identity provider in the IAM Identity provider console

2. Amazon MSK cluster

Next, deploy the Amazon MSK cluster and associated network and security resources using HashiCorp Terraform.

Graphviz open source graph visualization of Terraform’s AWS resources

Before creating the AWS infrastructure with Terraform, update the location of the Terraform state. This project’s code uses Amazon S3 as a backend to store Terraform’s state. In the main.tf file, change the Amazon S3 bucket name to one of your existing buckets.

terraform {
  backend "s3" {
    bucket = "terrform-us-east-1-your-unique-name"
    key    = "dev/terraform.tfstate"
    region = "us-east-1"
  }
}

Also, update the eks_vpc_id variable in the variables.tf file with the VPC ID of the EKS VPC created by eksctl in step 1.

variable "eks_vpc_id" {
  default = "vpc-your-id"
}

The quickest way to obtain the ID of the EKS VPC is by using the following AWS CLI v2 command:

aws ec2 describe-vpcs --query 'Vpcs[].VpcId' \
--filters Name=tag:Name,Values=eksctl-eks-kafka-demo-cluster/VPC \
--output text

Next, initialize your Terraform backend in Amazon S3 and initialize the latest hashicorp/aws provider plugin with terraform init.

Use terraform plan to generate an execution plan, showing what actions Terraform would take to apply the current configuration. Terraform will create approximately 25 AWS resources as part of the plan.

Finally, use terraform apply to create the Amazon resources. Terraform will create a small, development-grade MSK cluster based on Kafka 2.8.0 in us-east-1, containing a set of three kafka.m5.large broker nodes. Terraform will create the MSK cluster in a new VPC. The broker nodes are spread across three Availability Zones, each in a private subnet, within the new VPC.

Start of the process to create the Amazon MSK cluster using Terraform
Successful creation of the Amazon MSK cluster using Terraform

It could take 30 minutes or more for Terraform to create the new cluster and associated infrastructure. Once complete, you can view the new MSK cluster in the Amazon MSK management console.

New Amazon MSK cluster as seen from the Amazon MSK console

Below, note the new cluster’s ‘Access control method’ is SASL/SCRAM authentication. The cluster implements encryption of data in transit with TLS and encrypts data at rest using a customer-managed customer master key (CMK) in AWS KMS.

New Amazon MSK cluster as seen from the Amazon MSK console

Below, note the ‘Associated secrets from AWS Secrets Manager.’ The secret, AmazonMSK_credentials, contains the SASL/SCRAM authentication credentials — username and password. These are the credentials the demonstration application, deployed to EKS, will use to securely access MSK.

New Amazon MSK cluster as seen from the Amazon MSK console

The SASL/SCRAM credentials secret shown above can be observed in the AWS Secrets Manager console. Note the customer-managed customer master key (CMK), stored in AWS KMS, which is used to encrypt the secret.

SASL/SCRAM credentials secret shown in the AWS Secrets Manager console

3. Update route tables for VPC Peering

Terraform created a VPC Peering relationship between the new EKS VPC and the MSK VPC. However, we will need to complete the peering configuration by updating the route tables. We want to route all traffic from the EKS cluster destined for MSK, whose VPC CIDR is 10.0.0.0/22, through the VPC Peering Connection resource. There are four route tables associated with the EKS VPC. Add a new route to the route table whose name ends with ‘PublicRouteTable’, for example, rtb-0a14e6250558a4abb / eksctl-eks-kafka-demo-cluster/PublicRouteTable. Manually create the required route in this route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

The EKS route table with a new route to MSK via the VPC Peering Connection

Similarly, we want to route all traffic from the MSK cluster destined for EKS, whose CIDR is 192.168.0.0/16, through the same VPC Peering Connection resource. Update the MSK VPC’s single route table using the VPC console’s Route tables tab, as shown below (new route shown second in list).

The MSK route table with a new route to EKS via the VPC Peering Connection

4. Create IAM Roles for Service Accounts (IRSA)

With both the EKS and MSK clusters created and peered, we are ready to start deploying Kubernetes resources. Create a new namespace, kafka, which will hold the demonstration application and Kafka client pods.

export AWS_ACCOUNT=$(aws sts get-caller-identity \
--output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="eks-kafka-demo"
export NAMESPACE="kafka"
kubectl create namespace $NAMESPACE

Then using eksctl, create two IAM Roles for Service Accounts (IRSA) associated with Kubernetes Service Accounts. The Kafka client’s pod will use one of the roles, and the demonstration application’s pods will use the other role. According to the eksctl documentation, IRSA works via IAM OpenID Connect Provider (OIDC) that EKS exposes, and IAM roles must be constructed with reference to the IAM OIDC Provider described earlier in the post, and a reference to the Kubernetes Service Account it will be bound to. The two IAM policies referenced in the eksctl commands below were created earlier by Terraform.

# kafka-demo-app role
eksctl create iamserviceaccount \
--name kafka-demo-app-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# kafka-client-msk role
eksctl create iamserviceaccount \
--name kafka-client-msk-sasl-scram-serviceaccount \
--namespace $NAMESPACE \
--region $EKS_REGION \
--cluster $CLUSTER_NAME \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSKafkaClientMSKPolicy" \
--attach-policy-arn "arn:aws:iam::${AWS_ACCOUNT}:policy/EKSScramSecretManagerPolicy" \
--approve \
--override-existing-serviceaccounts
# confirm successful creation of accounts
eksctl get iamserviceaccount \
--cluster $CLUSTER_NAME \
--namespace $NAMESPACE
kubectl get serviceaccounts -n $NAMESPACE
Successful creation of the two IAM Roles for Service Accounts (IRSA) using eksctl
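As a rough illustration of the two references that bind an IAM role to a Kubernetes Service Account, the Go sketch below builds the OIDC subject string that an IRSA role's trust policy is matched against, along with a policy ARN of the form used in the eksctl commands. The helper names are hypothetical and the account ID is a placeholder; eksctl generates the real trust policy for you.

```go
package main

import "fmt"

// subjectClaim returns the OIDC "sub" value issued for a Kubernetes service account.
// An IRSA role's trust policy conditions on this string.
func subjectClaim(namespace, serviceAccount string) string {
	return fmt.Sprintf("system:serviceaccount:%s:%s", namespace, serviceAccount)
}

// policyARN builds a customer-managed IAM policy ARN like those passed to --attach-policy-arn.
func policyARN(accountID, policyName string) string {
	return fmt.Sprintf("arn:aws:iam::%s:policy/%s", accountID, policyName)
}

func main() {
	fmt.Println(subjectClaim("kafka", "kafka-demo-app-sasl-scram-serviceaccount"))
	fmt.Println(policyARN("111122223333", "EKSScramSecretManagerPolicy")) // placeholder account ID
}
```

Only pods running under the named service account in the named namespace present a token with that subject, which is what keeps the role from being assumed by other workloads on the cluster.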

Recall eksctl created three CloudFormation stacks initially. With the addition of the two IAM Roles, we now have a total of five CloudFormation stacks deployed.

Amazon EKS-related CloudFormation stacks created by eksctl

5. Kafka client

Next, deploy the Kafka client using the project’s Helm chart, kafka-client-msk. We will use the Kafka client to create Kafka topics and Apache Kafka ACLs. This particular Kafka client is based on a custom Docker image, garystafford/kafka-client-msk, which I built using an Alpine Linux base image with Java OpenJDK 17. The image contains the latest Kafka client along with the AWS CLI v2 and a few other useful tools like jq. If you prefer an alternative, there are multiple Kafka client images available on Docker Hub.

# purpose: Kafka client for Amazon MSK
# author: Gary A. Stafford
# date: 2021-07-20
FROM openjdk:17-alpine3.14
ENV KAFKA_VERSION="2.8.0"
ENV KAFKA_PACKAGE="kafka_2.13-2.8.0"
ENV AWS_MSK_IAM_AUTH="1.1.0"
ENV GLIBC_VER="2.33-r0"
RUN apk update && apk add --no-cache wget tar bash jq
# install glibc compatibility for alpine (req. for aws cli v2) and aws cli v2
# reference: https://github.com/aws/aws-cli/issues/4685#issuecomment-615872019
RUN apk --no-cache add binutils curl less groff \
    && curl -sL https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub -o /etc/apk/keys/sgerrand.rsa.pub \
    && curl -sLO https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VER}/glibc-${GLIBC_VER}.apk \
    && curl -sLO https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VER}/glibc-bin-${GLIBC_VER}.apk \
    && apk add --no-cache \
        glibc-${GLIBC_VER}.apk \
        glibc-bin-${GLIBC_VER}.apk \
    && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip \
    && unzip awscliv2.zip \
    && aws/install \
    && rm -rf awscliv2.zip aws \
    && apk --no-cache del binutils curl \
    && rm glibc-${GLIBC_VER}.apk \
    && rm glibc-bin-${GLIBC_VER}.apk \
    && rm -rf /var/cache/apk/*
# setup java truststore
RUN cp $JAVA_HOME/lib/security/cacerts /tmp/kafka.client.truststore.jks
# install kafka
RUN wget https://downloads.apache.org/kafka/$KAFKA_VERSION/$KAFKA_PACKAGE.tgz \
&& tar -xzf $KAFKA_PACKAGE.tgz \
&& rm -rf $KAFKA_PACKAGE.tgz
WORKDIR /$KAFKA_PACKAGE
# install aws-msk-iam-auth jar
RUN wget https://github.com/aws/aws-msk-iam-auth/releases/download/$AWS_MSK_IAM_AUTH/aws-msk-iam-auth-$AWS_MSK_IAM_AUTH-all.jar \
&& mv aws-msk-iam-auth-$AWS_MSK_IAM_AUTH-all.jar libs/
CMD ["/bin/sh", "-c", "tail -f /dev/null"]
ENTRYPOINT ["/bin/bash"]

The Kafka client only requires a single pod. Run the following helm commands to deploy the Kafka client to EKS using the project’s Helm chart, kafka-client-msk:

cd helm/
# perform dry run to validate chart
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-client-msk ./kafka-client-msk \
--namespace $NAMESPACE
Successful deployment of the Kafka client’s Helm chart

Confirm the successful creation of the Kafka client pod with either of the following commands:

kubectl get pods -n kafka
kubectl describe pod -n kafka -l app=kafka-client-msk
Describing the Kafka client pod using kubectl

The ability of the Kafka client to interact with Amazon MSK, AWS SSM Parameter Store, and AWS Secrets Manager is based on two IAM policies created by Terraform, EKSKafkaClientMSKPolicy and EKSScramSecretManagerPolicy. These two policies are associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-client-msk-sasl-scram-serviceaccount. This service account is associated with the Kafka client pod as part of the Kubernetes Deployment resource in the Helm chart.

6. Kafka topics and ACLs for Kafka

Use the Kafka client to create Kafka topics and Apache Kafka ACLs. First, use the kubectl exec command to execute commands from within the Kafka client container.

export KAFKA_CONTAINER=$(
kubectl get pods -n kafka -l app=kafka-client-msk | \
awk 'FNR == 2 {print $1}')
kubectl exec -it $KAFKA_CONTAINER -n kafka -- bash

Once successfully attached to the Kafka client container, set the following three environment variables: 1) Apache ZooKeeper connection string, 2) Kafka bootstrap brokers, and 3) ‘Distinguished-Name’ of the Bootstrap Brokers (see AWS documentation). The values for these environment variables will be retrieved from AWS Systems Manager (SSM) Parameter Store. The values were stored in the Parameter Store by Terraform during the creation of the MSK cluster. Based on the policy attached to the IAM Role associated with this Pod (IRSA), the client has access to these specific parameters in the SSM Parameter Store.

export ZOOKPR=$(\
aws ssm get-parameter --name /msk/scram/zookeeper \
--query 'Parameter.Value' --output text)
export BBROKERS=$(\
aws ssm get-parameter --name /msk/scram/brokers \
--query 'Parameter.Value' --output text)
export DISTINGUISHED_NAME=$(\
echo $BBROKERS | awk -F' ' '{print $1}' | sed 's/b-1/*/g')

Use the env and grep commands to verify the environment variables have been retrieved and constructed properly. Your ZooKeeper and Kafka bootstrap broker URLs will differ from the ones shown below.

env | grep 'ZOOKPR\|BBROKERS\|DISTINGUISHED_NAME'
Setting the required environment variables in the Kafka client container

To test the connection between EKS and MSK, list the existing Kafka topics, from the Kafka client container:

bin/kafka-topics.sh --list --zookeeper $ZOOKPR

You should see three default topics, as shown below.

The new MSK cluster’s default Kafka topics

If you did not properly add the new VPC Peering routes to the appropriate route tables in the previous step, establishing peering of the EKS and MSK VPCs, you are likely to see a timeout error while attempting to connect. Go back and confirm that both of the route tables are correctly updated with the new routes.

Connection timeout error due to incorrect configuration of VPC peering-related route tables

Kafka Topics, Partitions, and Replicas

The demonstration application produces and consumes messages from two topics, foo-topic and bar-topic. Each topic will have three partitions, one for each of the three broker nodes, along with three replicas.

Kafka topic’s relationship to partitions, replicas, and brokers

Use the following commands from the client container to create the two new Kafka topics. When complete, confirm the creation of the topics using the list option again.

bin/kafka-topics.sh --create --topic foo-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --create --topic bar-topic \
--partitions 3 --replication-factor 3 \
--zookeeper $ZOOKPR
bin/kafka-topics.sh --list --zookeeper $ZOOKPR
Creating the two new Kafka topics

Review the details of the topics using the describe option. Note the three partitions per topic and the three replicas per topic.

bin/kafka-topics.sh --describe --topic foo-topic --zookeeper $ZOOKPR
bin/kafka-topics.sh --describe --topic bar-topic --zookeeper $ZOOKPR
Describing each of the two new Kafka topics

Kafka ACLs

According to Kafka’s documentation, Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses Zookeeper to store all the Access Control Lists (ACLs). Kafka ACLs are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H On Resource R.” You can read more about the ACL structure on KIP-11. To add, remove or list ACLs, you can use the Kafka authorizer CLI.
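The sentence form of an ACL maps naturally onto a small data structure. The Go sketch below is purely illustrative: the ACL type is not part of any Kafka client API, and the principal shown is a placeholder.

```go
package main

import "fmt"

// ACL mirrors the "Principal P is [Allowed/Denied] Operation O From Host H On Resource R" form.
// Illustrative type only, not a Kafka API.
type ACL struct {
	Principal string
	Allow     bool
	Operation string
	Host      string
	Resource  string
}

// String renders the ACL in the sentence form used by the Kafka documentation.
func (a ACL) String() string {
	verdict := "Denied"
	if a.Allow {
		verdict = "Allowed"
	}
	return fmt.Sprintf("Principal %s is %s Operation %s From Host %s On Resource %s",
		a.Principal, verdict, a.Operation, a.Host, a.Resource)
}

func main() {
	acl := ACL{Principal: "User:example-user", Allow: true, Operation: "Read", Host: "*", Resource: "Topic:foo-topic"}
	fmt.Println(acl)
}
```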

Authorize access by the Kafka brokers and the demonstration application to the two topics. First, allow access to the topics from the brokers using the DISTINGUISHED_NAME environment variable (see AWS documentation).

# read auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-B \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Read \
--group=consumer-group-A \
--topic bar-topic
# write auth for brokers
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal "User:CN=${DISTINGUISHED_NAME}" \
--operation Write \
--topic bar-topic

The three instances (replicas/pods) of Service A, part of consumer-group-A, produce messages to the foo-topic and consume messages from the bar-topic. Conversely, the three instances of Service B, part of consumer-group-B, produce messages to the bar-topic and consume messages from the foo-topic.

Message flow from and to microservices to Kafka topics
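The message flow described above can be written down as a small table and checked for consistency. In this Go sketch, the topic and group names come from the post, while the serviceConfig type and the helper functions are hypothetical.

```go
package main

import "fmt"

// serviceConfig holds the per-service topic and consumer group settings named in the text.
type serviceConfig struct {
	Produces string
	Consumes string
	Group    string
}

// flows returns the configuration of both services as described in the post.
func flows() map[string]serviceConfig {
	return map[string]serviceConfig{
		"service-a": {Produces: "foo-topic", Consumes: "bar-topic", Group: "consumer-group-A"},
		"service-b": {Produces: "bar-topic", Consumes: "foo-topic", Group: "consumer-group-B"},
	}
}

// closedLoop reports whether each service consumes exactly what the other produces.
func closedLoop(a, b serviceConfig) bool {
	return a.Produces == b.Consumes && b.Produces == a.Consumes
}

func main() {
	f := flows()
	fmt.Println("closed loop:", closedLoop(f["service-a"], f["service-b"]))
}
```

The same pairing explains the consumer ACLs: foo-topic is read by consumer-group-B and bar-topic by consumer-group-A.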

Allow access to the appropriate topics from the demonstration application’s microservices. First, set the USER environment variable — the MSK cluster’s SASL/SCRAM credential’s username, stored in AWS Secrets Manager by Terraform. We can retrieve the username from Secrets Manager and assign it to the environment variable with the following command.

export USER=$(
aws secretsmanager get-secret-value \
--secret-id AmazonMSK_credentials \
--query SecretString --output text | \
jq .username | sed -e 's/^"//' -e 's/"$//')
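For comparison, the same extraction can be sketched in Go using only the standard library. This assumes the secret string is a JSON object with username and password keys, as the post describes; the scramSecret type and parseSecret function are hypothetical names, and the payload shown is a placeholder.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// scramSecret matches the two keys the secret is described as holding.
type scramSecret struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

// parseSecret decodes the secret's JSON and rejects payloads missing either field.
func parseSecret(secretString string) (scramSecret, error) {
	var s scramSecret
	if err := json.Unmarshal([]byte(secretString), &s); err != nil {
		return scramSecret{}, fmt.Errorf("decode secret: %w", err)
	}
	if s.Username == "" || s.Password == "" {
		return scramSecret{}, errors.New("secret is missing username or password")
	}
	return s, nil
}

func main() {
	// Placeholder payload; never hard-code real credentials.
	s, err := parseSecret(`{"username":"example-user","password":"example-password"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println("username:", s.Username)
}
```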

Create the appropriate ACLs.

# producers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--producer \
--topic bar-topic
# consumers
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic foo-topic \
--group consumer-group-B
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--add \
--allow-principal User:$USER \
--consumer \
--topic bar-topic \
--group consumer-group-A

To list the ACLs you just created, use the following commands:

# list all ACLs
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list
# list for individual topics, e.g. foo-topic
bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$ZOOKPR \
--list \
--topic foo-topic
Kafka ACLs associated with the foo-topic Kafka topic

7. Deploy example application

We should finally be ready to deploy our demonstration application to EKS. The application contains two Go-based microservices, Service A and Service B. The demonstration application’s functionality is based on Soham Kamani’s September 2020 blog post, Implementing a Kafka Producer and Consumer In Golang (With Full Examples) For Production. All Go source code for the demonstration application is included in the project.

.
├── Dockerfile
├── README.md
├── consumer.go
├── dialer.go
├── dialer_scram.go
├── go.mod
├── go.sum
├── main.go
├── param_store.go
├── producer.go
└── tls.go

Both microservices use the same Docker image, garystafford/kafka-demo-service, configured with different environment variables. The configuration makes the two services operate differently. The microservices use Segment’s kafka-go client, as mentioned earlier, to communicate with the MSK cluster’s broker and topics. Below, we see the demonstration application’s consumer functionality (consumer.go).

package main

import (
	"context"
	"github.com/segmentio/kafka-go"
)

func consume(ctx context.Context) {
	dialer := saslScramDialer()
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		Topic:   topic2,
		GroupID: group,
		Logger:  kafka.LoggerFunc(log.Debugf),
		Dialer:  dialer,
	})
	for {
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			log.Panicf("%v could not read message: %v", getHostname(), err.Error())
		}
		log.Debugf("%v received message: %v", getHostname(), string(msg.Value))
	}
}

The consumer above and the producer both connect to the MSK cluster using SASL/SCRAM. Below, we see the SASL/SCRAM Dialer functionality. This Dialer type mirrors the net.Dialer API but is designed to open Kafka connections instead of raw network connections. Note how the function can access AWS Secrets Manager to retrieve the SASL/SCRAM credentials.

package main

import (
	"encoding/json"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/secretsmanager"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
	"time"
)

var (
	secretId     = "AmazonMSK_credentials"
	versionStage = "AWSCURRENT"
)

type credentials struct {
	username string
	password string
}

func getCredentials() credentials {
	svc := secretsmanager.New(sess)
	input := &secretsmanager.GetSecretValueInput{
		SecretId:     aws.String(secretId),
		VersionStage: aws.String(versionStage),
	}
	result, err := svc.GetSecretValue(input)
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case secretsmanager.ErrCodeResourceNotFoundException:
				log.Error(secretsmanager.ErrCodeResourceNotFoundException, aerr.Error())
			case secretsmanager.ErrCodeInvalidParameterException:
				log.Error(secretsmanager.ErrCodeInvalidParameterException, aerr.Error())
			case secretsmanager.ErrCodeInvalidRequestException:
				log.Error(secretsmanager.ErrCodeInvalidRequestException, aerr.Error())
			case secretsmanager.ErrCodeDecryptionFailure:
				log.Error(secretsmanager.ErrCodeDecryptionFailure, aerr.Error())
			case secretsmanager.ErrCodeInternalServiceError:
				log.Error(secretsmanager.ErrCodeInternalServiceError, aerr.Error())
			default:
				log.Error(aerr.Error())
			}
		} else {
			// Print the error, cast err to awserr.Error to get the Code and
			// Message from an error.
			log.Error(err.Error())
		}
		// Stop here: no usable secret was returned, so dereferencing
		// result.SecretString below would fail with a nil pointer.
		log.Panic(err.Error())
	}
	kmsCredentials := map[string]string{}
	if err := json.Unmarshal([]byte(*result.SecretString), &kmsCredentials); err != nil {
		log.Panic(err.Error())
	}
	return credentials{
		username: kmsCredentials["username"],
		password: kmsCredentials["password"],
	}
}

func saslScramDialer() *kafka.Dialer {
	credentials := getCredentials()
	mechanism, err := scram.Mechanism(
		scram.SHA512,
		credentials.username,
		credentials.password,
	)
	if err != nil {
		log.Fatal(err)
	}
	config := tlsConfig()
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		TLS:           config,
		SASLMechanism: mechanism,
	}
	return dialer
}

We will deploy three replicas of each microservice (three pods per microservice) using Helm. Below, we see the Kubernetes Deployment and Service resources for each microservice.

apiVersion: v1
kind: Service
metadata:
  name: kafka-demo-service-a
  labels:
    app: kafka-demo-service-a
    component: service
spec:
  ports:
    - name: http
      port: 8080
  selector:
    app: kafka-demo-service-a
    component: service
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-demo-service-a
  labels:
    app: kafka-demo-service-a
    component: service
spec:
  replicas: {{ .Values.kafkaDemoService.replicaCount }}
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-demo-service-a
      component: service
  template:
    metadata:
      labels:
        app: kafka-demo-service-a
        component: service
    spec:
      serviceAccountName: {{ .Values.kafkaDemoService.serviceAccountName }}
      containers:
        - image: {{ .Values.kafkaDemoService.image.image }}
          name: kafka-demo-service-a
          ports:
            - containerPort: {{ .Values.kafkaDemoService.image.ports.containerPort }}
          imagePullPolicy: {{ .Values.kafkaDemoService.image.pullPolicy }}
          env:
            - name: LOG_LEVEL
              value: "debug"
            - name: TOPIC1
              value: "foo-topic"
            - name: TOPIC2
              value: "bar-topic"
            - name: GROUP
              value: "consumer-group-A"
            - name: MSG_FREQ
              value: "10"
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-demo-service-b
  labels:
    app: kafka-demo-service-b
    component: service
spec:
  ports:
    - name: http
      port: 8080
  selector:
    app: kafka-demo-service-b
    component: service
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-demo-service-b
  labels:
    app: kafka-demo-service-b
    component: service
spec:
  replicas: {{ .Values.kafkaDemoService.replicaCount }}
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: kafka-demo-service-b
      component: service
  template:
    metadata:
      labels:
        app: kafka-demo-service-b
        component: service
    spec:
      serviceAccountName: {{ .Values.kafkaDemoService.serviceAccountName }}
      containers:
        - image: {{ .Values.kafkaDemoService.image.image }}
          name: kafka-demo-service-b
          ports:
            - containerPort: {{ .Values.kafkaDemoService.image.ports.containerPort }}
          imagePullPolicy: {{ .Values.kafkaDemoService.image.pullPolicy }}
          env:
            - name: LOG_LEVEL
              value: "debug"
            - name: TOPIC1
              value: "bar-topic"
            - name: TOPIC2
              value: "foo-topic"
            - name: GROUP
              value: "consumer-group-B"
            - name: MSG_FREQ
              value: "10"

Run the following helm commands to deploy the demonstration application to EKS using the project’s Helm chart, kafka-demo-app:

cd helm/
# perform dry run to validate chart
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE --debug --dry-run
# apply chart resources
helm install kafka-demo-app ./kafka-demo-app \
--namespace $NAMESPACE
Successful deployment of the demonstration application’s Helm chart

Confirm the successful creation of the demonstration application’s pods with the following commands:

kubectl get pods -n kafka
kubectl get pods -n kafka -l app=kafka-demo-service-a
kubectl get pods -n kafka -l app=kafka-demo-service-b

You should now have a total of seven pods running in the kafka namespace. In addition to the previously deployed single Kafka client pod, there should be three new Service A pods and three new Service B pods.

The kafka namespace showing seven running pods

The ability of the demonstration application to interact with AWS SSM Parameter Store and AWS Secrets Manager is based on the IAM policy created by Terraform, EKSScramSecretManagerPolicy. This policy is associated with a new IAM role, which in turn, is associated with the Kubernetes Service Account, kafka-demo-app-sasl-scram-serviceaccount. This service account is associated with the demonstration application’s pods as part of the Kubernetes Deployment resource in the Helm chart.

8. Verify application functionality

Although the pods starting and running successfully is a good sign, to confirm that the demonstration application is operating correctly, examine the logs of Service A and Service B using kubectl. The logs will confirm that the application has successfully retrieved the SASL/SCRAM credentials from Secrets Manager, connected to MSK, and can produce and consume messages from the appropriate topics.

kubectl logs -l app=kafka-demo-service-a -n kafka
kubectl logs -l app=kafka-demo-service-b -n kafka

The MSG_FREQ environment variable controls how often the microservices produce messages. The default interval is 60 seconds, but the Helm chart overrides it with 10 seconds, which increases the message rate.

Below, we see the logs generated by the Service A pods. Note one of the messages indicating the Service A producer was successful: writing 1 messages to foo-topic (partition: 0). And a message indicating the consumer was successful: kafka-demo-service-a-db76c5d56-gmx4v received message: This is message 68 from host kafka-demo-service-b-57556cdc4c-sdhxc. Each message contains the name of the host container that produced and consumed it.

Logs generated by the Service A pods

Likewise, we see logs generated by the Service B pods. Note one of the messages indicating the Service B producer was successful: writing 1 messages to bar-topic (partition: 2). And a message indicating the consumer was successful: kafka-demo-service-b-57556cdc4c-q8wvz received message: This is message 354 from host kafka-demo-service-a-db76c5d56-r88fk.

Logs generated by the Service B pods

CloudWatch Metrics

We can also examine the available Amazon MSK CloudWatch Metrics to confirm the EKS-based demonstration application is communicating as expected with MSK. There are 132 different metrics available for this cluster. Below, we see the BytesInPerSec and BytesOutPerSec metrics for each of the two topics, across each topic’s three partitions, which are spread across the three Kafka broker nodes. Each metric shows similar volumes of traffic, both inbound and outbound, to each topic. Along with the logs, the metrics indicate that the multiple instances of Service A and Service B are producing and consuming messages.

Amazon CloudWatch Metrics for the MSK cluster

Prometheus

We can also confirm the same results using an open-source observability tool, like Prometheus. The Amazon MSK Developer Guide outlines the steps necessary to monitor Kafka using Prometheus. The Amazon MSK cluster created by Terraform already has open monitoring with Prometheus enabled, and Terraform has added ports 11001 and 11002 to the necessary MSK security group.

Amazon MSK broker targets successfully connected to Prometheus

Running Prometheus in a single pod on the EKS cluster, built from an Ubuntu base Docker image or similar, is probably the easiest approach for this particular demonstration.

rate(kafka_server_BrokerTopicMetrics_Count{topic=~"foo-topic|bar-topic", name=~"BytesInPerSec|BytesOutPerSec"}[5m])
Prometheus graph showing the rate of BytesInPerSec and BytesOutPerSec for the two topics
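For intuition about what rate() computes in the query above, here is a simplified sketch in Go. It averages the per-second increase of a counter across a window and treats any decrease as a counter reset. It is not Prometheus’s implementation: the real rate() also extrapolates to the window boundaries, and the sample type and values here are made up for illustration.

```go
package main

import "fmt"

// sample is one scraped value of a monotonically increasing counter.
type sample struct {
	unixSeconds float64
	value       float64
}

// simpleRate returns the average per-second increase across the samples.
// A decrease between two samples is treated as a counter reset, so the
// new value is counted from zero. Unlike PromQL's rate(), there is no
// extrapolation to the edges of the window.
func simpleRate(samples []sample) float64 {
	if len(samples) < 2 {
		return 0
	}
	increase := 0.0
	for i := 1; i < len(samples); i++ {
		delta := samples[i].value - samples[i-1].value
		if delta < 0 {
			delta = samples[i].value
		}
		increase += delta
	}
	elapsed := samples[len(samples)-1].unixSeconds - samples[0].unixSeconds
	if elapsed <= 0 {
		return 0
	}
	return increase / elapsed
}

func main() {
	// Three hypothetical scrapes, one minute apart, of a byte counter.
	samples := []sample{{0, 1000}, {60, 4000}, {120, 7000}}
	fmt.Printf("%.1f bytes/sec\n", simpleRate(samples))
}
```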


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Observing gRPC-based Microservices on Amazon EKS running Istio

Observing a gRPC-based Kubernetes application using Jaeger, Zipkin, Prometheus, Grafana, and Kiali on Amazon EKS running Istio service mesh

In the previous two-part post, Kubernetes-based Microservice Observability with Istio Service Mesh, we explored a set of popular open source observability tools easily integrated with the Istio service mesh. Tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and aggregation to Amazon CloudWatch Container Insights. We used these tools to observe a distributed, microservices-based, RESTful application deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster. The application platform, running on EKS, used Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

In this post, we will examine those same observability tools to monitor an alternate set of Go-based microservices that use Protocol Buffers (aka Protobuf) over gRPC (gRPC Remote Procedure Calls) and HTTP/2 for client-server communications as opposed to the more common RESTful JSON over HTTP. We will learn how Kubernetes, Istio, and the observability tools work seamlessly with gRPC, just as they do with JSON over HTTP on Amazon EKS.

Kiali Management Console showing gRPC-based reference application platform

Technologies

gRPC

According to the gRPC project, gRPC is a modern open source high-performance Remote Procedure Call (RPC) framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking, and authentication. gRPC is also applicable in the last mile of distributed computing to connect devices, mobile applications, and browsers to backend services.

gRPC was initially created by Google, which has used a single general-purpose RPC infrastructure called Stubby to connect the large number of microservices running within and across its data centers for over a decade. In March 2015, Google decided to build the next version of Stubby and make it open source. gRPC is now used in many organizations outside of Google, including Square, Netflix, CoreOS, Docker, CockroachDB, Cisco, and Juniper Networks. gRPC currently supports over ten languages, including C#, C++, Dart, Go, Java, Kotlin, Node, Objective-C, PHP, Python, and Ruby.

According to widely-cited 2019 tests published by Ruwan Fernando, “gRPC is roughly 7 times faster than REST when receiving data & roughly 10 times faster than REST when sending data for this specific payload. This is mainly due to the tight packing of the Protocol Buffers and the use of HTTP/2 by gRPC.”

Protocol Buffers

With gRPC, you define your service using Protocol Buffers (aka Protobuf), a powerful binary serialization toolset and language. According to Google, Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data — think XML, but smaller, faster, and simpler. Google’s previous documentation claimed protocol buffers were “3 to 10 times smaller and 20 to 100 times faster than XML.”

Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes. With the proto3 language version, protocol buffers currently support generated code in Java, Python, Objective-C, C++, Dart, Go, Ruby, and C#, with more languages to come. For this post, we have compiled our protobufs for Go. You can read more about the binary wire format of Protobuf on Google’s Developers Portal.
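To make the compactness of the binary wire format a little more concrete, the sketch below hand-encodes two string fields the way Protobuf lays out length-delimited fields (a varint tag, a varint length, then the raw bytes) and compares the size with an equivalent JSON document. It uses only the standard library, relying on the fact that Go’s encoding/binary varints use the same base-128 encoding as Protocol Buffers. The field numbers and values are illustrative, and real services should use the generated code rather than anything like this.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// appendUvarint appends v to buf as a base-128 varint.
func appendUvarint(buf []byte, v uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], v)
	return append(buf, tmp[:n]...)
}

// appendStringField appends one length-delimited (wire type 2) field:
// tag = fieldNumber<<3 | wireType, then the length, then the bytes.
func appendStringField(buf []byte, fieldNumber int, value string) []byte {
	buf = appendUvarint(buf, uint64(fieldNumber)<<3|2)
	buf = appendUvarint(buf, uint64(len(value)))
	return append(buf, value...)
}

func main() {
	// Illustrative field numbers and values only.
	var wire []byte
	wire = appendStringField(wire, 2, "Service A")
	wire = appendStringField(wire, 3, "Hello, from Service A!")

	asJSON, err := json.Marshal(map[string]string{
		"service": "Service A",
		"message": "Hello, from Service A!",
	})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Printf("protobuf wire bytes: %d, JSON bytes: %d\n", len(wire), len(asJSON))
}
```

The saving comes mostly from replacing field names with one-byte tags; the string payloads themselves are the same size in both encodings.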

Reference Application Platform

To demonstrate the use of the observability tools, we will deploy a reference application platform to Amazon EKS on AWS. The application platform was developed to demonstrate different Kubernetes platforms, such as EKS, GKE, AKS, and concepts such as service meshes, API management, observability, CI/CD, DevOps, and Chaos Engineering. The platform comprises a backend of eight Go-based microservices labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, one Go-based gRPC Gateway reverse proxy, four MongoDB databases, and one RabbitMQ message queue.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate gRPC-based, synchronous service-to-service IPC (inter-process communication), asynchronous TCP-based service-to-queue-to-service communications, and TCP-based service-to-database communications. For example, Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message to a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. The platform’s distributed service communications can be observed using the observability tools when the application is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of the gRPC-based Reference Application Platform

Converting to gRPC and Protocol Buffers

For this post, the eight Go microservices have been modified to use gRPC with protocol buffers over HTTP/2 instead of JSON over HTTP. Specifically, the services use version 3 (aka proto3) of protocol buffers. With gRPC, a gRPC client calls a gRPC server. Some of the platform’s services are gRPC servers, others are gRPC clients, while some act as both client and server.

gRPC Gateway

In the revised platform architecture diagram above, note the addition of the gRPC Gateway reverse proxy that replaces Service A at the edge of the API. The proxy, which translates a RESTful HTTP API into gRPC, sits between the Angular-based Web UI and Service A. Assuming for the sake of this demonstration that most consumers of an API require a RESTful JSON over HTTP API, we have added a gRPC Gateway reverse proxy to the platform. The gRPC Gateway proxies communications between the JSON over HTTP-based clients and the gRPC-based microservices. The gRPC Gateway helps to provide APIs with both gRPC and RESTful styles at the same time.

A diagram from the grpc-gateway GitHub project site demonstrates how the reverse proxy works.

Diagram courtesy: https://github.com/grpc-ecosystem/grpc-gateway

Alternatives to gRPC Gateway

As an alternative to the gRPC Gateway reverse proxy, we could convert the TypeScript-based Angular UI client to communicate via gRPC and protobufs and communicate directly with Service A. One option to achieve this is gRPC Web, a JavaScript implementation of gRPC for browser clients. gRPC Web clients connect to gRPC services via a special proxy, which by default is Envoy. The project’s roadmap includes plans for gRPC Web to be supported in language-specific web frameworks for languages such as Python, Java, and Node.

Demonstration

To follow along with this post’s demonstration, review the installation instructions detailed in part one of the previous post, Kubernetes-based Microservice Observability with Istio Service Mesh, to deploy and configure the Amazon EKS cluster, Istio, Amazon MQ, and DocumentDB. To expedite the deployment of the revised gRPC-based platform to the dev namespace, I have included a Helm chart, ref-app-grpc, in the project. Using the chart, you can ignore any instructions in the previous post that refer to deploying resources to the dev namespace. See the chart’s README file for further instructions.

Deployed gRPC-based Reference Application Platform as seen from Argo CD

Source Code

The gRPC-based microservices source code, Kubernetes resources, and Helm chart are located in the k8s-istio-observe-backend project repository in the 2021-istio branch. This project repository is the only source code you will need for this demonstration.

git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-backend.git

Optionally, the Angular-based web client source code is located in the k8s-istio-observe-frontend repository on the new 2021-grpc branch. The source protobuf .proto file and the Buf-compiled protobuf files are located in the pb-greeting and protobuf project repositories. You do not need to clone any of these projects for this post’s demonstration.

All Docker images for the services, UI, and the reverse proxy are pulled from Docker Hub.

All images for this post are located on Docker Hub

Code Changes

Although this post is not specifically about writing Go for gRPC and protobuf, to better understand the observability requirements and capabilities of these technologies compared to the previous JSON over HTTP-based services, it is helpful to review the code changes.

Microservices

First, compare the revised source code for Service A, shown below, to the original code in the previous post. The service’s code is almost completely rewritten. For example, note the following code changes to Service A, which are representative of the changes to the other backend services:

  • Import of the v3 greeting protobuf package;
  • Local Greeting struct replaced with pb.Greeting struct;
  • All services are now hosted on port 50051;
  • The HTTP server and all API resource handler functions are removed;
  • Headers used for distributed tracing have moved from HTTP request object to metadata passed in a gRPC Context type;
  • Service A is both a gRPC client and a server, which is called by the gRPC Gateway reverse proxy;
  • The primary GreetingHandler function is replaced by the protobuf package’s Greeting function;
  • gRPC clients, such as Service A, call gRPC servers using the callGrpcService function;
  • CORS handling is offloaded from the services to Istio;
  • Logging methods are largely unchanged;

Source code for revised gRPC-based Service A:

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service A – gRPC/Protobuf
package main

import (
	"context"
	"net"
	"os"
	"time"

	lrf "github.com/banzaicloud/logrus-runtime-formatter"
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	pb "github.com/garystafford/protobuf/greeting/v3"
)

var (
	logLevel    = getEnv("LOG_LEVEL", "info")
	port        = getEnv("PORT", ":50051")
	serviceName = getEnv("SERVICE_NAME", "Service A")
	message     = getEnv("GREETING", "Hello, from Service A!")
	URLServiceB = getEnv("SERVICE_B_URL", "service-b:50051")
	URLServiceC = getEnv("SERVICE_C_URL", "service-c:50051")
	greetings   []*pb.Greeting
	log         = logrus.New()
)

type greetingServiceServer struct {
	pb.UnimplementedGreetingServiceServer
}

// Greeting handles the incoming gRPC call, adds this service's greeting,
// and collects the greetings returned by Service B and Service C.
func (s *greetingServiceServer) Greeting(ctx context.Context, _ *pb.GreetingRequest) (*pb.GreetingResponse, error) {
	greetings = nil

	requestGreeting := pb.Greeting{
		Id:       uuid.New().String(),
		Service:  serviceName,
		Message:  message,
		Created:  time.Now().Local().String(),
		Hostname: getHostname(),
	}

	greetings = append(greetings, &requestGreeting)

	callGrpcService(ctx, &requestGreeting, URLServiceB)
	callGrpcService(ctx, &requestGreeting, URLServiceC)

	return &pb.GreetingResponse{
		Greeting: greetings,
	}, nil
}

func callGrpcService(ctx context.Context, requestGreeting *pb.Greeting, address string) {
	conn, err := createGRPCConn(ctx, address)
	if err != nil {
		log.Fatal(err)
	}
	defer func(conn *grpc.ClientConn) {
		err := conn.Close()
		if err != nil {
			log.Error(err)
		}
	}(conn)

	// Tracing headers arrive as gRPC metadata on the incoming context.
	headersIn, _ := metadata.FromIncomingContext(ctx)
	log.Debugf("headersIn: %s", headersIn)

	client := pb.NewGreetingServiceClient(conn)

	// Attach the headers to the timeout context (not a fresh background
	// context) so the five-second deadline applies to the outgoing call.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	ctx = metadata.NewOutgoingContext(ctx, headersIn)

	headersOut, _ := metadata.FromOutgoingContext(ctx)
	log.Debugf("headersOut: %s", headersOut)

	responseGreetings, err := client.Greeting(ctx, &pb.GreetingRequest{Greeting: requestGreeting})
	if err != nil {
		log.Fatal(err)
	}
	log.Info(responseGreetings.GetGreeting())

	greetings = append(greetings, responseGreetings.GetGreeting()...)
}

func createGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	var opts []grpc.DialOption
	opts = append(opts,
		grpc.WithInsecure(),
		grpc.WithBlock())

	// The dial options are variadic, so the slice must be expanded.
	conn, err := grpc.DialContext(ctx, addr, opts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

func getHostname() string {
	hostname, err := os.Hostname()
	if err != nil {
		log.Error(err)
	}
	return hostname
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

func run() error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatal(err)
	}

	grpcServer := grpc.NewServer()
	pb.RegisterGreetingServiceServer(grpcServer, &greetingServiceServer{})
	return grpcServer.Serve(lis)
}

func init() {
	childFormatter := logrus.JSONFormatter{}
	runtimeFormatter := &lrf.Formatter{ChildFormatter: &childFormatter}
	runtimeFormatter.Line = true
	log.Formatter = runtimeFormatter
	log.Out = os.Stdout
	level, err := logrus.ParseLevel(logLevel)
	if err != nil {
		log.Error(err)
	}
	log.Level = level
}

func main() {
	// log.Fatal exits with status 1 after logging the error.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}
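In callGrpcService, the outgoing context needs to carry two things at once: a five-second timeout and the tracing headers. The order in which contexts are derived matters here. The standard-library sketch below uses context.WithValue as a stand-in for metadata.NewOutgoingContext (an assumption made so the example runs without the gRPC module) and shows that a context derived from the timeout context keeps its deadline, while one derived from context.Background() does not. The helper names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// headerKey is a private context key for the illustrative headers.
type headerKey struct{}

// withHeaders attaches headers to parent. It stands in for
// metadata.NewOutgoingContext in this stdlib-only sketch.
func withHeaders(parent context.Context, headers map[string]string) context.Context {
	return context.WithValue(parent, headerKey{}, headers)
}

// deadlineSurvives reports whether the final context still has a deadline,
// depending on which parent the headers were attached to.
func deadlineSurvives(deriveFromTimeout bool) bool {
	timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	parent := context.Background()
	if deriveFromTimeout {
		parent = timeoutCtx
	}
	ctx := withHeaders(parent, map[string]string{"x-request-id": "demo"})

	_, ok := ctx.Deadline()
	return ok
}

func main() {
	fmt.Println("derived from timeout context:", deadlineSurvives(true))
	fmt.Println("derived from background context:", deadlineSurvives(false))
}
```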

Greeting Protocol Buffers

Shown below is the greeting v3 protocol buffers .proto file. The fields within the Greeting, originally defined in the RESTful JSON-based services as a struct, remain largely unchanged. However, the Greeting is now a message, an aggregate containing a set of typed fields. The GreetingRequest message is composed of a single Greeting message, while the GreetingResponse message is composed of multiple (repeated) Greeting messages. Services pass a Greeting message in their request and receive an array of one or more messages in response.

syntax = "proto3";

package greeting.v3;

import "google/api/annotations.proto";

option go_package = "github.com/garystafford/pb-greeting/gen/go/greeting/v3";

message Greeting {
  string id = 1;
  string service = 2;
  string message = 3;
  string created = 4;
  string hostname = 5;
}

message GreetingRequest {
  Greeting greeting = 1;
}

message GreetingResponse {
  repeated Greeting greeting = 1;
}

service GreetingService {
  rpc Greeting (GreetingRequest) returns (GreetingResponse) {
    option (google.api.http) = {
      get: "/api/greeting"
    };
  }
}

The protobuf is compiled with Buf, the popular Go-based protocol compiler tool. Using Buf, four files are generated: Go, Go gRPC, gRPC Gateway, and Swagger (OpenAPI v2).

.
├── greeting.pb.go
├── greeting.pb.gw.go
├── greeting.swagger.json
└── greeting_grpc.pb.go

Buf is configured using two files, buf.yaml:

version: v1beta1
name: buf.build/garystafford/pb-greeting
deps:
  - buf.build/beta/googleapis
  - buf.build/grpc-ecosystem/grpc-gateway
build:
  roots:
    - proto
lint:
  use:
    - DEFAULT
breaking:
  use:
    - FILE

And buf.gen.yaml:

version: v1beta1
plugins:
  - name: go
    out: ../protobuf
    opt:
      - paths=source_relative
  - name: go-grpc
    out: ../protobuf
    opt:
      - paths=source_relative
  - name: grpc-gateway
    out: ../protobuf
    opt:
      - paths=source_relative
      - generate_unbound_methods=true
  - name: openapiv2
    out: ../protobuf
    opt:
      - logtostderr=true

The compiled protobuf code is included in the protobuf project on GitHub, and the v3 version is imported into each microservice and the reverse proxy. Below is a snippet of the greeting.pb.go compiled Go file.

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.17.1
// source: greeting/v3/greeting.proto

package v3

import (
	_ "google.golang.org/genproto/googleapis/api/annotations"
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	reflect "reflect"
	sync "sync"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type Greeting struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Id       string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Service  string `protobuf:"bytes,2,opt,name=service,proto3" json:"service,omitempty"`
	Message  string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
	Created  string `protobuf:"bytes,4,opt,name=created,proto3" json:"created,omitempty"`
	Hostname string `protobuf:"bytes,5,opt,name=hostname,proto3" json:"hostname,omitempty"`
}

func (x *Greeting) Reset() {
	*x = Greeting{}
	if protoimpl.UnsafeEnabled {
		mi := &file_greeting_v3_greeting_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *Greeting) String() string {
	return protoimpl.X.MessageStringOf(x)
}

Using Swagger, we can view the greeting protocol buffers’ single RESTful API resource, exposed with an HTTP GET method. You can use the Docker-based version of Swagger UI for viewing protoc generated swagger definitions.

docker run -p 8080:8080 -d --name swagger-ui \
-e SWAGGER_JSON=/tmp/greeting/v3/greeting.swagger.json \
-v ${GOPATH}/src/protobuf:/tmp swaggerapi/swagger-ui

The Angular UI makes an HTTP GET request to the /api/greeting resource, which is transformed to gRPC and proxied to Service A, where it is handled by the Greeting function.
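Any HTTP client can exercise the same resource. The sketch below is a minimal Go client for GET /api/greeting that decodes the response into plain structs mirroring the Greeting fields. So that it runs anywhere, it talks to a local stub server rather than the real reverse proxy. The stub, its canned payload, and the struct and function names are assumptions for illustration; the JSON shape is inferred from the .proto file rather than captured from the running platform.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// greeting mirrors the fields of the Greeting message for JSON decoding.
type greeting struct {
	ID       string `json:"id"`
	Service  string `json:"service"`
	Message  string `json:"message"`
	Created  string `json:"created"`
	Hostname string `json:"hostname"`
}

// greetingResponse mirrors GreetingResponse: a repeated Greeting field.
type greetingResponse struct {
	Greeting []greeting `json:"greeting"`
}

// fetchGreetings calls GET {baseURL}/api/greeting and decodes the body.
func fetchGreetings(baseURL string) ([]greeting, error) {
	resp, err := http.Get(baseURL + "/api/greeting")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	var body greetingResponse
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return nil, err
	}
	return body.Greeting, nil
}

// newStubProxy starts a local stand-in for the reverse proxy that returns
// a canned, hypothetical payload.
func newStubProxy() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet || r.URL.Path != "/api/greeting" {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"greeting":[{"id":"1","service":"Service A","message":"Hello, from Service A!"}]}`)
	}))
}

func main() {
	stub := newStubProxy()
	defer stub.Close()

	greetings, err := fetchGreetings(stub.URL)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	for _, g := range greetings {
		fmt.Printf("%s: %s\n", g.Service, g.Message)
	}
}
```

To try it against a deployed platform, pass the API hostname to fetchGreetings instead of the stub URL.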

Swagger UI view of the Greeting protobuf

gRPC Gateway Reverse Proxy

As explained earlier, the gRPC Gateway reverse proxy, which translates the RESTful HTTP API into gRPC, is new. In the code sample below, note the following code features:

  1. Import of the v3 greeting protobuf package;
  2. ServeMux, a request multiplexer, matches http requests to patterns and invokes the corresponding handler;
  3. RegisterGreetingServiceHandlerFromEndpoint registers the http handlers for service GreetingService to mux. The handlers forward requests to the gRPC endpoint;
  4. x-b3 request headers, used for distributed tracing, are collected from the incoming HTTP request and propagated to the upstream services in the gRPC Context type;

Source code for the gRPC Gateway reverse proxy:

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: gRPC Gateway / Reverse Proxy
// reference: https://github.com/grpc-ecosystem/grpc-gateway
package main

import (
	"context"
	"flag"
	"net/http"
	"os"

	lrf "github.com/banzaicloud/logrus-runtime-formatter"
	pb "github.com/garystafford/protobuf/greeting/v3"
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

var (
	logLevel    = getEnv("LOG_LEVEL", "info")
	port        = getEnv("PORT", ":50051")
	URLServiceA = getEnv("SERVICE_A_URL", "service-a:50051")
	log         = logrus.New()
)

// injectHeadersIntoMetadata copies the tracing headers from the incoming
// HTTP request into gRPC metadata for the upstream call.
func injectHeadersIntoMetadata(ctx context.Context, req *http.Request) metadata.MD {
	// https://aspenmesh.io/2018/04/tracing-grpc-with-istio/
	otHeaders := []string{
		"x-request-id",
		"x-b3-traceid",
		"x-b3-spanid",
		"x-b3-parentspanid",
		"x-b3-sampled",
		"x-b3-flags",
		"x-ot-span-context"}

	var pairs []string
	for _, h := range otHeaders {
		if v := req.Header.Get(h); len(v) > 0 {
			pairs = append(pairs, h, v)
		}
	}
	// metadata.Pairs is variadic, so the slice must be expanded.
	return metadata.Pairs(pairs...)
}

type annotator func(context.Context, *http.Request) metadata.MD

// chainGrpcAnnotators merges the metadata produced by each annotator.
func chainGrpcAnnotators(annotators ...annotator) annotator {
	return func(c context.Context, r *http.Request) metadata.MD {
		var mds []metadata.MD
		for _, a := range annotators {
			mds = append(mds, a(c, r))
		}
		return metadata.Join(mds...)
	}
}

func run() error {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	annotators := []annotator{injectHeadersIntoMetadata}

	mux := runtime.NewServeMux(
		runtime.WithMetadata(chainGrpcAnnotators(annotators...)),
	)

	opts := []grpc.DialOption{grpc.WithInsecure()}
	err := pb.RegisterGreetingServiceHandlerFromEndpoint(ctx, mux, URLServiceA, opts)
	if err != nil {
		return err
	}

	return http.ListenAndServe(port, mux)
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

func init() {
	childFormatter := logrus.JSONFormatter{}
	runtimeFormatter := &lrf.Formatter{ChildFormatter: &childFormatter}
	runtimeFormatter.Line = true
	log.Formatter = runtimeFormatter
	log.Out = os.Stdout
	level, err := logrus.ParseLevel(logLevel)
	if err != nil {
		log.Error(err)
	}
	log.Level = level
}

func main() {
	flag.Parse()
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

Istio VirtualService and CORS

With the RESTful services in the previous post, CORS was handled by Service A. Service A allowed the UI to make cross-origin requests to the backend API’s domain. Since the gRPC Gateway does not directly support Cross-Origin Resource Sharing (CORS) policy, we have offloaded the CORS responsibility to Istio using the reverse proxy’s VirtualService resource’s CorsPolicy configuration. Moving this responsibility makes CORS much easier to manage as YAML-based configuration and part of the Helm chart. See lines 20–28 below.

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: rev-proxy
spec:
  hosts:
  - {{ YOUR_API_HOSTNAME_HERE }}
  gateways:
  - istio-gateway
  http:
  - match:
    - uri:
        prefix: /
    route:
    - destination:
        host: rev-proxy.dev.svc.cluster.local
        port:
          number: 80
      weight: 100
    corsPolicy:
      allowOrigin:
      - {{ YOUR_UI_HOSTNAME_HERE }}
      allowMethods:
      - OPTIONS
      - GET
      allowCredentials: true
      allowHeaders:
      - "*"

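For a sense of what has been offloaded, the sketch below shows roughly the kind of CORS handling a Go service would otherwise carry itself. It is not the code from the previous post, and it is much simpler than what Istio’s proxy does: a small net/http middleware that echoes an allowed origin and answers preflight requests, using values that mirror the CorsPolicy above. The hostnames and function names are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// withCORS adds CORS response headers when the request's Origin matches
// allowedOrigin, and answers preflight (OPTIONS) requests directly.
func withCORS(allowedOrigin string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		origin := r.Header.Get("Origin")
		if origin != "" && origin == allowedOrigin {
			w.Header().Set("Access-Control-Allow-Origin", origin)
			w.Header().Set("Access-Control-Allow-Credentials", "true")
			w.Header().Add("Vary", "Origin")
			if r.Method == http.MethodOptions {
				w.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET")
				w.Header().Set("Access-Control-Allow-Headers", "*")
				w.WriteHeader(http.StatusNoContent)
				return
			}
		}
		next.ServeHTTP(w, r)
	})
}

// allowOriginFor sends one GET through the middleware and returns the
// Access-Control-Allow-Origin header of the response, if any.
func allowOriginFor(allowedOrigin, requestOrigin string) string {
	handler := withCORS(allowedOrigin, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	req := httptest.NewRequest(http.MethodGet, "/api/greeting", nil)
	req.Header.Set("Origin", requestOrigin)
	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, req)
	return rec.Result().Header.Get("Access-Control-Allow-Origin")
}

func main() {
	const ui = "https://ui.example.com" // hypothetical UI hostname
	fmt.Printf("allowed origin header: %q\n", allowOriginFor(ui, ui))
	fmt.Printf("other origin header: %q\n", allowOriginFor(ui, "https://other.example.com"))
}
```

Keeping this logic in the VirtualService means none of the services needs to be rebuilt when the list of allowed origins changes.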
Pillar One: Logs

To paraphrase Jay Kreps on the LinkedIn Engineering Blog, a log is an append-only, totally ordered sequence of records ordered by time. The ordering of records defines a notion of “time” since entries to the left are defined to be older than entries to the right. Logs are a historical record of events that happened in the past. Logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.

Go-based Microservice Logging

An effective logging strategy starts with what you log, when you log, and how you log. As part of the platform’s logging strategy, the eight Go-based microservices use Logrus, a popular structured logger for Go, first released in 2014. The platform’s services also implement Banzai Cloud’s logrus-runtime-formatter. Together, these two logging packages give us greater control over what we log, when we log, and how we log information about the services. The recommended configuration of the packages is minimal. Logrus’ JSONFormatter provides for easy parsing by third-party systems and injects additional contextual data fields into the log entries.

func init() {
	childFormatter := logrus.JSONFormatter{}
	runtimeFormatter := &lrf.Formatter{ChildFormatter: &childFormatter}
	runtimeFormatter.Line = true
	log.Formatter = runtimeFormatter
	log.Out = os.Stdout
	level, err := logrus.ParseLevel(logLevel)
	if err != nil {
		log.Error(err)
	}
	log.Level = level
}

Logrus provides several advantages over Go’s simple logging package, log. For example, log entries are not only for Fatal errors, nor should all verbose log entries be output in a Production environment. Logrus has the capability to log at seven levels: Trace, Debug, Info, Warning, Error, Fatal, and Panic. The log level of the platform’s microservices can be changed at runtime using an environment variable.
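The idea of level-based filtering can be sketched with the standard library alone. The example below is a stand-in, not Logrus’s implementation: it orders the seven level names listed above from least to most verbose, parses a level from the LOG_LEVEL environment variable, and decides whether an entry at a given level would be emitted. All type and function names are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// level orders log levels from least verbose (panic) to most verbose (trace).
type level int

const (
	panicLevel level = iota
	fatalLevel
	errorLevel
	warnLevel
	infoLevel
	debugLevel
	traceLevel
)

var levelNames = []string{"panic", "fatal", "error", "warning", "info", "debug", "trace"}

// parseLevel maps a case-insensitive name to a level. It returns info and
// false when the name is not recognized.
func parseLevel(raw string) (level, bool) {
	for i, name := range levelNames {
		if strings.EqualFold(raw, name) {
			return level(i), true
		}
	}
	return infoLevel, false
}

// enabled reports whether an entry at entryLevel is emitted when the
// logger is configured at configured.
func enabled(configured, entryLevel level) bool {
	return entryLevel <= configured
}

func main() {
	configured, ok := parseLevel(os.Getenv("LOG_LEVEL"))
	if !ok {
		fmt.Println("LOG_LEVEL not set or not recognized, defaulting to info")
	}
	fmt.Println("debug entries emitted:", enabled(configured, debugLevel))
	fmt.Println("error entries emitted:", enabled(configured, errorLevel))
}
```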

Banzai Cloud’s logrus-runtime-formatter automatically tags log messages with runtime and stack information, including function name and line number — extremely helpful when troubleshooting. There is an excellent post on the Banzai Cloud (now part of Cisco) formatter, Golang runtime Logrus Formatter.

Service A log entries as viewed from Amazon CloudWatch Insights

In 2020, Logrus entered maintenance mode. The author, Simon Eskildsen (Principal Engineer at Shopify), stated they would not be introducing new features. This does not mean Logrus is dead. With over 18,000 GitHub Stars, Logrus will continue to be maintained for security, bug fixes, and performance. The author states that many fantastic alternatives to Logrus now exist, such as Zerolog, Zap, and Apex.

Client-side Angular UI Logging

Likewise, I have enhanced the logging of the Angular UI using NGX Logger. NGX Logger is a simple logging module for Angular (currently supports Angular 6+). It allows “pretty print” to the console and allows log messages to be POSTed to a URL for server-side logging. For this demo, the UI will only log to the web browser’s console. Similar to Logrus, NGX Logger supports multiple log levels: Trace, Debug, Info, Warning, Error, Fatal, and Off. However, instead of just outputting messages, NGX Logger allows us to output properly formatted log entries to the browser’s console.

The level of logs output is configured to be dependent on the environment, Production or not Production. Below is an example of the log output from the Angular UI in Chrome. Since the UI’s Docker Image was built with the Production configuration, the log level is set to INFO. We would not want to expose potentially sensitive information in verbose log output to our end-users in Production.

Client-side logs from the platforms’ Angular UI

Controlling logging levels is accomplished by adding the following ternary operator to the app.module.ts file.

imports: [
  BrowserModule,
  HttpClientModule,
  FormsModule,
  LoggerModule.forRoot({
    level: !environment.production ?
      NgxLoggerLevel.DEBUG : NgxLoggerLevel.INFO,
    serverLogLevel: NgxLoggerLevel.INFO
  })
]

Platform Logs

Based on the platform built, configured, and deployed in part one, you now have access to logs from multiple sources.

  1. Amazon DocumentDB: Amazon CloudWatch Audit and Profiler logs;
  2. Amazon MQ: Amazon CloudWatch logs;
  3. Amazon EKS: API server, Audit, Authenticator, Controller manager, and Scheduler CloudWatch logs;
  4. Kubernetes Dashboard: Individual EKS Pod and Replica Set logs;
  5. Kiali: Individual EKS Pod and Container logs;
  6. Fluent Bit: EKS performance, host, dataplane, and application CloudWatch logs;

Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

Running Fluent Bit, the EKS cluster’s performance, host, dataplane, and application logs will also be available in Amazon CloudWatch.

Amazon CloudWatch log groups from the demonstration’s EKS cluster

Within the application log groups, you can access the individual log streams for each of the reference application’s components.

Amazon CloudWatch log streams from the application log group

Within each CloudWatch log stream, you can view individual log entries.

Amazon CloudWatch log stream for Service A

CloudWatch Logs Insights enables you to interactively search and analyze your log data in Amazon CloudWatch Logs. You can perform queries to help you more efficiently and effectively respond to operational issues. If an issue occurs, you can use CloudWatch Logs Insights to identify potential causes and validate deployed fixes.

Amazon CloudWatch Logs Insights: latest errors found in logs for Service F

CloudWatch Logs Insights supports CloudWatch Logs Insights query syntax, a query language you can use to perform queries on your log groups. Each query can include one or more query commands separated by Unix-style pipe characters (|). For example:

fields @timestamp, @message
| filter kubernetes.container_name = "service-f"
and @message like "error"
| sort @timestamp desc
| limit 20
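Reading the query as a pipeline (filter, then sort, then limit) may be easier with an in-memory analogy. The Go sketch below applies the same three steps to a slice of log entries. It only illustrates the query’s semantics and is not how CloudWatch Logs Insights is implemented or called; the entry type and sample data are made up.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// entry is a simplified log record: a timestamp in Unix milliseconds,
// the container that produced it, and the raw message.
type entry struct {
	timestamp int64
	container string
	message   string
}

// latestErrors mirrors the query: keep entries from one container whose
// message contains "error", sort newest first, and keep at most limit.
func latestErrors(entries []entry, container string, limit int) []entry {
	var matched []entry
	for _, e := range entries {
		if e.container == container && strings.Contains(e.message, "error") {
			matched = append(matched, e)
		}
	}
	sort.Slice(matched, func(i, j int) bool {
		return matched[i].timestamp > matched[j].timestamp
	})
	if len(matched) > limit {
		matched = matched[:limit]
	}
	return matched
}

func main() {
	// Hypothetical sample data.
	entries := []entry{
		{1000, "service-f", "error: connection refused"},
		{2000, "service-a", "error: timeout"},
		{3000, "service-f", "error: queue unavailable"},
		{4000, "service-f", "info: message consumed"},
	}
	for _, e := range latestErrors(entries, "service-f", 20) {
		fmt.Println(e.timestamp, e.message)
	}
}
```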

Pillar Two: Metrics

For metrics, we will examine CloudWatch Container Insights, Prometheus, and Grafana. Prometheus and Grafana are industry-leading tools you installed as part of the Istio deployment.

Prometheus

Prometheus is an open source system monitoring and alerting toolkit originally built at SoundCloud circa 2012. Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second project hosted after Kubernetes.

Prometheus Graph of container memory usage during load test

According to Istio, the Prometheus addon is a Prometheus server that comes preconfigured to scrape Istio endpoints to collect metrics. You can use Prometheus with Istio to record metrics that track the health of Istio and applications within the service mesh. You can visualize metrics using tools like Grafana and Kiali. The Istio Prometheus addon is intended for demonstration only and is not tuned for performance or security.

The istioctl dashboard command provides access to all of the Istio web UIs. With the EKS cluster running, Istio installed, and the reference application platform deployed, access Prometheus using the istioctl dashboard prometheus command from your terminal. You must be logged into AWS from your terminal to connect to Prometheus successfully. If you are not logged in to AWS, you will often see the following error: Error: not able to locate <tool_name> pod: Unauthorized. Since we used the non-production demonstration versions of the Istio Addons, there is no authentication and authorization required to access Prometheus.

According to Prometheus, users select and aggregate time-series data in real-time using a functional query language called PromQL (Prometheus Query Language). The result of an expression can either be shown as a graph, viewed as tabular data in Prometheus’s expression browser, or consumed by external systems through Prometheus’ HTTP API. The expression browser includes a drop-down menu with all available metrics as a starting point for building queries. Shown below are a few PromQL examples that were developed as part of writing this post.

istio_agent_go_info{kubernetes_namespace="dev"}
istio_build{kubernetes_namespace="dev"}
up{alpha_eksctl_io_cluster_name="istio-observe-demo", job="kubernetes-nodes"}
sum by (pod) (rate(container_network_transmit_packets_total{stack="reference-app",namespace="dev",pod=~"service-.*"}[5m]))
sum by (instance) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code="200"})
sum by (response_code) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code!~"200|0"})

Prometheus APIs

Prometheus has both an HTTP API and a Management API. There are many useful endpoints in addition to the Prometheus UI, available at http://localhost:9090/graph. For example, the Prometheus HTTP API endpoint that lists all the command-line configuration flags is available at http://localhost:9090/api/v1/status/flags. The endpoint that lists all the available Prometheus metrics is available at http://localhost:9090/api/v1/label/__name__/values; over 951 metrics in this demonstration.
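
The endpoints above return JSON, so they are easy to call from a small program. Below is a minimal Go sketch, assuming Prometheus is reachable at http://localhost:9090 as it is through istioctl dashboard prometheus. The helper names (queryURL, parseLabelValues) and the sample response body are illustrative assumptions, not output captured from the demonstration cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// labelValuesResponse mirrors the JSON envelope returned by
// /api/v1/label/<name>/values: a status string plus a list of values.
type labelValuesResponse struct {
	Status string   `json:"status"`
	Data   []string `json:"data"`
}

// queryURL builds an instant-query URL for the /api/v1/query endpoint,
// URL-encoding the PromQL expression.
func queryURL(base, promQL string) string {
	v := url.Values{}
	v.Set("query", promQL)
	return base + "/api/v1/query?" + v.Encode()
}

// parseLabelValues decodes a label-values response body and returns
// the list of values, or an error if the status is not "success".
func parseLabelValues(body []byte) ([]string, error) {
	var r labelValuesResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	if r.Status != "success" {
		return nil, fmt.Errorf("unexpected status %q", r.Status)
	}
	return r.Data, nil
}

func main() {
	fmt.Println(queryURL("http://localhost:9090", `up{job="kubernetes-nodes"}`))

	// Illustrative body only; a real response lists every metric name.
	sample := []byte(`{"status":"success","data":["istio_build","up"]}`)
	names, err := parseLabelValues(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(names), "metric names")
}
```

In practice, you would fetch the body with net/http from the __name__ label endpoint and pass it to parseLabelValues to count the available metrics.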

The Prometheus endpoint that lists many available metrics with HELP and TYPE to explain their function can be found at http://localhost:9090/metrics.

Understanding Metrics

In addition to these endpoints, the standard service level metrics exported by Istio and available via Prometheus can be found in the Istio Standard Metrics documentation. An explanation of many of the metrics available in Prometheus is also found in the cAdvisor README on their GitHub site. As mentioned in this AWS Blog Post, the cAdvisor metrics are also available from the command line using the following commands:

export NODE=$(kubectl get nodes | sed -n '2 p' | awk '{print $1}')
kubectl get --raw "/api/v1/nodes/${NODE}/proxy/metrics/cadvisor"

Observing Metrics

Below is an example graph of the backend microservice containers deployed to EKS. The graph PromQL expression returns the amount of working set memory, including recently accessed memory, dirty memory, and kernel memory (container_memory_working_set_bytes), summed by pod, in megabytes (MB). There was no load on the services during the period displayed.

sum by (pod) (container_memory_working_set_bytes{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"}) / (1024^2)

The container_memory_working_set_bytes metric is the same metric used by the kubectl top command (not container_memory_usage_bytes). Omitting the --containers=true flag will output pod stats versus containers.

> kubectl top pod -n dev --containers=true | \
grep -v istio-proxy | sort -k 4 -r
POD                           NAME          CPU(cores) MEMORY(bytes)
service-d-69d7469cbf-ts4t7 service-d 135m 13Mi
service-d-69d7469cbf-6thmz service-d 156m 13Mi
service-d-69d7469cbf-nl7th service-d 118m 12Mi
service-d-69d7469cbf-fz5bh service-d 118m 12Mi
service-d-69d7469cbf-89995 service-d 136m 11Mi
service-d-69d7469cbf-g4pfm service-d 106m 10Mi
service-h-69576c4c8c-x9ccl service-h 33m 9Mi
service-h-69576c4c8c-gtjc9 service-h 33m 9Mi
service-h-69576c4c8c-bjgfm service-h 45m 9Mi
service-h-69576c4c8c-8fk6z service-h 38m 9Mi
service-h-69576c4c8c-55rld service-h 36m 9Mi
service-h-69576c4c8c-4xpb5 service-h 41m 9Mi
...

In another Prometheus example, the PromQL query expression returns the per-second rate of CPU resources measured in CPU units (1 CPU = 1 AWS vCPU), as measured over the last 5 minutes, per time series in the range vector, summed by the pod. During this period, the backend services were under a consistent, simulated load of 15 concurrent users using hey. Four instances of Service D pods were consuming the most CPU units during this time period.

sum by (pod) (rate(container_cpu_usage_seconds_total{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"}[5m])) * 1000

The container_cpu_usage_seconds_total metric is the same metric used by the kubectl top command. The above PromQL expression multiplies the query results by 1,000 to match the results from kubectl top, shown below.

> kubectl top pod -n dev --sort-by=cpu
NAME                          CPU(cores)   MEMORY(bytes)
service-d-69d7469cbf-6thmz 159m 60Mi
service-d-69d7469cbf-89995 143m 61Mi
service-d-69d7469cbf-ts4t7 140m 59Mi
service-d-69d7469cbf-fz5bh 135m 58Mi
service-d-69d7469cbf-nl7th 132m 61Mi
service-d-69d7469cbf-g4pfm 119m 62Mi
service-g-c7d68fd94-w5t66 59m 58Mi
service-f-7dc8f64799-qj8qv 56m 55Mi
service-c-69fbc964db-knggt 56m 58Mi
service-h-69576c4c8c-8fk6z 55m 58Mi
service-h-69576c4c8c-4xpb5 55m 58Mi
service-g-c7d68fd94-5cdc2 54m 58Mi
...
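
To make the arithmetic behind the CPU expression concrete, here is a minimal Go sketch of how a per-second rate is derived from a cumulative counter and scaled to millicores. It is a simplification: unlike PromQL's rate(), it ignores counter resets and does not extrapolate to the window boundaries. The sample type and the sample values are hypothetical.

```go
package main

import "fmt"

// sample is one scrape of a cumulative CPU counter such as
// container_cpu_usage_seconds_total: CPU-seconds consumed so far.
type sample struct {
	unixSeconds float64
	cpuSeconds  float64
}

// millicores returns the average CPU usage between the first and last
// sample in the window, expressed in millicores (1000m = 1 CPU).
// Simplification: no counter-reset handling and no extrapolation.
func millicores(window []sample) float64 {
	if len(window) < 2 {
		return 0
	}
	first, last := window[0], window[len(window)-1]
	elapsed := last.unixSeconds - first.unixSeconds
	if elapsed <= 0 {
		return 0
	}
	perSecond := (last.cpuSeconds - first.cpuSeconds) / elapsed
	return perSecond * 1000
}

func main() {
	// Hypothetical window: 45 CPU-seconds consumed over 300 seconds (5m).
	window := []sample{{0, 100}, {300, 145}}
	fmt.Printf("%.0fm\n", millicores(window))
}
```

A pod that burns 45 CPU-seconds in a five-minute window averages 0.15 CPU, which kubectl top would report as roughly 150m.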

Limits

Prometheus also exposes container resource limits. For example, the expression below uses the container_spec_memory_limit_bytes metric to return the memory limits set on the reference platform’s backend services, displayed in megabytes (MB). When viewed alongside the real-time resources consumed by the services, these metrics are useful to properly configure and monitor Kubernetes management features such as the Horizontal Pod Autoscaler.

sum by (container) (container_spec_memory_limit_bytes{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"}) / (1024^2) / count by (container) (container_spec_memory_limit_bytes{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"})

Or, memory limits by Pod:

sum by (pod) (container_spec_memory_limit_bytes{namespace="dev"}) / (1024^2)

Cluster Metrics

Prometheus also contains metrics about Istio components, Kubernetes components, and the EKS cluster. For example, the expression below returns the total available memory in gigabytes (GB) of each of the five m5.large EC2 worker nodes in the istio-observe-demo EKS cluster’s managed-ng-1 Managed Node Group.

machine_memory_bytes{alpha_eksctl_io_cluster_name="istio-observe-demo", alpha_eksctl_io_nodegroup_name="managed-ng-1"} / (1024^3)

For total physical cores, use the machine_cpu_physical_cores metric, and for vCPU cores use the machine_cpu_cores metric.

Grafana

Grafana describes itself as the leading open source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also allows users to define alert rules for their most important metrics visually. Grafana will continuously evaluate rules and can send notifications.

If you deployed Grafana using the Istio addons process demonstrated in part one of the previous post, access Grafana similar to the other tools:

istioctl dashboard grafana

Grafana Home page

According to Istio, Grafana is an open source monitoring solution used to configure dashboards for Istio. You can use Grafana to monitor the health of Istio and applications within the service mesh. While you can build your own dashboards, Istio offers a set of preconfigured dashboards for all of the most important metrics for the mesh and the control plane. The preconfigured dashboards use Prometheus as the data source.

Below is an example of the Istio Mesh Dashboard, filtered to show the eight backend service workloads running in the dev namespace. During this period, the backend services were under a consistent simulated load of approximately 20 concurrent users using hey. You can observe the p50, p90, and p99 latency of requests to these workloads.

View of the Istio Mesh Dashboard

Dashboards are built from Panels, the basic visualization building blocks in Grafana. Each panel has a query editor specific to the data source (Prometheus in this case) selected. The query editor allows you to write your (PromQL) query. For example, below is the PromQL expression query responsible for the p50 latency Panel displayed in the Istio Mesh Dashboard.

label_join((histogram_quantile(0.50, sum(rate(istio_request_duration_milliseconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)) / 1000) or histogram_quantile(0.50, sum(rate(istio_request_duration_seconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)), "destination_workload_var", ".", "destination_workload", "destination_workload_namespace")
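
The histogram_quantile function in the expression above estimates a percentile from cumulative bucket counts. The Go sketch below shows the general idea as I understand it: locate the bucket containing the target rank, then interpolate linearly inside it. It is a simplified illustration, not Prometheus's implementation (it skips, for example, the special handling of buckets with non-positive upper bounds), and the bucket values are hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// bucket is one cumulative histogram bucket: the count of observations
// less than or equal to upperBound (the "le" label).
type bucket struct {
	upperBound float64
	count      float64
}

// demoBuckets holds hypothetical request durations in milliseconds.
var demoBuckets = []bucket{{10, 20}, {25, 60}, {50, 90}, {math.Inf(1), 100}}

// quantile estimates the q-quantile (0 <= q <= 1) from cumulative buckets
// sorted by upperBound, the last of which must be +Inf.
func quantile(q float64, buckets []bucket) float64 {
	if len(buckets) < 2 || !math.IsInf(buckets[len(buckets)-1].upperBound, 1) {
		return math.NaN()
	}
	total := buckets[len(buckets)-1].count
	if total == 0 {
		return math.NaN()
	}
	rank := q * total
	// Find the first finite bucket whose cumulative count reaches the rank.
	b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
	if b == len(buckets)-1 {
		// The rank falls in the +Inf bucket: report the highest finite bound.
		return buckets[len(buckets)-2].upperBound
	}
	lower, inBucket := 0.0, buckets[b].count
	if b > 0 {
		lower = buckets[b-1].upperBound
		inBucket -= buckets[b-1].count
		rank -= buckets[b-1].count
	}
	// Assume observations are spread evenly within the bucket.
	return lower + (buckets[b].upperBound-lower)*(rank/inBucket)
}

func main() {
	fmt.Printf("p50 = %.2f ms\n", quantile(0.50, demoBuckets)) // p50 = 21.25 ms
}
```

Because the result is interpolated, the precision of p50, p90, and p99 values depends on how finely the histogram's bucket boundaries are spaced around the latencies of interest.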

Below is an example of the Istio Workload Dashboard. The dashboard contains three sections: General, Inbound Workloads, and Outbound Workloads. We have filtered outbound traffic from the reference platform’s backend services in the dev namespace.

View of the Istio Workload Dashboard

Below is a different view of the Istio Workload Dashboard, the dashboard’s Inbound Workloads section filtered to a single workload, the gRPC Gateway. The gRPC Gateway accepts incoming traffic from the Istio Ingress Gateway, as shown in the dashboard’s panels.

View of the Istio Workload Dashboard

Grafana provides the ability to Explore a Panel. Explore strips away the dashboard and panel options so that you can focus on the query. Below is an example of the Panel showing a steady stream of TCP-based egress traffic for Service F, based on the istio_tcp_sent_bytes_total metric. Service F consumes messages off the RabbitMQ queue (Amazon MQ) and writes messages to MongoDB (DocumentDB).

Exploring a Grafana dashboard panel

Istio Performance

You can monitor the resource usage of Istio with the Istio Performance Dashboard.

View of the Istio Performance Dashboard

Additional Dashboards

Grafana provides a site containing official and community-built dashboards, including the above-mentioned Istio dashboards. Importing dashboards into your Grafana instance is as simple as copying the dashboard URL or the ID provided from the Grafana dashboard site and pasting it into the dashboard import option of your Grafana instance. However, be aware that not every Kubernetes dashboard on Grafana’s site is compatible with your specific version of Kubernetes, Istio, or EKS, and not every dashboard relies on Prometheus as a data source. As a result, you might have to test and tweak imported dashboards to get them working.

Below is an example of an imported community dashboard, Kubernetes cluster monitoring (via Prometheus) by Instrumentisto Team (dashboard ID 315).

Alerting

An effective observability strategy must include more than just the ability to visualize results. An effective strategy must also detect anomalies and notify (alert) the appropriate resources or directly resolve incidents. Grafana, like Prometheus, is capable of alerting and notification. You visually define alert rules for your critical metrics. Then, Grafana will continuously evaluate metrics against the rules and send notifications when pre-defined thresholds are breached.

Grafana supports multiple popular notification channels, including PagerDuty, HipChat, Email, Kafka, and Slack. Below is an example of a Grafana notification channel that sends alert notifications to a Slack support channel.

Below is an example of an alert based on an arbitrarily high CPU usage of 300 millicpu or millicores (m). When the CPU usage of a single pod goes above that value for more than 3 minutes, an alert is sent. The high CPU usage could be caused by the Horizontal Pod Autoscaler (HPA) not functioning, the HPA having reached its maxReplicas limit, or insufficient resources within the cluster’s existing worker nodes to schedule additional pods.

Triggered by the alert, Grafana sends detailed notifications to the designated Slack channel.
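
The rule described above, usage above 300m for more than 3 minutes, can be sketched in a few lines of Go. This is only an illustration of the "threshold plus hold period" logic, not how the alerting engine is implemented. The reading type, the one-minute sampling interval, and the sample values are all hypothetical.

```go
package main

import "fmt"

// reading is one CPU measurement for a pod, taken at a given minute.
type reading struct {
	minute     int
	millicores float64
}

// demoReadings is a hypothetical series: a brief spike that recovers,
// followed by a sustained breach of the 300m threshold.
var demoReadings = []reading{
	{0, 250}, {1, 320}, {2, 340}, {3, 280},
	{4, 310}, {5, 330}, {6, 350}, {7, 360}, {8, 370},
}

// firesAt returns the minute at which the alert would fire: the first
// reading where usage has stayed above threshold for more than
// holdMinutes. ok is false if the condition is never met.
func firesAt(readings []reading, threshold float64, holdMinutes int) (minute int, ok bool) {
	breaching := false
	since := 0
	for _, r := range readings {
		if r.millicores <= threshold {
			breaching = false // usage recovered, so the hold period restarts
			continue
		}
		if !breaching {
			breaching = true
			since = r.minute
		}
		if r.minute-since > holdMinutes {
			return r.minute, true
		}
	}
	return 0, false
}

func main() {
	if minute, ok := firesAt(demoReadings, 300, 3); ok {
		fmt.Println("alert fires at minute", minute)
	}
}
```

Note that the short spike at minutes 1 and 2 does not trigger the alert. The hold period exists to filter out transient bursts like that one, so only the sustained breach starting at minute 4 produces a notification.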

Amazon CloudWatch Container Insights

Lastly, in the category of Metrics, Amazon CloudWatch Container Insights collects, aggregates, summarizes, and visualizes metrics and logs from your containerized applications and microservices. CloudWatch alarms can be set on metrics that Container Insights collects. Container Insights is available for Amazon Elastic Container Service (Amazon ECS), including Fargate, Amazon EKS, and Kubernetes platforms on Amazon EC2.

In Amazon EKS, Container Insights uses a containerized version of the CloudWatch agent to discover all running containers in a cluster. It then collects performance data at every layer of the performance stack. Container Insights collects data as performance log events using the embedded metric format. These performance log events are entries that use a structured JSON schema that enables high-cardinality data to be ingested and stored at scale.

In the previous post, we also installed CloudWatch Container Insights monitoring for Prometheus, which automates the discovery of Prometheus metrics from containerized systems and workloads.

Below is an example of a basic Performance Monitoring CloudWatch Container Insights Dashboard. The dashboard is filtered to the dev namespace of the EKS cluster, where the reference application platform is running. During this period, the backend services were put under a simulated load using hey. As the load on the application increased, the ‘Number of Pods’ increased from 20 pods to 56 pods based on the container’s requested resources and HPA configurations. There is also a CloudWatch Alarm, shown on the right of the screen. An alarm was triggered for an arbitrarily high level of network transmission activity.

Next is an example of Container Insights’ Container Map view in CPU mode. You see a visual representation of the dev namespace, with each of the backend service’s Service and Deployment resources shown.

Below, there is a warning icon indicating an Alarm on the cluster was triggered.

Lastly, CloudWatch Container Insights allows you to jump from Container Insights directly to the CloudWatch Logs Insights console. Container Insights will also write the Logs Insights query for you. Below, we went from the Service D container metrics view in the Container Insights Performance Monitoring console directly to the CloudWatch Logs Insights console with a query, ready to run.

Pillar Three: Traces

According to the OpenTracing website, distributed tracing, also called distributed request tracing, is used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.

Header Propagation

According to Istio, header propagation may be accomplished through client libraries, such as Zipkin or Jaeger. Header propagation may also be accomplished manually, referred to as trace context propagation, documented in the Distributed Tracing Task. Alternately, Istio proxies can automatically send spans. Applications need to propagate the appropriate HTTP headers so that when the proxies send span information, the spans can be correlated correctly into a single trace. To accomplish this, an application needs to collect and propagate the following headers from the incoming request to any outgoing requests.

  • x-request-id
  • x-b3-traceid
  • x-b3-spanid
  • x-b3-parentspanid
  • x-b3-sampled
  • x-b3-flags
  • x-ot-span-context

The x-b3 headers originated as part of the Zipkin project. The B3 portion of the header is named for the original name of Zipkin, BigBrotherBird. Passing these headers across service calls is known as B3 propagation. According to Zipkin, these attributes are propagated in-process and eventually downstream (often via HTTP headers) to ensure all activity originating from the same root are collected together.

To demonstrate distributed tracing with Jaeger and Zipkin, the gRPC Gateway passes the b3 headers. While the RESTful JSON-based services passed these headers in the HTTP request object, with gRPC, the headers are passed in the gRPC Context object. The following code has been added to the gRPC Gateway. The Istio sidecar proxy (Envoy) generates the initial headers, which are then propagated throughout the service call chain. It is critical to propagate only the headers that are present in the downstream request with values, as the code below does.

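import (
	"context"
	"net/http"

	"google.golang.org/grpc/metadata"
)

// injectHeadersIntoMetadata copies the tracing headers present on the incoming
// HTTP request into gRPC metadata so they travel with the outgoing gRPC call.
func injectHeadersIntoMetadata(ctx context.Context, req *http.Request) metadata.MD {
	headers := []string{
		"x-request-id",
		"x-b3-traceid",
		"x-b3-spanid",
		"x-b3-parentspanid",
		"x-b3-sampled",
		"x-b3-flags",
		"x-ot-span-context",
	}
	var pairs []string
	for _, h := range headers {
		// Only propagate headers that arrived with a value.
		if v := req.Header.Get(h); len(v) > 0 {
			pairs = append(pairs, h, v)
		}
	}
	// metadata.Pairs is variadic, so the slice must be expanded with "...".
	return metadata.Pairs(pairs...)
}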
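
For comparison, the RESTful JSON-based services mentioned above carry the same headers on the HTTP request itself. Below is a minimal Go sketch of that idea using only the standard library. The propagateHeaders helper and the service URLs are hypothetical and are not taken from the reference platform's source code.

```go
package main

import (
	"fmt"
	"net/http"
)

// tracingHeaders lists the headers Istio expects applications to forward.
var tracingHeaders = []string{
	"x-request-id",
	"x-b3-traceid",
	"x-b3-spanid",
	"x-b3-parentspanid",
	"x-b3-sampled",
	"x-b3-flags",
	"x-ot-span-context",
}

// propagateHeaders copies each tracing header that is present with a
// non-empty value from the incoming headers to the outgoing headers.
func propagateHeaders(in, out http.Header) {
	for _, h := range tracingHeaders {
		if v := in.Get(h); v != "" {
			out.Set(h, v)
		}
	}
}

func main() {
	// Hypothetical incoming request, as received by an upstream service.
	in, err := http.NewRequest(http.MethodGet, "http://service-a.example/api/greeting", nil)
	if err != nil {
		panic(err)
	}
	in.Header.Set("x-b3-traceid", "7b084bbca0bade97bdc76741c3973ed6")
	in.Header.Set("x-b3-sampled", "1")

	// Hypothetical outgoing request to the next service in the call chain.
	out, err := http.NewRequest(http.MethodGet, "http://service-b.example/api/greeting", nil)
	if err != nil {
		panic(err)
	}
	propagateHeaders(in.Header, out.Header)

	fmt.Println(out.Header.Get("x-b3-traceid"), out.Header.Get("x-b3-sampled"))
}
```

Headers that are absent on the incoming request, such as x-b3-flags in this example, are simply skipped rather than forwarded as empty values.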

Below, in the CloudWatch logs, we see an example of the HTTP request headers recorded in a log message for Service A. The b3 headers are propagated from the gRPC Gateway reverse proxy to gRPC-based Go services. Header propagation ensures a complete distributed trace across the entire service call chain.

CloudWatch Log Insights Console showing Service A’s log entries

Headers propagated from Service A are shown below. Note the b3 headers propagated from the gRPC Gateway reverse proxy.

{
"function": "callGrpcService",
"level": "debug",
"line": "84",
"msg": "headersOut: map[:
authority:[service-a.dev.svc.cluster.local:50051]
content-type:[application/grpc]
grpcgateway-accept:[application/json, text/plain, */*]
grpcgateway-accept-language:[en-US,en;q=0.9]
grpcgateway-content-type:[application/json]
grpcgateway-origin:[https://ui.example-api.com]
grpcgateway-referer:[https://ui.example-api.com/]
grpcgateway-user-agent:[Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36]
user-agent:[grpc-go/1.39.0]
x-b3-parentspanid:[3b30be08b7a6bad0]
x-b3-sampled:[1]
x-b3-spanid:[c1f63e34996770c9]
x-b3-traceid:[7b084bbca0bade97bdc76741c3973ed6]
x-envoy-attempt-count:[1]
x-forwarded-client-cert:[By=spiffe://cluster.local/ns/dev/sa/default;Hash=9c02df616b245e7ada5394db109cb1fa4086c08591e668e5a67fc3e0520713cf;Subject=\"\";URI=spiffe://cluster.local/ns/dev/sa/default]
x-forwarded-for:[73.232.228.42,192.168.63.73, 127.0.0.6]
x-forwarded-host:[api.example-api.com]
x-forwarded-proto:[http]
x-request-id:[e83b565f-23ca-9f91-953e-03175bdafaa0]
]",
"time": "2021-07-04T13:54:06Z"
}

Jaeger

According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains a helpful overview of Jaeger’s architecture and general tracing-related terminology.

If you deployed Jaeger using the Istio addons process demonstrated in part one of the previous post, access Jaeger similar to the other tools:

istioctl dashboard jaeger

Below are examples of the Jaeger UI’s Search view, displaying the results of a search for the Angular UI and the Istio Ingress Gateway services over a period of time. We see a timeline of traces across the top with a list of trace results below. As discussed on the Jaeger website, a trace is composed of spans. A span represents a logical unit of work in Jaeger that has an operation name. A trace is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with the concept of DAGs.

Latest Angular UI traces
Latest Istio Ingress Gateway traces

Below is a detailed view of a single trace in Jaeger’s Trace Timeline mode. The 16 spans encompass nine of the reference platform’s components: seven backend services, gRPC Gateway, and Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 195.49 ms. The root span in the trace is the Istio Ingress Gateway. The Angular UI, loaded in the end user’s web browser, calls gRPC Gateway via the Istio Ingress Gateway. From there, we see the expected flow of our service-to-service IPC. Service A calls Service B and Service C. Service B calls Service E, which calls Service G and Service H.
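
To illustrate how a tracing backend can turn a flat list of spans into the nested timeline described above, here is a minimal Go sketch that links spans by parent ID and walks the result from the root. The span type is a deliberately simplified stand-in for a real Jaeger span, the IDs are hypothetical, and the sketch uses one span per component, whereas the actual trace contains 16 spans. Only the calls named in the paragraph above are modeled.

```go
package main

import (
	"fmt"
	"strings"
)

// span is a simplified span record: real spans also carry a trace ID,
// operation name, timings, and tags.
type span struct {
	id       string
	parentID string // empty for the root span
	service  string
}

// demoTrace models the call chain described in the text, with made-up IDs.
var demoTrace = []span{
	{"1", "", "Istio Ingress Gateway"},
	{"2", "1", "gRPC Gateway"},
	{"3", "2", "Service A"},
	{"4", "3", "Service B"},
	{"5", "3", "Service C"},
	{"6", "4", "Service E"},
	{"7", "6", "Service G"},
	{"8", "6", "Service H"},
}

// render groups spans by parent ID and returns one line per span,
// indented by its depth in the trace, visiting children depth-first.
func render(spans []span) []string {
	children := map[string][]span{}
	var roots []span
	for _, s := range spans {
		if s.parentID == "" {
			roots = append(roots, s)
		} else {
			children[s.parentID] = append(children[s.parentID], s)
		}
	}
	var lines []string
	var walk func(s span, depth int)
	walk = func(s span, depth int) {
		lines = append(lines, strings.Repeat("  ", depth)+s.service)
		for _, c := range children[s.id] {
			walk(c, depth+1)
		}
	}
	for _, r := range roots {
		walk(r, 0)
	}
	return lines
}

func main() {
	for _, line := range render(demoTrace) {
		fmt.Println(line)
	}
}
```

The parent ID on each span plays the same role as the x-b3-parentspanid header: without it, the spans could not be assembled into a single tree.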

In this demonstration, traces are not instrumented to span the RabbitMQ message queue or MongoDB. You will not see a trace that includes a call from Service D to Service F via RabbitMQ.

Detailed view of an Istio Ingress Gateway distributed trace

The visualization of the trace’s timeline demonstrates the synchronous nature of the reference platform’s service-to-service IPC instead of the asynchronous nature of the decoupled communications using the RabbitMQ messaging queue. Service A waits for each service in its call chain to respond before returning its response to the requester.

Within Jaeger’s Trace Timeline view, you have the ability to drill into a single span, which contains additional metadata. The span’s metadata includes the API endpoint URL being called, HTTP method, response status, and several other headers.

Detailed view of an Istio Ingress Gateway distributed trace

A Trace Statistics view is also available.

Trace statistics for an Istio Ingress Gateway distributed trace

Additionally, Jaeger has an experimental Trace Graph mode that displays a graph view of the same trace.

Jaeger also includes a Compare Trace feature and two dependency views: Force-Directed Graph and DAG. I find both views rather primitive compared to Kiali. Lacking access to Kiali, the views are marginally useful as a dependency graph.

Zipkin

Zipkin is a distributed tracing system, which helps gather timing data needed to troubleshoot latency problems in service architectures. According to a 2012 post on Twitter’s Engineering Blog, Zipkin started as a project during Twitter’s first Hack Week. During that week, they implemented a basic version of the Google Dapper paper for Thrift.

Results of a search for the latest traces in Zipkin

Zipkin and Jaeger are very similar in terms of capabilities. I have chosen to focus on Jaeger in this post as I prefer it over Zipkin. If you want to try Zipkin instead of Jaeger, you can use the following commands to remove Jaeger and install Zipkin from the Istio addons extras directory. In part one of the post, we did not install Zipkin by default when we deployed the Istio addons. Be aware that running both tools simultaneously in the same Kubernetes cluster will cause unpredictable tracing results.

kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/jaeger.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/extras/zipkin.yaml

Access Zipkin similar to the other observability tools:

istioctl dashboard zipkin

Below is an example of a distributed trace visualized in Zipkin’s UI, containing 16 spans, similar to the trace visualized in Jaeger, shown above. The spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of ~221 ms.

Detailed view of a distributed trace in Zipkin

Zipkin can also visualize a dependency graph based on the distributed trace. Below is an example of a traffic simulation over a 24-hour period, showing network traffic flowing between the reference platform’s components, illustrated as a dependency graph.

Zipkin’s dependency graph showing traffic over a 24-hour period

Kiali: Microservice Observability

According to their website, Kiali is a management console for an Istio-based service mesh. It provides dashboards and observability, and lets you operate your mesh with robust configuration and validation capabilities. It shows the structure of a service mesh by inferring traffic topology and displaying the mesh’s health. Kiali provides detailed metrics, powerful validation, Grafana access, and strong integration for distributed tracing with Jaeger.

If you deployed Kiali using the Istio addons process demonstrated in part one of the previous post, access Kiali similar to the other tools:

istioctl dashboard kiali

For improved security, install the latest version of Kiali using the customizable install mentioned in Istio’s documentation. Using Kiali’s Install via Kiali Server Helm Chart option adds token-based authentication, similar to the Kubernetes Dashboard.

Kiali’s Overview tab provides a global view of all namespaces within the Istio service mesh and the number of applications within each namespace.

The Graph tab in the Kiali UI represents the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we can observe that Kiali has mapped 11 applications (workloads), 11 services, and 24 edges (a graph term). Specifically, we see the Istio Ingress Proxy at the edge of the service mesh, gRPC Gateway, Angular UI, and eight backend services, all with their respective Envoy proxy sidecars that are taking traffic (Service F did not take any direct traffic from another service in this example), the external DocumentDB egress point, and the external Amazon MQ egress point. Note how service-to-service traffic flows with Istio, from the service to its sidecar proxy, to the other service’s sidecar proxy, and finally to the service.

Kiali allows you to zoom in and focus on a single component in the graph and its individual metrics.

Kiali can also display average request times and other metrics for each edge in the graph (communication between two components). Kiali can even show those metrics over a given period of time, using Kiali’s Replay feature, shown below.

The Applications tab lists all the applications, their namespace, and labels.

You can drill into an individual component on both the Applications and Workloads tabs and view additional details. Details include the overall health, Pods, and Istio Config status. Below is an overview of the Service A workload in the dev Namespace.

The Workloads detailed view also includes inbound and outbound network metrics. Below is an example of the outbound metrics for Service A in the dev Namespace.

Kiali also gives you access to the individual pod’s container logs. Although log access is not as user-friendly as other log sources discussed previously, having logs available alongside metrics (integration with Grafana), traces (integration with Jaeger), and mesh visualization, all in Kiali, can act as a very effective single pane of glass for observability.

Kiali also has an Istio Config tab. The Istio Config tab displays a list of all of the available Istio configuration objects that exist in the user’s environment.

You can use Kiali to configure and manage the Istio service mesh and its installed resources. Using Kiali, you can actually modify the deployed resources, similar to using the kubectl edit command.

Oftentimes, I find Kiali to be my first stop when troubleshooting platform issues. Once I identify the specific components or communication paths having issues, I then review the specific application logs and Prometheus metrics through the Grafana dashboard.

Tear Down

To tear down the EKS cluster, DocumentDB cluster, and Amazon MQ broker, use the following commands:

# EKS cluster
eksctl delete cluster --name $CLUSTER_NAME
# Amazon MQ
aws mq list-brokers | jq -r '.BrokerSummaries[] | .BrokerId'
aws mq delete-broker --broker-id {{ your_broker_id }}
# DocumentDB
aws docdb describe-db-clusters \
  | jq -r '.DBClusters[] | .DBClusterIdentifier'
aws docdb delete-db-cluster \
  --db-cluster-identifier {{ your_cluster_id }}

Conclusion

In this post, we explored a set of popular open source observability tools, easily integrated with the Istio service mesh. These tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset using Fluent Bit for log processing and forwarding to Amazon CloudWatch Container Insights. Using these tools, we successfully observed a gRPC-based, distributed reference application platform deployed to Amazon EKS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Continuous Integration and Deployment of Docker Images using GitHub Actions

According to GitHub, GitHub Actions allows you to automate, customize, and execute your software development workflows right in your repository. You can discover, create, and share actions to perform any job you would like, including continuous integration (CI) and continuous deployment (CD), and combine actions in a completely customized workflow.

This brief post will examine a simple use case for GitHub Actions — automatically building and pushing a new Docker image to Docker Hub. A GitHub Actions workflow will be triggered every time a new Git tag is pushed to the GitHub project repository.

GitHub Actions Workflow running, based on the push of a new git tag

GitHub Project Repository

For the demonstration, we will be using the public NLP Client microservice GitHub project repository. The NLP Client, written in Go, is part of five microservices that comprise the Natural Language Processing (NLP) API. I developed this API to demonstrate architectural principles and DevOps practices. The API’s microservices are designed to be run as a distributed system using container orchestration platforms such as Docker Swarm, Red Hat OpenShift, Amazon ECS, and Kubernetes.

Public NLP Client GitHub project repository

Encrypted Secrets

To push new images to Docker Hub, the workflow must be logged in to your Docker Hub account. GitHub recommends storing your Docker Hub username and password as encrypted secrets, so they are not exposed in your workflow file. Encrypted secrets allow you to store sensitive information as encrypted environment variables in your organization, repository, or repository environment. The secrets that you create will be available to use in GitHub Actions workflows. To allow the workflow to log in to Docker Hub, I created two secrets, DOCKERHUB_USERNAME and DOCKERHUB_PASSWORD using my organization’s credentials, which I then reference in the workflow.

Actions Secrets shown in the GitHub project’s Secrets tab

GitHub Actions Workflow

According to GitHub, a workflow is a configurable automated process made up of one or more jobs. You must create a YAML file to define your workflow configuration. GitHub contains many searchable code examples you can use to bootstrap your workflow development. For this demonstration, I started with the example shown in the GitHub Actions Guide, Publishing Docker images, and modified it to meet my needs. Workflow files are checked into the project’s repository within the .github/workflows directory.

Workflow Development

Visual Studio Code (VS Code) is an excellent, full-featured, and free IDE for software development and writing Infrastructure as Code (IaC). VS Code has a large ecosystem of extensions, including extensions for GitHub Actions. Currently, I am using the GitHub Actions extension, cschleiden.vscode-github-actions, by Christopher Schleiden.

The extension features auto-complete, as shown below in the GitHub Actions workflow YAML file.

Auto-complete example using the GitHub Actions extension

Git Tags

The demonstration’s workflow is designed to be triggered when a new Git tag is pushed to the NLP Client project repository. Using the workflow, you can perform normal pushes (git push) to the repository without triggering the workflow. For example, you would not typically want to trigger a new image build and push when updating the project’s README file. Thus, we use the new Git tag as the workflow trigger.

Pushing a new tag to GitHub
Git tags as shown in the GitHub project repository

For consistency, I also designed the workflow to be triggered only when the format of the Git tag follows the common Semantic Versioning (SemVer) convention of version number MAJOR.MINOR.PATCH (v*.*.*).

on:
  push:
    tags:
      - 'v*.*.*'

Also, following common Docker conventions in the workflow, the Git tag (e.g., v1.2.3) is truncated to remove the letter ‘v’ and used as the tag for the Docker image (e.g., 1.2.3). In the workflow, the GITHUB_REF:11 portion of the command skips the first 11 characters (refs/tags/v) of the Git tag reference, truncating refs/tags/v1.2.3 to just 1.2.3.

run: echo "RELEASE_VERSION=${GITHUB_REF:11}" >> $GITHUB_ENV
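The same truncation can be expressed outside the shell. The following is a minimal Go sketch, not part of the workflow itself, that mirrors the effect of ${GITHUB_REF:11}. The function name releaseVersion is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// releaseVersion is a hypothetical helper that mirrors ${GITHUB_REF:11}:
// it drops the 11-character "refs/tags/v" prefix from a Git tag reference.
// Unlike the fixed-offset shell expansion, it returns the input unchanged
// when the prefix is absent.
func releaseVersion(ref string) string {
	return strings.TrimPrefix(ref, "refs/tags/v")
}

func main() {
	fmt.Println(releaseVersion("refs/tags/v1.2.3")) // prints 1.2.3
}
```

Matching on the prefix rather than a fixed offset is a small design choice: a reference that is not a v-prefixed tag passes through untouched instead of being cut at an arbitrary position.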

Workflow Run

Pushing the Git tag triggers the workflow to run automatically, as seen in the Actions tab.

GitHub Actions Workflow running, based on the push of a new git tag

Detailed logs show you how each step in the workflow was processed.

GitHub Actions Workflow running, based on the push of a new git tag

The example below shows that the workflow has successfully built and pushed a new Docker image to Docker Hub for the NLP Client microservice.

Completed GitHub Actions Workflow run

Failure Notifications

You can choose to receive a notification when a workflow fails. GitHub Actions notifications are a configurable option found in the GitHub account owner’s Settings tab.

Example email notification of workflow run failure

Status Badge

You can display a status badge in your repository to indicate the status of your workflows. The badge can be added as Markdown to your README file.

Public NLP Client GitHub project’s README displaying the status badge

Docker Hub

As a result of the successful completion of the workflow, we now have a new image tagged as 1.2.3 in the NLP Client Docker Hub repository: garystafford/nlp-client.

NLP Client Docker Hub repository showing new image tag

Conclusion

In this brief post, we saw a simple example of how GitHub Actions allows you to automate, customize, and execute your software development workflows right in your GitHub repository. We can easily extend this post’s GitHub Actions example to include updating the service’s Kubernetes Deployment resource file to the latest image tag in Docker Hub. Further, we can trigger a GitOps workflow with tools such as Weaveworks’ Flux or Argo CD to deploy the revised workload to a Kubernetes cluster.

Deployed NLP API as seen from Argo CD

This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Kubernetes-based Microservice Observability with Istio Service Mesh: Part 2 of 2

In part two of this two-part post, we will continue to explore the set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.”

Reference Application Platform

To demonstrate Istio’s observability tools, we deployed a reference application platform to EKS on AWS. I have developed the application platform to demonstrate different Kubernetes platforms, such as EKS, GKE, AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform comprises a backend containing eight Go-based microservices, labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). For example, Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message to a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. The platform’s distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of Reference Application Platform

Part Two

In part one of the post, we configured and deployed the reference application platform to an Amazon EKS development-grade cluster on AWS. The reference application, running on EKS, communicates with two external systems, Amazon DocumentDB (with MongoDB compatibility) and Amazon MQ.

Deployed Reference Application Platform as seen from Argo CD

In part two of the post, we will explore each of the observability tools we installed in greater detail. We will understand how each tool contributes to the three pillars of observability: logs, metrics, and traces.

Logs, metrics, and traces are often known as the three pillars of observability.
 — Cindy Sridharan

Pillar One: Logs

To paraphrase Jay Kreps on the LinkedIn Engineering Blog, a log is an append-only, totally-ordered sequence of records ordered by time. The ordering of records defines a notion of “time” since entries to the left are defined to be older than entries to the right. Logs are a historical record of events that happened in the past. Logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.

Go-based Microservice Logging

An effective logging strategy starts with what you log, when you log, and how you log. As part of our logging strategy, the eight Go-based microservices use Logrus, a popular structured logger for Go first released in 2014. The microservices also implement Banzai Cloud’s logrus-runtime-formatter. There is an excellent article on the formatter, Golang runtime Logrus Formatter. These two logging packages give us greater control over what we log, when we log, and how we log information about our microservices. The recommended configuration of the packages is minimal.

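import (
	"os"

	runtime "github.com/banzaicloud/logrus-runtime-formatter"
	log "github.com/sirupsen/logrus"
)

func init() {
	// Wrap Logrus' JSON formatter so each entry also carries runtime information.
	formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
	formatter.Line = true // include the line number
	log.SetFormatter(&formatter)
	log.SetOutput(os.Stdout)
	// logLevel is defined elsewhere in the service.
	level, err := log.ParseLevel(logLevel)
	if err != nil {
		log.Error(err)
	}
	log.SetLevel(level)
}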

Logrus provides several advantages over Go’s simple logging package, log. For example, log entries are not only for Fatal errors, nor should all verbose log entries be output in a Production environment. The post’s microservices are taking advantage of Logrus’ ability to log at seven levels: Trace, Debug, Info, Warning, Error, Fatal, and Panic. I have also parameterized the log level, allowing it to be easily changed in the Kubernetes Deployment resource at deploy-time.

The microservices also take advantage of Banzai Cloud’s logrus-runtime-formatter. The Banzai formatter automatically tags log messages with runtime and stack information, including function name and line number; extremely helpful when troubleshooting. I am also using Logrus’ JSON formatter.

Service A log entries in CloudWatch Insights
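To make the idea of runtime-tagged, structured log entries more concrete, here is a minimal sketch that uses only Go’s standard library. It is not how Logrus or the Banzai Cloud formatter is implemented. The entry type, its field names, and the newEntry helper are all hypothetical and only illustrate the kind of information attached to each message.

```go
package main

import (
	"encoding/json"
	"fmt"
	"runtime"
)

// entry is a hypothetical structured log record; the field names are
// illustrative and are not the exact keys emitted by Logrus or the formatter.
type entry struct {
	Level    string `json:"level"`
	Message  string `json:"msg"`
	Function string `json:"function"`
	Line     int    `json:"line"`
}

// newEntry records the caller's function name and line number alongside
// the log level and message.
func newEntry(level, msg string) entry {
	e := entry{Level: level, Message: msg}
	// Skip one stack frame so the caller of newEntry is reported.
	if pc, _, line, ok := runtime.Caller(1); ok {
		e.Line = line
		if fn := runtime.FuncForPC(pc); fn != nil {
			e.Function = fn.Name()
		}
	}
	return e
}

func main() {
	out, err := json.Marshal(newEntry("info", "service started"))
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

Emitting one JSON object per entry is what lets tools such as CloudWatch Logs Insights filter on individual fields rather than on free text.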

In 2020, Logrus entered maintenance mode. The author, Simon Eskildsen (Principal Engineer at Shopify), stated they will not be introducing new features. This does not mean Logrus is dead. With over 18,000 GitHub Stars, Logrus will continue to be maintained for security, bug fixes, and performance. The author states that many fantastic alternatives to Logrus now exist, such as Zerolog, Zap, and Apex.

Client-side Angular UI Logging

Likewise, I have enhanced the logging of the Angular UI using NGX Logger. NGX Logger is a simple logging module for Angular (currently supports Angular 6+). It allows “pretty print” to the console and allows log messages to be POSTed to a URL for server-side logging. For this demo, the UI will only log to the web browser’s console. Similar to Logrus, NGX Logger supports multiple log levels: Trace, Debug, Info, Warning, Error, Fatal, and Off. However, instead of just outputting messages, NGX Logger allows us to output properly formatted log entries to the browser’s console.

The level of log output is configured to depend on the environment, Production or not Production. Below is an example of the log output from the Angular UI in Chrome. Since the UI’s Docker Image was built with the Production configuration, the log level is set to INFO. We would not want to expose potentially sensitive information in verbose log output to our end-users in Production.

Controlling logging levels is accomplished by adding the following ternary operator to the app.module.ts file.

imports: [
  BrowserModule,
  HttpClientModule,
  FormsModule,
  LoggerModule.forRoot({
    level: !environment.production ?
      NgxLoggerLevel.DEBUG : NgxLoggerLevel.INFO,
    serverLogLevel: NgxLoggerLevel.INFO
  })
],

Platform Logs

Based on the platform built, configured, and deployed in part one, you now have access to logs from multiple sources.

  1. Amazon DocumentDB: Amazon CloudWatch Audit and Profiler logs;
  2. Amazon MQ: Amazon CloudWatch logs;
  3. Amazon EKS: API server, Audit, Authenticator, Controller manager, and Scheduler CloudWatch logs;
  4. Kubernetes Dashboard: Individual EKS Pod and Replica Set logs;
  5. Kiali: Individual EKS Pod and Container logs;
  6. Fluent Bit: EKS performance, host, dataplane, and application CloudWatch logs.

Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

With Fluent Bit, deployed in part one, the EKS cluster’s performance, host, dataplane, and application logs will also be available in Amazon CloudWatch.

Within the application log groups, you have access to the individual log streams for each reference application’s components.

Within each CloudWatch log stream, you can view individual log entries.

CloudWatch Logs Insights enables you to interactively search and analyze your log data in Amazon CloudWatch Logs. You can perform queries to help you more efficiently and effectively respond to operational issues. If an issue occurs, you can use CloudWatch Logs Insights to identify potential causes and validate deployed fixes.

CloudWatch Logs Insights supports CloudWatch Logs Insights query syntax, a query language you can use to perform queries on your log groups. Each query can include one or more query commands separated by Unix-style pipe characters (|). For example:

fields @timestamp, @message
| filter kubernetes.container_name = "service-f"
and @message like "error"
| sort @timestamp desc
| limit 20

Pillar Two: Metrics

For metrics, we will examine CloudWatch Container Insights, Prometheus, and Grafana. Prometheus and Grafana are industry-leading tools you installed as part of the Istio deployment.

Prometheus

Prometheus is an open-source systems monitoring and alerting toolkit originally built at SoundCloud circa 2012. Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second project hosted after Kubernetes.

According to Istio, the Prometheus addon is a Prometheus server that comes preconfigured to scrape Istio endpoints to collect metrics. You can use Prometheus with Istio to record metrics that track the health of Istio and applications within the service mesh. You can visualize metrics using tools like Grafana and Kiali. The Istio Prometheus addon is intended for demonstration only and is not tuned for performance or security.

The istioctl dashboard command provides access to all of the Istio web UIs. With the EKS cluster running, Istio installed, and the reference application platform deployed, access Prometheus using the istioctl dashboard prometheus command from your terminal. You must be logged into AWS from your terminal to connect to Prometheus successfully. If you are not logged in to AWS, you will often see the following error: Error: not able to locate <tool_name> pod: Unauthorized. Since we used the non-production demonstration versions of the Istio Addons, there is no authentication and authorization required to access Prometheus.

According to Prometheus, users select and aggregate time-series data in real-time using a functional query language called PromQL (Prometheus Query Language). The result of an expression can either be shown as a graph, viewed as tabular data in Prometheus’s expression browser, or consumed by external systems through Prometheus’ HTTP API. The expression browser includes a drop-down menu with all available metrics as a starting point for building queries. Shown below are a few PromQL examples that were developed as part of writing this post.

istio_agent_go_info{kubernetes_namespace="dev"}
istio_build{kubernetes_namespace="dev"}
up{alpha_eksctl_io_cluster_name="istio-observe-demo", job="kubernetes-nodes"}
sum by (pod) (rate(container_network_transmit_packets_total{stack="reference-app",namespace="dev",pod=~"service-.*"}[5m]))
sum by (instance) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code="200"})
sum by (response_code) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code!~"200|0"})

Prometheus APIs

Prometheus has both an HTTP API and a Management API. There are many useful endpoints in addition to the Prometheus UI, available at http://localhost:9090/graph. For example, the Prometheus HTTP API endpoint that lists all the command-line configuration flags is available at http://localhost:9090/api/v1/status/flags. The endpoint that lists all the available Prometheus metrics is available at http://localhost:9090/api/v1/label/__name__/values; a total of 951 metrics in this demonstration!

The Prometheus endpoint that lists many available metrics with HELP and TYPE to explain their function is found at http://localhost:9090/metrics.
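The label-values endpoint mentioned above can also be consumed programmatically. Below is a minimal Go sketch that requests /api/v1/label/__name__/values and counts the metric names returned. The helper names metricNames and parseLabelValues are hypothetical, and the sketch runs against a local stand-in server with made-up data so that it does not require a cluster. Against a real deployment, you would pass the base URL exposed by istioctl dashboard prometheus.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// labelValues models the JSON envelope the Prometheus HTTP API returns
// for label-value queries: a status string and a list of values.
type labelValues struct {
	Status string   `json:"status"`
	Data   []string `json:"data"`
}

// parseLabelValues decodes a label-values response body and returns the values.
func parseLabelValues(body []byte) ([]string, error) {
	var lv labelValues
	if err := json.Unmarshal(body, &lv); err != nil {
		return nil, err
	}
	if lv.Status != "success" {
		return nil, fmt.Errorf("unexpected status %q", lv.Status)
	}
	return lv.Data, nil
}

// metricNames fetches the list of metric names from a Prometheus server.
func metricNames(baseURL string) ([]string, error) {
	resp, err := http.Get(baseURL + "/api/v1/label/__name__/values")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return parseLabelValues(body)
}

func main() {
	// Stand-in server returning made-up data in the API's response shape.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, `{"status":"success","data":["istio_requests_total","up"]}`)
	}))
	defer srv.Close()

	names, err := metricNames(srv.URL)
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	fmt.Println(len(names), "metrics") // prints: 2 metrics
}
```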

Understanding Metrics

In addition to these endpoints, the standard service level metrics exported by Istio and available via Prometheus are found in the Istio Standard Metrics documentation. An explanation of many of the metrics available via Prometheus is also found in the cAdvisor README on GitHub. As mentioned in this AWS Blog Post, the cAdvisor metrics are also available from the command line using the following commands:

export NODE=$(kubectl get nodes | sed -n '2 p' | awk '{print $1}')
kubectl get --raw "/api/v1/nodes/${NODE}/proxy/metrics/cadvisor"

Observing Metrics

Below is an example graph of the backend microservice containers deployed to EKS. The graph PromQL expression returns the amount of working set memory, including recently accessed memory, dirty memory, and kernel memory (container_memory_working_set_bytes), summed by pod, in megabytes (MB). There was no load on the services during the period displayed.

sum by (pod) (container_memory_working_set_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

The container_memory_working_set_bytes metric is the same metric used by the kubectl top command (not container_memory_usage_bytes).

> kubectl top pod -n dev --containers=true --use-protocol-buffers
POD                          NAME          CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm service-a 1m 6Mi
service-a-546fbd558d-2lcsg service-a 1m 6Mi
service-b-545c85df9-dl9h8 service-b 1m 6Mi
service-b-545c85df9-q99xm service-b 1m 5Mi
service-c-58996574-58wd8 service-c 1m 7Mi
service-c-58996574-6q7n4 service-c 1m 7Mi
service-d-867796bb47-87ps5 service-d 1m 6Mi
service-d-867796bb47-fh6wl service-d 1m 6Mi
...

In another Prometheus example, the PromQL query expression returns the per-second rate of CPU resources measured in CPU units (1 CPU = 1 AWS vCPU), as measured over the last 5 minutes, per time series in the range vector, summed by the pod. During this period, the backend services were under a consistent, simulated load of 25 concurrent users using hey. The four Service D pods were consuming the most CPU units during this time period.

sum by (pod) (rate(container_cpu_usage_seconds_total{image=~"registry.hub.docker.com/garystafford/.*"}[5m])) * 1000

The container_cpu_usage_seconds_total metric is the same metric used by the kubectl top command. The above PromQL expression multiplies the query results by 1,000 to match the results from kubectl top, shown below.

> kubectl top pod -n dev --containers=true --use-protocol-buffers
POD                          NAME          CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm service-a 25m 9Mi
service-a-546fbd558d-2lcsg service-a 27m 8Mi
service-b-545c85df9-dl9h8 service-b 29m 11Mi
service-b-545c85df9-q99xm service-b 23m 8Mi
service-c-58996574-c8hkn service-c 62m 9Mi
service-c-58996574-kx895 service-c 55m 8Mi
service-d-867796bb47-87ps5 service-d 285m 12Mi
service-d-867796bb47-9ln7p service-d 226m 11Mi
...
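The multiplication by 1,000 converts CPU cores to the millicores (m) reported by kubectl top. The arithmetic can be sketched in a few lines of Go. This is a simplification, since the PromQL rate function also handles counter resets and extrapolation, and the sample values below are made up for illustration.

```go
package main

import "fmt"

// milliCores converts two samples of a cumulative CPU-seconds counter,
// taken windowSeconds apart, into an average usage rate in millicores.
func milliCores(startSeconds, endSeconds, windowSeconds float64) float64 {
	perSecond := (endSeconds - startSeconds) / windowSeconds // average cores used
	return perSecond * 1000
}

func main() {
	// Made-up samples: 85.5 CPU-seconds consumed over a 5-minute (300 s) window.
	fmt.Printf("%.0fm\n", milliCores(1000, 1085.5, 300)) // prints 285m
}
```

In other words, a pod that consumes 85.5 CPU-seconds over five minutes averages 0.285 cores, which is displayed as 285m.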

Limits

Prometheus also exposes container resource limits. For example, the container_spec_memory_limit_bytes metric returns the memory limits set on the reference platform’s backend services, displayed here in megabytes (MB). When viewed alongside the real-time resources consumed by the services, these metrics are useful to properly configure and monitor Kubernetes management features such as the Horizontal Pod Autoscaler.

sum by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2) / count by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"})

Or, memory limits by Pod:

sum by (pod) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

Cluster Metrics

Prometheus also contains metrics about Istio components, Kubernetes components, and the EKS cluster. For example, the following expression returns the total memory in gigabytes (GB) of each m5.large EC2 worker node in the istio-observe-demo EKS cluster’s managed-ng-1 Managed Node Group.

machine_memory_bytes{alpha_eksctl_io_cluster_name="istio-observe-demo", alpha_eksctl_io_nodegroup_name="managed-ng-1"} / (1024^3)

For total physical cores, use the machine_cpu_physical_cores metric, and for vCPU cores use the machine_cpu_cores metric.

Grafana

Grafana describes itself as the leading open-source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also allows users to visually define alert rules for their most important metrics. Grafana will continuously evaluate rules and can send notifications.

If you deployed Grafana using the Istio addons process demonstrated in part one of the post, access Grafana similar to the other tools:

istioctl dashboard grafana

According to Istio, Grafana is an open-source monitoring solution used to configure dashboards for Istio. You can use Grafana to monitor the health of Istio and applications within the service mesh. While you can build your own dashboards, Istio offers a set of preconfigured dashboards for all of the most important metrics for the mesh and the control plane. The preconfigured dashboards use Prometheus as the data source.

Below is an example of the Istio Mesh Dashboard, filtered to show the eight backend service workloads running in the dev namespace. During this period, the backend services were under a consistent simulated load of approximately 20 concurrent users using hey. You can observe the p50, p90, and p99 latency of requests to these workloads.

Dashboards are built from Panels, the basic visualization building blocks in Grafana. Each panel has a query editor specific to the data source (Prometheus in this case) selected. The query editor allows you to write your (PromQL) query. Below is the PromQL expression query responsible for the p50 latency Panel displayed in the Istio Mesh Dashboard.

 label_join((histogram_quantile(0.50, sum(rate(istio_request_duration_milliseconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)) / 1000) or histogram_quantile(0.50, sum(rate(istio_request_duration_seconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)), "destination_workload_var", ".", "destination_workload", "destination_workload_namespace")
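The histogram_quantile function in the expression above estimates a quantile from cumulative histogram buckets by interpolating linearly within the bucket that contains the target rank. The following Go sketch illustrates that idea under simplifying assumptions. It is not Prometheus’ implementation and omits several edge cases that Prometheus handles. The bucket type, the quantile and demoBuckets functions, and the sample counts are all hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// bucket is one cumulative histogram bucket: the number of observations
// less than or equal to the upper bound le.
type bucket struct {
	le    float64
	count float64
}

// quantile estimates the q-quantile (0 < q <= 1) from cumulative buckets
// sorted by upper bound, where the last bucket has an upper bound of +Inf.
func quantile(q float64, buckets []bucket) float64 {
	if len(buckets) < 2 {
		return math.NaN()
	}
	total := buckets[len(buckets)-1].count
	if total == 0 {
		return math.NaN()
	}
	rank := q * total
	for i, b := range buckets {
		if b.count < rank {
			continue
		}
		if math.IsInf(b.le, 1) {
			// The rank falls in the +Inf bucket: report the highest finite bound.
			return buckets[i-1].le
		}
		lower, below := 0.0, 0.0
		if i > 0 {
			lower, below = buckets[i-1].le, buckets[i-1].count
		}
		// Interpolate linearly between the bucket's lower and upper bounds.
		return lower + (b.le-lower)*(rank-below)/(b.count-below)
	}
	return math.NaN()
}

// demoBuckets returns made-up request durations in milliseconds.
func demoBuckets() []bucket {
	return []bucket{{10, 10}, {50, 60}, {100, 100}, {math.Inf(1), 100}}
}

func main() {
	fmt.Println(quantile(0.50, demoBuckets())) // prints 42
}
```

With 100 observations, the p50 rank is 50, which falls in the 10 to 50 ms bucket. Forty of that bucket’s fifty observations lie at or below the rank, so the estimate is 10 + 40 × (40 / 50) = 42 ms.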

Below is an example of the Outbound Workloads section of the Istio Workload Dashboard. The complete dashboard contains three sections: General, Inbound Workloads, and Outbound Workloads. Here we have filtered on the reference platform’s backend services in the dev namespace.

Here is a different view of the Istio Workload Dashboard, the dashboard’s Inbound Workloads section filtered to a single workload, Service A, the backend’s edge service. Service A accepts incoming traffic from the Istio Ingress Gateway as shown in the dashboard’s panels.

Grafana provides the ability to Explore a Panel. Explore strips away the dashboard and panel options so that you can focus on the query. It helps you iterate until you have a working query and then think about building a dashboard. Below is an example of the Panel showing the egress TCP traffic, based on the istio_tcp_sent_bytes_total metric, for Service F. Service F consumes messages off the RabbitMQ queue (Amazon MQ) and writes messages to MongoDB (DocumentDB).

You can monitor the resource usage of Istio with the Performance Dashboard.

Additional Dashboards

Grafana provides a site containing official and community-built dashboards, including the above-mentioned Istio dashboards. Importing dashboards into your Grafana instance is as simple as copying the dashboard URL or the ID provided from the Grafana dashboard site and pasting it into the dashboard import option of your Grafana instance. Be aware that not every Kubernetes dashboard on Grafana’s site is compatible with your specific version of Kubernetes, Istio, or EKS, or relies on Prometheus as a data source. As a result, you might have to test and tweak imported dashboards to get them working.

Below is an example of an imported community dashboard, Kubernetes cluster monitoring (via Prometheus) by Instrumentisto Team (dashboard ID 315).

Alerting

An effective observability strategy must include more than just the ability to visualize results. An effective strategy must also detect anomalies and notify (alert) the appropriate resources or directly resolve incidents. Grafana, like Prometheus, is capable of alerting and notification. You visually define alert rules for your critical metrics. Grafana will continuously evaluate metrics against the rules and send notifications when pre-defined thresholds are breached.

Grafana supports multiple popular notification channels, including PagerDuty, HipChat, Email, Kafka, and Slack. Below is an example of a Grafana notification channel that sends alert notifications to a Slack support channel.

Below is an example of an alert based on an arbitrarily high CPU usage of 300 milliCPUs (m). When the CPU usage of a single pod goes above that value for more than 3 minutes, an alert is sent. The high CPU usage could be caused by the Horizontal Pod Autoscaler not functioning, the HPA having reached its maxReplicas limit, or insufficient resources within the cluster to schedule additional pods.

Triggered by the alert, Grafana sends detailed notifications to the designated Slack channel.
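The rule described above combines a threshold with a hold period, so a brief spike does not trigger a notification. The Go sketch below illustrates that logic only. It is not how the alerting engine evaluates rules internally, and it assumes one reading per minute. The function name sustainedBreach and the readings are hypothetical.

```go
package main

import "fmt"

// sustainedBreach reports whether the readings, assumed to be taken once
// per minute, stay above threshold for more than holdMinutes consecutive
// readings. Any reading at or below the threshold resets the count.
func sustainedBreach(perMinute []float64, threshold float64, holdMinutes int) bool {
	run := 0 // consecutive readings above the threshold
	for _, v := range perMinute {
		if v <= threshold {
			run = 0
			continue
		}
		run++
		if run > holdMinutes {
			return true
		}
	}
	return false
}

func main() {
	// Made-up CPU readings in millicores for a single pod.
	readings := []float64{250, 310, 320, 330, 340}
	fmt.Println(sustainedBreach(readings, 300, 3)) // prints true
}
```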

Amazon CloudWatch Container Insights

Lastly in the category of Metrics, Amazon CloudWatch Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications and microservices. CloudWatch alarms can be set on metrics that Container Insights collects. Container Insights is available for Amazon Elastic Container Service (Amazon ECS) including Fargate, Amazon EKS, and Kubernetes platforms on Amazon EC2.

In Amazon EKS, Container Insights uses a containerized version of the CloudWatch agent to discover all running containers in a cluster. It then collects performance data at every layer of the performance stack. Container Insights collects data as performance log events using the embedded metric format. These performance log events are entries that use a structured JSON schema that enables high-cardinality data to be ingested and stored at scale.

In part one of the post, we also installed CloudWatch Container Insights monitoring for Prometheus, which automates the discovery of Prometheus metrics from containerized systems and workloads.

Below is an example of a basic Performance Monitoring CloudWatch Container Insights Dashboard. The dashboard is filtered to the dev namespace of the EKS cluster, where the reference application platform is running. During this period, the backend services were put under a simulated load using hey. As the load on the application increases, observe the Number of Pods increases from 19 to 34 pods, based on the Deployment resources and HPA configurations. There is also an Alert, shown on the right of the screen. An alarm was triggered for an arbitrarily high level of network transmission activity.

Next is an example of Container Insights’ Container Map view in Memory mode. You see a visual representation of the dev namespace, with each of the backend service’s Service and Deployment resources shown.

There is a warning icon indicating an Alarm on the cluster was triggered.

Lastly, CloudWatch Container Insights allows you to jump from the Container Insights console to the CloudWatch Logs Insights console. Container Insights will also write the CloudWatch Logs Insights query for you. Below, we went from the Service D container metrics view in the Container Insights Performance Monitoring console directly to the CloudWatch Logs Insights console with a query, ready to run.

Pillar Three: Traces

According to the OpenTracing website, distributed tracing, also called distributed request tracing, is used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.

According to Istio, header propagation may be accomplished through client libraries, such as Zipkin or Jaeger. It may also be accomplished manually, referred to as trace context propagation, documented in the Distributed Tracing Task. Istio proxies can automatically send spans. Applications need to propagate the appropriate HTTP headers so that when the proxies send span information, the spans can be correlated correctly into a single trace. To accomplish this, an application needs to collect and propagate the following headers from the incoming request to any outgoing requests.

  • x-request-id
  • x-b3-traceid
  • x-b3-spanid
  • x-b3-parentspanid
  • x-b3-sampled
  • x-b3-flags
  • x-ot-span-context

The x-b3 headers originated as part of the Zipkin project. The B3 portion of the header is named for the original name of Zipkin, BigBrotherBird. Passing these headers across service calls is known as B3 propagation. According to Zipkin, these attributes are propagated in-process and eventually downstream (often via HTTP headers) to ensure all activity originating from the same root are collected together.

To demonstrate distributed tracing with Jaeger and Zipkin, Service A, Service B, and Service E have been modified to pass the b3 headers. These are the three services that make HTTP requests to other upstream services. The following code has been added to propagate the headers from one service to the next. The Istio sidecar proxy (Envoy) generates the first headers. It is critical to only propagate the headers that are present in the downstream request and have a value, as the code below does. Propagating an empty header will break the distributed tracing.

incomingHeaders := []string{
	"x-b3-flags",
	"x-b3-parentspanid",
	"x-b3-sampled",
	"x-b3-spanid",
	"x-b3-traceid",
	"x-ot-span-context",
	"x-request-id",
}
for _, header := range incomingHeaders {
	if r.Header.Get(header) != "" {
		req.Header.Add(header, r.Header.Get(header))
	}
}
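For experimenting with this propagation logic outside of the services, the snippet above can be wrapped in a small, self-contained program. This is a sketch rather than the services’ actual code. The propagate function name is hypothetical, the header values are made up, and Set is used in place of Add since each header carries a single value here.

```go
package main

import (
	"fmt"
	"net/http"
)

// tracingHeaders lists the headers to forward from the incoming request.
var tracingHeaders = []string{
	"x-b3-flags",
	"x-b3-parentspanid",
	"x-b3-sampled",
	"x-b3-spanid",
	"x-b3-traceid",
	"x-ot-span-context",
	"x-request-id",
}

// propagate copies each tracing header that is present and non-empty
// from the incoming headers to the outgoing headers.
func propagate(in, out http.Header) {
	for _, h := range tracingHeaders {
		if v := in.Get(h); v != "" {
			out.Set(h, v)
		}
	}
}

func main() {
	in := http.Header{}
	in.Set("x-b3-traceid", "463ac35c9f6413ad") // made-up value
	in.Set("x-b3-sampled", "1")
	in.Set("x-b3-flags", "") // present but empty, so it is not forwarded

	out := http.Header{}
	propagate(in, out)
	fmt.Println(len(out)) // prints 2
}
```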

Below, the highlighted section of the response payload from a call to Service A’s /api/request-echo endpoint reveals the b3 headers originating from the Istio proxy and passed to Service A.

Jaeger

According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains a helpful overview of Jaeger’s architecture and general tracing-related terminology.

If you deployed Jaeger using the Istio addons process demonstrated in part one of the post, access Jaeger similar to the other tools:

istioctl dashboard jaeger

Below is an example of the Jaeger UI’s Search view, displaying the results of a search for the Istio Ingress Gateway service over a period of time. We see a timeline of traces across the top with a list of trace results below. As discussed on the Jaeger website, a trace is composed of spans. A span represents a logical unit of work in Jaeger that has an operation name. A trace is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with the concept of DAGs.

Below is a detailed view of a single trace in Jaeger’s Trace Timeline mode. The 14 spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 160 ms. The root span in the trace is the Istio Ingress Gateway. The Angular UI, loaded in the end user’s web browser, calls Service A via the Istio Ingress Gateway. From there, we see the expected flow of our service-to-service IPC. Service A calls Service B and Service C. Service B calls Service E, which calls Service G and Service H.

In this demonstration, traces are not instrumented to span the RabbitMQ message queue or MongoDB. This means you would not see a trace that includes a call from Service D to Service F via RabbitMQ.

The visualization of the trace’s timeline demonstrates the synchronous nature of the reference platform’s service-to-service IPC instead of the asynchronous nature of the decoupled communications using the RabbitMQ messaging queue. Note how Service A waits for each service in its call chain to respond before returning its response to the requester.

Within Jaeger’s Trace Timeline view, you have the ability to drill into a single span, which contains additional metadata. The span’s metadata includes the API endpoint URL being called, HTTP method, response status, and several other headers.

Jaeger also has an experimental Trace Graph mode, which displays a graph view of the same trace.

Jaeger also includes a Compare Trace feature and two Dependencies views: Force-Directed Graph and DAG. I find both views rather primitive compared to Kiali. If you do not have access to Kiali, the views are marginally useful as a dependency graph.

Zipkin

Zipkin is a distributed tracing system, which helps gather timing data needed to troubleshoot latency problems in service architectures. According to a 2012 post on Twitter’s Engineering Blog, Zipkin started as a project during Twitter’s first Hack Week. During that week, they implemented a basic version of the Google Dapper paper for Thrift.

Zipkin and Jaeger are very similar in terms of capabilities. I have chosen to focus on Jaeger in this post as I prefer it over Zipkin. If you want to try Zipkin instead of Jaeger, you can use the following commands to remove Jaeger and install Zipkin from the Istio addons extras directory. In part one of the post, we did not install Zipkin by default when we deployed the Istio addons. Be aware that running both tools at the same time in the same Kubernetes cluster will cause unpredictable tracing results.

kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/jaeger.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/extras/zipkin.yaml

Access Zipkin similar to the other observability tools:

istioctl dashboard zipkin

Below is an example of a distributed trace visualized in Zipkin’s UI, containing 14 spans. This is very similar to the trace visualized in Jaeger, shown above. The spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 154 ms.

Zipkin can also visualize a dependency graph based on the distributed trace. Below is an example of a traffic simulation over a two-minute period, showing network traffic flowing between the reference platform’s components, illustrated as a dependency graph.

Kiali: Microservice Observability

According to their website, Kiali is a management console for an Istio-based service mesh. It provides dashboards, observability, and lets you operate your mesh with robust configuration and validation capabilities. It shows the structure of a service mesh by inferring traffic topology and displaying the mesh’s health. Kiali provides detailed metrics, powerful validation, Grafana access, and strong integration for distributed tracing with Jaeger.

If you deployed Kiali using the Istio addons process demonstrated in part one of the post, access Kiali similar to the other tools:

istioctl dashboard kiali

For improved security, I optionally chose to install the latest version of Kiali using the customizable install mentioned in Istio’s documentation. Using Kiali’s Install via Kiali Server Helm Chart option adds token-based authentication, similar to the Kubernetes Dashboard.

Logging into Kiali, we see the Overview tab, which provides a global view of all namespaces within the Istio service mesh and the number of applications within each namespace.

The Graph tab in the Kiali UI represents the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we can observe that Kiali has mapped 8 applications (Workloads), 10 services, and 22 edges (a graph term). Specifically, we see the Istio Ingress Proxy at the edge of the service mesh, the Angular UI and eight backend services all with their respective Envoy proxy sidecars that are taking traffic (Service F did not take any direct traffic from another service in this example), the external DocumentDB egress point, and the external Amazon MQ egress point. Finally, note how service-to-service traffic flows, with Istio, from the service to its sidecar proxy, to the other service’s sidecar proxy, and finally to the service.

Below is a similar view of the service mesh, but this time, there are failures between the Istio Ingress Gateway and Service A, shown in red. We can also observe overall metrics for the HTTP traffic, such as inbound and outbound requests per second, total requests, success and error rates, and HTTP status codes.

Kiali allows you to zoom in and focus on a single component in the graph and its individual metrics.

Kiali can also display average request times and other metrics for each edge in the graph (communication between two components). Kiali can even show those metrics over a given period of time, using its Replay feature, shown below.

Focusing on the external DocumentDB cluster, Kiali also allows us to view TCP traffic between the four services within the service mesh that connect to the external cluster.

The Applications tab lists all the applications, their namespace, and labels.

You can drill into an individual component on both the Applications and Workloads tabs and view additional details. Details include the overall health, Pods, and Istio Config status. Below is an overview of the Service A workload in the dev Namespace.

The Workloads detailed view also includes inbound and outbound metrics. Below is an example of the outbound request volume, duration, throughput, and size metrics, for Service A in the dev Namespace.

Kiali also gives you access to the individual pod’s container logs. Although log access is not as user-friendly as other log sources discussed previously, having logs available alongside metrics (integration with Grafana), traces (integration with Jaeger), and mesh visualization, all in Kiali, can be very effective as a single source for observability.

Kiali also has an Istio Config tab. The Istio Config tab displays a list of all of the available Istio configuration objects that exist in the user’s environment.

You can use Kiali to configure and manage the Istio service mesh and its installed resources. Using Kiali, you can actually modify the deployed resources, similar to using the kubectl edit command.

Oftentimes, I find Kiali to be my first stop when troubleshooting platform issues. Once I identify the specific components or communication paths having issues, I can query the CloudWatch logs and Prometheus metrics through the Grafana dashboard.

Conclusion

In this two-part post, we explored a set of popular open-source observability tools, easily integrated with the Istio service mesh. These tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and forwarding to Amazon CloudWatch Container Insights. Using these tools, we successfully observed a microservices-based, distributed reference application platform deployed to Amazon EKS.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS

Introduction

In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.

Amazon EMR

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.

Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache Airflow’s UI

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.

Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon Redshift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the Hooks, Secrets, Sensors, and Operators of the Airflow codebase’s contrib section.

Getting Started

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-airflow-demo.git

Preliminary Steps

This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.

Configuring Amazon MWAA

The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Amazon MWAA Environment Creation Process

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA Environment Creation Process

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

Amazon MWAA Environment Creation Process

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

Amazon MWAA Environment Creation Process

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

AWS CloudFormation Create Stack Console

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.

AWS CloudFormation Create Stack Console
Amazon MWAA Environment Creation Process

As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.

You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.

Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Amazon MWAA Environment Creation Process

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Amazon MWAA Environment Creation Process

Airflow Execution Role

As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to pass the EMR roles to the EMR service using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.

Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "elasticmapreduce:DescribeStep",
                "elasticmapreduce:AddJobFlowSteps",
                "elasticmapreduce:RunJobFlow"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::123412341234:role/EMR_DemoRole",
                "arn:aws:iam::123412341234:role/EMR_EC2_DemoRole",
                "arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
                "arn:aws:iam::123412341234:role/EMR_DefaultRole"
            ]
        }
    ]
}
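
If you would rather not hand-edit the Account ID in four places, the same policy document can be generated. The following is a minimal Python sketch, assuming the role names and actions shown above; the function name `build_airflow_emr_policy` is hypothetical and not part of the project's repository.

```python
import json

# Role names taken from the policy shown above.
EMR_ROLE_NAMES = [
    "EMR_DemoRole",
    "EMR_EC2_DemoRole",
    "EMR_EC2_DefaultRole",
    "EMR_DefaultRole",
]


def build_airflow_emr_policy(account_id: str) -> dict:
    """Build the policy document shown above for a 12-digit AWS Account ID."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("expected a 12-digit AWS Account ID")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "elasticmapreduce:DescribeStep",
                    "elasticmapreduce:AddJobFlowSteps",
                    "elasticmapreduce:RunJobFlow",
                ],
                "Resource": "*",
            },
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                # One role ARN per EMR role the Airflow role may pass to EMR.
                "Resource": [
                    f"arn:aws:iam::{account_id}:role/{name}"
                    for name in EMR_ROLE_NAMES
                ],
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder Account ID, as used throughout this post.
    print(json.dumps(build_airflow_emr_policy("123412341234"), indent=4))
```

The printed JSON can be pasted into the IAM console's policy editor in place of the imported file.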

The Airflow service role, created by MWAA, is shown below with the new policy attached.

Airflow Execution Service Role with the new Policy Attached

Final Architecture

Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case, the endpoint provides a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Demonstration’s Amazon MWAA and Amazon EMR Architecture

Amazon MWAA Environment

The new MWAA Environment will include a link to the Airflow UI.

Amazon MWAA Environment Console

Airflow UI

Using the supplied link, you should be able to access the Airflow UI using your web browser.

Apache Airflow UI

Our First DAG

The Amazon MWAA documentation includes an example DAG, which contains one of several sample programs, SparkPi, which comes with Spark. I have created a similar DAG that is included in the GitHub project, dags/spark_pi_example.py. The DAG will create a minimally-sized single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.

import os
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago

DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        },
        {
            'Key': 'Name',
            'Value': 'Airflow EMR Demo Project'
        },
        {
            'Key': 'Owner',
            'Value': 'Data Analytics Team'
        }
    ]
}

with DAG(
        dag_id=DAG_ID,
        description='Run built-in Spark app on Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['emr'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker

Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.

aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/

The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

Apache Airflow UI’s Trigger DAG Page

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Apache Airflow UI’s DAG Graph View

Switching to the EMR Console, you should see the single-node EMR cluster being created.

Amazon EMR Console’s Summary tab

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to run it.

Amazon EMR Console’s Steps tab

Triggering DAGs Programmatically

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explain how.

Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the value of the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIRONMENT_NAME variable assumes only one MWAA environment is returned by jq.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME=spark_pi_example
# Capture the token with command substitution; an export placed after a pipe only runs in a subshell
export CLI_TOKEN=$(aws mwaa create-cli-token --name "${ENVIRONMENT_NAME}" | jq -r .CliToken)
curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
    --header "Authorization: Bearer ${CLI_TOKEN}" \
    --header "Content-Type: text/plain" \
    --data-raw "trigger_dag ${DAG_NAME}"
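
The same request can also be made from Python. Below is a minimal sketch, assuming boto3 is installed and AWS credentials are configured; the function names `build_cli_request`, `decode_cli_response`, and `trigger_dag` are hypothetical. It also assumes that the endpoint returns a JSON body whose stdout and stderr fields are base64-encoded, which is worth confirming against the current Amazon MWAA documentation.

```python
import base64
import json
import urllib.request


def build_cli_request(hostname: str, cli_token: str, command: str) -> urllib.request.Request:
    """Build (but do not send) the POST request to the MWAA CLI endpoint."""
    return urllib.request.Request(
        url=f"https://{hostname}/aws_mwaa/cli",
        data=command.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {cli_token}",
            "Content-Type": "text/plain",
        },
        method="POST",
    )


def decode_cli_response(body: bytes) -> dict:
    """Decode the response, assuming base64-encoded 'stdout' and 'stderr' fields."""
    payload = json.loads(body)
    return {
        key: base64.b64decode(payload.get(key, "")).decode("utf-8")
        for key in ("stdout", "stderr")
    }


def trigger_dag(environment_name: str, dag_name: str) -> dict:
    """Fetch a short-lived CLI token with boto3, then run 'trigger_dag'."""
    import boto3  # imported lazily so the helpers above work without boto3

    token = boto3.client("mwaa").create_cli_token(Name=environment_name)
    request = build_cli_request(
        token["WebServerHostname"], token["CliToken"], f"trigger_dag {dag_name}"
    )
    with urllib.request.urlopen(request) as response:
        return decode_cli_response(response.read())
```

Calling `trigger_dag("<your_environment_name>", "spark_pi_example")` would then return the Airflow CLI's decoded output. Because `create_cli_token` also returns the web server hostname, this version does not need the WEB_SERVER_HOSTNAME variable.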

Analytics Job with Airflow

Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.

The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.

Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains ten Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.

import os
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago

# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************

DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ["{{ dag_run.conf['airflow_email'] }}"],
    'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
    'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}

SPARK_STEPS = [
    {
        'Name': 'Bakery Sales',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                '--conf',
                'spark.yarn.submit.waitAppCompletion=true',
                's3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py'
            ]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': '{{ var.value.release_label }}',
    'LogUri': 's3n://{{ var.value.logs_bucket }}',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceFleets': [
            {
                'Name': 'MASTER',
                'InstanceFleetType': 'MASTER',
                'TargetSpotCapacity': 1,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'm5.xlarge',
                    },
                ]
            },
            {
                'Name': 'CORE',
                'InstanceFleetType': 'CORE',
                'TargetSpotCapacity': 2,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'r5.xlarge',
                    },
                ],
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        'Ec2KeyName': '{{ var.value.emr_ec2_key_pair }}',
    },
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh',
            }
        },
    ],
    'Configurations': [
        {
            'Classification': 'spark-hive-site',
            'Properties': {
                'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
            }
        }
    ],
    'VisibleToAllUsers': True,
    'JobFlowRole': '{{ var.value.job_flow_role }}',
    'ServiceRole': '{{ var.value.service_role }}',
    'EbsRootVolumeSize': 32,
    'StepConcurrencyLevel': 1,
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        },
        {
            'Key': 'Name',
            'Value': 'Airflow EMR Demo Project'
        },
        {
            'Key': 'Owner',
            'Value': 'Data Analytics Team'
        }
    ]
}

with DAG(
        dag_id=DAG_ID,
        description='Analyze Bakery Sales with Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['emr'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker

Import Variables into Airflow UI

First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.

{
    "bootstrap_bucket": "emr-demo-bootstrap-123412341234-us-east-1",
    "emr_ec2_key_pair": "emr-demo-123412341234-us-east-1",
    "job_flow_role": "EMR_EC2_DemoRole",
    "logs_bucket": "emr-demo-logs-123412341234-us-east-1",
    "release_label": "emr-6.2.0",
    "service_role": "EMR_DemoRole",
    "work_bucket": "emr-demo-work-123412341234-us-east-1",
    "ec2_subnet_id": "subnet-012abc456efg78900"
}
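
Before importing the file, it can be useful to confirm that every var.value reference in a DAG has a matching key in the variables file. The following is a minimal Python sketch of such a check; the function names are hypothetical, and the regular expression only recognizes the simple `{{ var.value.name }}` form used in this post, not arbitrary Jinja expressions.

```python
import json
import re
from typing import Set

# Matches simple Jinja expressions such as "{{ var.value.work_bucket }}".
VAR_PATTERN = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def referenced_variables(template_text: str) -> Set[str]:
    """Return the Airflow Variable names referenced in DAG or JSON text."""
    return set(VAR_PATTERN.findall(template_text))


def missing_variables(template_text: str, variables_json: str) -> Set[str]:
    """Return referenced names that are absent from the variables JSON."""
    defined = set(json.loads(variables_json))
    return referenced_variables(template_text) - defined


if __name__ == "__main__":
    dag_text = (
        "'ReleaseLabel': '{{ var.value.release_label }}', "
        "'LogUri': 's3n://{{ var.value.logs_bucket }}'"
    )
    variables = '{"logs_bucket": "emr-demo-logs-123412341234-us-east-1"}'
    print(sorted(missing_variables(dag_text, variables)))
```

In practice, `template_text` would be the contents of the DAG file and `variables_json` the contents of admin_variables_bakery.json, each read from disk.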

Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Apache Airflow UI’s Admin > Variables tab

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/

The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.

{
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": false,
    "email_on_retry": false
}

This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Apache Airflow UI’s Trigger DAG Screen

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Amazon EMR Console’s Steps tab

Multi-Step DAG

In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.

We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.
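
To illustrate the substitution step, here is a minimal Python sketch that loads a JSON blob and replaces var.value expressions throughout it. It is a simplified stand-in that uses a regular expression rather than Jinja, so it is not how Airflow itself renders templates; the function name `render_templates` and the sample bucket name are hypothetical.

```python
import json
import re
from typing import Any, Dict

# Matches simple expressions such as "{{ var.value.work_bucket }}".
VAR_PATTERN = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render_templates(node: Any, variables: Dict[str, str]) -> Any:
    """Recursively substitute var.value expressions in every string of a JSON tree."""
    if isinstance(node, str):
        return VAR_PATTERN.sub(lambda match: variables[match.group(1)], node)
    if isinstance(node, list):
        return [render_templates(item, variables) for item in node]
    if isinstance(node, dict):
        return {key: render_templates(value, variables) for key, value in node.items()}
    return node  # numbers, booleans, and null pass through unchanged


if __name__ == "__main__":
    steps_json = """
    [{"Name": "Bakery Sales",
      "HadoopJarStep": {"Args": ["spark-submit",
          "s3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py"]}}]
    """
    steps = json.loads(steps_json)
    rendered = render_templates(steps, {"work_bucket": "my-work-bucket"})
    print(json.dumps(rendered, indent=4))
```

The point of the sketch is the ordering: the JSON is parsed into plain Python objects first, and the template expressions are resolved afterwards, so the external files can remain free of environment-specific values.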

Below we see a snippet of two of the four spark-submit job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.

[
    {
        "Name": "Movie Ratings",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--master",
                "yarn",
                "--conf",
                "spark.yarn.submit.waitAppCompletion=true",
                "s3a://{{ var.value.work_bucket }}/analyze/movies_avg_ratings_ssm.py",
                "--start-date",
                "2016-01-01 00:00:00",
                "--end-date",
                "2016-12-31 23:59:59"
            ]
        }
    },
    {
        "Name": "Stock Volatility",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--master",
                "yarn",
                "--conf",
                "spark.yarn.submit.waitAppCompletion=true",
                "s3a://{{ var.value.work_bucket }}/analyze/stock_volatility_ssm.py",
                "--start-date",
                "2017-01-01",
                "--end-date",
                "2018-12-31"
            ]
        }
    }
]

Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.

{
    "Name": "demo-cluster-airflow",
    "ReleaseLabel": "{{ var.value.release_label }}",
    "LogUri": "s3n://{{ var.value.logs_bucket }}",
    "Applications": [
        {
            "Name": "Spark"
        }
    ],
    "Instances": {
        "InstanceFleets": [
            {
                "Name": "MASTER",
                "InstanceFleetType": "MASTER",
                "TargetSpotCapacity": 1,
                "InstanceTypeConfigs": [
                    {
                        "InstanceType": "m5.xlarge"
                    }
                ]
            },
            {
                "Name": "CORE",
                "InstanceFleetType": "CORE",
                "TargetSpotCapacity": 2,
                "InstanceTypeConfigs": [
                    {
                        "InstanceType": "r5.2xlarge"
                    }
                ]
            }
        ],
        "Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
        "KeepJobFlowAliveWhenNoSteps": false,
        "TerminationProtected": false,
        "Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}"
    },
    "BootstrapActions": [
        {
            "Name": "string",
            "ScriptBootstrapAction": {
                "Path": "s3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh"
            }
        }
    ],
    "Configurations": [
        {
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
        }
    ],
    "VisibleToAllUsers": true,
    "JobFlowRole": "{{ var.value.job_flow_role }}",
    "ServiceRole": "{{ var.value.service_role }}",
    "EbsRootVolumeSize": 32,
    "StepConcurrencyLevel": 5,
    "Tags": [
        {
            "Key": "Environment",
            "Value": "Development"
        },
        {
            "Key": "Name",
            "Value": "Airflow EMR Demo Project"
        },
        {
            "Key": "Owner",
            "Value": "Data Analytics Team"
        }
    ]
}

Decoupling the configurations reduces the DAG from well over 200 lines of code to fewer than 75 lines. Note the job_flow_overrides and steps parameters in the DAG below. Instead of referencing a local object variable, each parameter now calls the function get_object(key, bucket_name), which loads the JSON from S3.

import json
import os
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.utils.dates import days_ago

# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************

DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ["{{ dag_run.conf['airflow_email'] }}"],
    'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
    'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}


def get_object(key, bucket_name):
    """
    Load S3 object as JSON
    """
    hook = S3Hook()
    content_object = hook.get_key(key=key, bucket_name=bucket_name)
    file_content = content_object.get()['Body'].read().decode('utf-8')
    return json.loads(file_content)


with DAG(
        dag_id=DAG_ID,
        description='Run multiple Spark jobs with Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval=None,
        tags=['emr', 'spark', 'pyspark']
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=get_object('job_flow_overrides/job_flow_overrides.json', work_bucket)
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=get_object('emr_steps/emr_steps.json', work_bucket)
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default'
    )

    cluster_creator >> step_adder >> step_checker

This time, we need to upload three files to S3: the DAG to the Airflow S3 bucket and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.

aws s3 cp emr_steps/emr_steps.json \
    s3://emr-demo-work-123412341234-us-east-1/emr_steps/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
    s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/

The new DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark jobs. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Apache Airflow UI’s DAGs tab

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Amazon EMR Console’s Steps tab showing four Steps running in parallel

Triggering DAGs Programmatically

AWS CLI

Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the --conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.

ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
DAG_NAME="multiple_steps"
CONFIG="""'{
\"airflow_email\": \"analytics_team@example.com\",
\"email_on_failure\": true,
\"email_on_retry\": false
}'"""
CLI_JSON=$(aws mwaa create-cli-token --name ${ENVIRONMENT_NAME}) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
--header "Authorization: Bearer ${CLI_TOKEN}" \
--header "Content-Type: text/plain" \
--data-raw "trigger_dag ${DAG_NAME} --conf ${CONFIG}"

AWS SDK

Airflow DAGs can also be triggered using the AWS SDK. For example, with boto3 for Python, we could use a script similar to the following to remotely trigger a DAG.

#!/usr/bin/env python3

# MWAA: Trigger an Apache Airflow DAG using SDK
# Author: Gary A. Stafford (February 2021)

import logging

import boto3
import requests

logging.basicConfig(
    format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

mwaa_client = boto3.client('mwaa')

ENVIRONMENT_NAME = 'Your_Airflow_Environment_Name'
DAG_NAME = 'your_dag_name'
CONFIG = '{"foo": "bar"}'


def main():
    response = mwaa_client.create_cli_token(
        Name=ENVIRONMENT_NAME
    )
    logging.info('response: ' + str(response))
    token = response['CliToken']
    url = 'https://{0}/aws_mwaa/cli'.format(response['WebServerHostname'])
    headers = {'Authorization': 'Bearer ' + token, 'Content-Type': 'text/plain'}
    payload = 'trigger_dag {0} --conf {1}'.format(DAG_NAME, CONFIG)
    response = requests.post(url, headers=headers, data=payload)
    logging.info('response: ' + str(response))  # should be <Response [200]>


if __name__ == '__main__':
    main()
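A 200 status alone does not show what the Airflow CLI printed. As far as I am aware, the MWAA CLI endpoint returns a JSON body whose stdout and stderr fields are base64-encoded; treat that as an assumption to verify against your environment. The sketch below decodes such a body. The decode_cli_response helper is hypothetical, and the sample body is made up purely for illustration.

```python
import base64
import json


def decode_cli_response(body):
    """Decode the base64-encoded stdout and stderr fields of a CLI response body."""
    payload = json.loads(body)
    return {
        "stdout": base64.b64decode(payload.get("stdout", "")).decode("utf-8"),
        "stderr": base64.b64decode(payload.get("stderr", "")).decode("utf-8"),
    }


# Made-up sample body for illustration only, not real MWAA output
sample_body = json.dumps({
    "stdout": base64.b64encode(b"example stdout text").decode("ascii"),
    "stderr": base64.b64encode(b"").decode("ascii"),
})

decoded = decode_cli_response(sample_body)
print(decoded["stdout"])
```

In the script above, the body to decode would be response.text from the requests.post call.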

Cleaning Up

Once you are done with the MWAA Environment, be sure to delete it as soon as possible to avoid additional costs. Also, delete the MWAA-VPC CloudFormation stack. Its resources, like the two NAT Gateways, will otherwise continue to generate costs.

aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC

Conclusion

In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.

If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.


Installing Apache Superset on Amazon EMR: Add data exploration and visualization to your analytics cluster

Introduction

AWS provides nearly twenty-five different open-source data analytics applications that can be automatically installed and configured on Amazon Elastic MapReduce (Amazon EMR). Of all those options, EMR doesn’t offer a general-purpose data exploration and visualization tool. However, with EMR, you can automate the installation of additional software as part of the cluster creation process or post cluster creation. This brief post will explore how to install, configure, and access Apache Superset, the modern data exploration and visualization platform on Amazon EMR’s Master Node, as a post-cluster creation step. You can use these same techniques to install other software packages on EMR as well, manually or as part of an automated Data Pipeline.

Amazon EMR

According to AWS, Amazon EMR is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

AWS currently offers 5.x and 6.x versions of Amazon EMR. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0. Each version of Amazon EMR offers incremental major and minor releases of nearly 25 different, popular open-source big-data software packages to choose from, which Amazon EMR will install and configure when the cluster is created.

Apache Superset

According to its website, Apache Superset is a modern data exploration and visualization platform. Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

Superset natively supports over twenty-five data sources, including Amazon Athena and Redshift; Apache Drill, Druid, Hive, Impala, Kylin, Pinot, and Spark SQL; and Elasticsearch, Google BigQuery, Hana, MySQL, Oracle, Postgres, Presto, Snowflake, Microsoft SQL Server, and Teradata.

As shown in their Gallery, Superset includes dozens of visualization types, including Pivot Table, Line Chart, Markup, Pie Chart, Filter Box, Bubble Chart, Box Plot, Histogram, Heatmap, Sunburst, Calendar Heatmap, and several geospatial types.

Apache Superset Visualization Gallery

Setup

Using this git clone command, download a copy of this post’s open-source GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-superset-demo.git

To demonstrate how to install Apache Superset on EMR, I have prepared an AWS CloudFormation template. Deploying the template, cloudformation/superset-emr-demo.yml, to AWS will result in the AWS CloudFormation stack, superset-emr-demo-dev. The stack creates a minimally-sized, two-node EMR cluster, two Amazon S3 buckets, and several AWS Systems Manager (SSM) Parameter Store parameters.

There is also a JSON-format CloudFormation parameters file, cloudformation/superset-emr-demo-params-dev.json. The parameters file contains values for eight of the ten required parameters in the CloudFormation template, all of which you can adjust. For the remaining two required parameters, you will need to supply the name of an existing EC2 key pair to access the EMR Master node. The key pair will need to be deployed to the same AWS Account into which you are deploying EMR. You will also need to supply a Subnet ID for the EMR cluster to be installed into. The subnet must have access to the Internet to install Superset’s required system and Python packages and to access Superset’s web-based user interface. If you need help creating a VPC and subnet to deploy EMR into, refer to my previous blog post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.
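To illustrate how values from a parameters file and values supplied at run time might be combined, here is a minimal sketch. It assumes the file uses the list-of-objects layout with ParameterKey and ParameterValue fields that CloudFormation accepts. The merge_parameters helper and the two parameter names in the sample file are hypothetical, not taken from the project.

```python
import json


def merge_parameters(file_params, overrides):
    """Return a CloudFormation-style parameter list with overrides applied."""
    merged = {p["ParameterKey"]: p["ParameterValue"] for p in file_params}
    merged.update(overrides)
    return [{"ParameterKey": k, "ParameterValue": v} for k, v in merged.items()]


# Hypothetical contents of a parameters file
file_params = json.loads("""
[
  {"ParameterKey": "ProjectName", "ParameterValue": "superset-emr-demo"},
  {"ParameterKey": "EnvironmentName", "ParameterValue": "dev"}
]
""")

params = merge_parameters(file_params, {
    "Ec2KeyName": "<your_key_pair_name>",
    "Ec2SubnetId": "<your_subnet_id>",
})
print(json.dumps(params, indent=2))
```

Keying the merge on ParameterKey means a value passed at run time replaces a file value of the same name rather than duplicating it.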

The CloudFormation stack is created using a Python script, create_cfn_stack.py. The Python script uses the AWS boto3 Python SDK.

#!/usr/bin/env python3

# Purpose: Create EMR bootstrap script bucket and deploy the cfn stack
# Author: Gary A. Stafford (December 2020)
# Reference: https://gist.github.com/svrist/73e2d6175104f7ab4d201280acba049c
# Usage Example: python3 ./create_cfn_stack.py \
#                    --ec2-key-name emr-demo-123456789012-us-east-1 \
#                    --ec2-subnet-id subnet-06aa61f790a932b32 \
#                    --environment dev

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

sts_client = boto3.client('sts')
cfn_client = boto3.client('cloudformation')
region = boto3.DEFAULT_SESSION.region_name
s3_client = boto3.client('s3', region_name=region)

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)


def main():
    args = parse_args()

    # create bootstrap bucket
    account_id = sts_client.get_caller_identity()['Account']
    bootstrap_bucket = f'superset-emr-demo-bootstrap-{account_id}{region}'
    create_bucket(bootstrap_bucket)

    # upload bootstrap script
    dir_path = os.path.dirname(os.path.realpath(__file__))
    upload_file(f'{dir_path}/bootstrap_emr/bootstrap_actions.sh', bootstrap_bucket, 'bootstrap_actions.sh')

    # set variables
    stack_name = f'emr-superset-demo-{args.environment}'
    cfn_template_path = f'{dir_path}/cloudformation/superset-emr-demo.yml'
    cfn_params_path = f'{dir_path}/cloudformation/superset-emr-demo-params-{args.environment}.json'
    ec2_key_name = args.ec2_key_name

    # append new parameters
    cfn_params = _parse_parameters(cfn_params_path)
    cfn_params.append({'ParameterKey': 'Ec2KeyName', 'ParameterValue': ec2_key_name})
    cfn_params.append({'ParameterKey': 'Ec2SubnetId', 'ParameterValue': args.ec2_subnet_id})
    cfn_params.append({'ParameterKey': 'BootstrapBucket', 'ParameterValue': bootstrap_bucket})
    logging.info(json.dumps(cfn_params, indent=4))

    # create the cfn stack
    create_stack(stack_name, cfn_template_path, cfn_params)


def create_bucket(bootstrap_bucket):
    """Create an S3 bucket in a specified region

    :param bootstrap_bucket: Bucket to create
    :return: True if bucket created, else False
    """
    try:
        s3_client.create_bucket(Bucket=bootstrap_bucket)
        logging.info(f'New bucket name: {bootstrap_bucket}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def upload_file(file_name, bootstrap_bucket, object_name):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bootstrap_bucket: Bucket to upload to
    :param object_name: S3 object name
    :return: True if file was uploaded, else False
    """
    # Upload the file
    try:
        response = s3_client.upload_file(file_name, bootstrap_bucket, object_name)
        logging.info(f'File {file_name} uploaded to bucket {bootstrap_bucket} as object {object_name}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def create_stack(stack_name, cfn_template, cfn_params):
    """Create EMR Cluster CloudFormation stack"""
    template_data = _parse_template(cfn_template)
    create_stack_params = {
        'StackName': stack_name,
        'TemplateBody': template_data,
        'Parameters': cfn_params,
        'TimeoutInMinutes': 60,
        'Capabilities': [
            'CAPABILITY_NAMED_IAM',
        ],
        'Tags': [
            {
                'Key': 'Project',
                'Value': 'Superset EMR Demo'
            },
        ]
    }
    try:
        response = cfn_client.create_stack(**create_stack_params)
        logging.info(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def _parse_template(template):
    with open(template) as template_file_obj:
        template_data = template_file_obj.read()
    cfn_client.validate_template(TemplateBody=template_data)
    return template_data


def _parse_parameters(parameters):
    with open(parameters) as parameter_file_obj:
        parameter_data = json.load(parameter_file_obj)
    return parameter_data


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-e', '--environment', required=True, choices=['dev', 'test', 'prod'], help='Environment')
    parser.add_argument('-k', '--ec2-key-name', required=True, help='Ec2KeyName: Name of EC2 Keypair')
    parser.add_argument('-s', '--ec2-subnet-id', required=True, help='Ec2SubnetId: ID of the EC2 subnet')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

To execute the Python script and create the CloudFormation stack, which will create the EMR cluster, run the following command. Remember to update the parameters to the name of your EC2 key pair and the Subnet ID for the EMR cluster.

python3 ./create_cfn_stack.py \
--ec2-key-name <your_key_pair_name> \
--ec2-subnet-id <your_subnet_id> \
--environment dev

Here is what the complete CloudFormation workflow looks like.

AWS CloudFormation stack creation

Security Group Ingress Rules

To install Superset on the EMR cluster’s Master node via SSH, you need to open port 22 on the Security Group associated with the EMR cluster’s Master Node, allowing access from your IP address. You can use the AWS Management Console or AWS CLI to open port 22. We will use jq and the AWS CLI’s ec2 commands to get the Security Group ID associated with the EMR cluster’s Master Node and create the ingress rule.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r ".SecurityGroups[] | \
select(.GroupName==\"ElasticMapReduce-master\").GroupId" | \
head -n 1)
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
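For readers who prefer the SDK, here is a minimal sketch of the same rule expressed as the IpPermissions structure that boto3's authorize_security_group_ingress call accepts. The ssh_ingress_permissions helper, the sample IP address (taken from a documentation-only range), and the rule description are assumptions for illustration.

```python
import ipaddress


def ssh_ingress_permissions(ip_address, port=22):
    """Build the IpPermissions list for a single-host TCP ingress rule."""
    # Validate the address and express it as a /32 CIDR block
    cidr = f"{ipaddress.IPv4Address(ip_address)}/32"
    return [{
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": cidr, "Description": "SSH from my IP"}],
    }]


permissions = ssh_ingress_permissions("203.0.113.10")
print(permissions)

# With boto3 installed and AWS credentials configured, the rule could be applied with:
# import boto3
# boto3.client("ec2").authorize_security_group_ingress(
#     GroupId="<your_security_group_id>", IpPermissions=permissions)
```

Validating the address first means a malformed value fails locally rather than in the API call.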

Superset Script

Once the CloudFormation stack is created and the port is open, we can install Apache Superset on the EMR Master Node. The bootstrap script, bootstrap_emr/bootstrap_superset.sh, will be used to install Apache Superset onto the EMR cluster’s Master Node as the hadoop user. The script is roughly based on Superset’s Installing from Scratch instructions.

As part of installing Superset, the script will also deploy several common database drivers, including Amazon Athena, Amazon Redshift, Apache Spark SQL, Presto, PostgreSQL, and MySQL. The script will also create a Superset Admin role, and two Superset User roles — Alpha and Gamma.

#!/bin/bash

# Purpose: Installs Apache Superset on EMR
# Author: Gary A. Stafford (December 2020)
# Usage: sh ./bootstrap_superset.sh 8280
# Reference: https://superset.apache.org/docs/installation/installing-superset-from-scratch

# port for superset (default: 8280)
export SUPERSET_PORT="${1:-8280}"

# install required packages
sudo yum -y install gcc gcc-c++ libffi-devel python-devel python-pip python-wheel \
    openssl-devel cyrus-sasl-devel openldap-devel python3-devel.x86_64

# optionally, update Master Node packages
sudo yum -y update

# install required Python package
python3 -m pip install --user --upgrade setuptools virtualenv

python3 -m venv venv
. venv/bin/activate

python3 -m pip install --upgrade apache-superset \
    PyAthenaJDBC PyAthena sqlalchemy-redshift pyhive mysqlclient psycopg2-binary

command -v superset

superset db upgrade

export FLASK_APP=superset
echo "export FLASK_APP=superset" >>~/.bashrc

touch superset_config.py
echo "ENABLE_TIME_ROTATE = True" >>superset_config.py
echo "export SUPERSET_CONFIG_PATH=superset_config.py" >>~/.bashrc

export ADMIN_USERNAME="SupersetAdmin"
export ADMIN_PASSWORD="Admin1234"

# create superset admin
superset fab create-admin \
    --username "${ADMIN_USERNAME}" \
    --firstname Superset \
    --lastname Admin \
    --email superset_admin@example.com \
    --password "${ADMIN_PASSWORD}"

superset init

# create two sample superset users
superset fab create-user \
    --role Alpha \
    --username SupersetUserAlpha \
    --firstname Superset \
    --lastname UserAlpha \
    --email superset_user_alpha@example.com \
    --password UserAlpha1234

superset fab create-user \
    --role Gamma \
    --username SupersetUserGamma \
    --firstname Superset \
    --lastname UserGamma \
    --email superset_user_gamma@example.com \
    --password UserGamma1234

# get instance id
INSTANCE_ID="$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .instanceId)"
export INSTANCE_ID
echo "INSTANCE_ID: ${INSTANCE_ID}"

# use instance id to get public dns of master node
PUBLIC_MASTER_DNS="$(aws ec2 describe-instances --instance-id ${INSTANCE_ID} |
    jq -r '.Reservations[0].Instances[0].PublicDnsName')"
export PUBLIC_MASTER_DNS
echo "PUBLIC_MASTER_DNS: ${PUBLIC_MASTER_DNS}"

# start superset in background
nohup superset run \
    --host "${PUBLIC_MASTER_DNS}" \
    --port "${SUPERSET_PORT}" \
    --with-threads --reload --debugger \
    >superset_output.log 2>&1 </dev/null &

# output connection info
printf %s """
**********************************************************************
Superset URL: http://${PUBLIC_MASTER_DNS}:${SUPERSET_PORT}
Admin Username: ${ADMIN_USERNAME}
Admin Password: ${ADMIN_PASSWORD}
**********************************************************************
"""

To install Superset using the bootstrap script, we will use another Python script, install_superset.py. The script uses paramiko, a Python implementation of SSHv2. The script also uses scp, a module that uses a paramiko transport to send and receive files via the scp1 protocol.


The script requires a single input parameter, ec2-key-path, which is the full path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem). Optionally, you can change the default Superset port of 8280, using the superset-port parameter.

python3 ./install_superset.py \
--ec2-key-path </path/to/my-key-pair.pem> \
--superset-port 8280

The script uses SSH and SCP to deploy and execute the bootstrap script, bootstrap_superset.sh. The output from the script includes the URL of Apache Superset running on the EMR cluster. The output also contains the username and password of the Superset Admin.
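The following is not the author's install_superset.py, only a rough sketch of the copy-then-execute approach described above, using the paramiko and scp packages. The function names, arguments, and the choice to copy the script into the remote home directory are all assumptions.

```python
import os
import shlex


def build_remote_command(script_name, superset_port):
    """Build the shell command that runs the bootstrap script on the Master node."""
    return f"sh ./{shlex.quote(script_name)} {int(superset_port)}"


def install_superset(host, key_path, local_script, superset_port=8280):
    """Copy the bootstrap script to the Master node and execute it over SSH."""
    # Imported lazily so the helper above works without these third-party packages
    import paramiko
    from scp import SCPClient

    ssh = paramiko.SSHClient()
    # AutoAddPolicy trusts unknown host keys; acceptable for a demo, not for production
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=host, username="hadoop", key_filename=key_path)
    try:
        script_name = os.path.basename(local_script)
        with SCPClient(ssh.get_transport()) as scp:
            scp.put(local_script, script_name)
        _stdin, stdout, _stderr = ssh.exec_command(
            build_remote_command(script_name, superset_port))
        print(stdout.read().decode("utf-8"))
    finally:
        ssh.close()


print(build_remote_command("bootstrap_superset.sh", 8280))
```

Calling install_superset with the Master node's public DNS name, the path to your key pair, and the local path of the bootstrap script would perform the copy and the remote run in one step.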

********************************************************************
Superset URL: http://ec2-111-22-333-44.compute-1.amazonaws.com:8280
Admin Username: SupersetAdmin
Admin Password: Admin1234
********************************************************************

SSH Tunnel

According to AWS, EMR applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local web server. To reach any of the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you are using dynamic port forwarding, you must also configure a proxy server to view the web interfaces.

Instructions for creating an SSH tunnel to access UIs on EMR

Running the command in your terminal will start the SSH tunnel on port 8157. Once the tunnel is enabled, you can access Apache Superset in a web browser, using the URL shown in the script output above. Use the Admin credentials or either of the two User credentials to sign in to Superset.
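For reference, a dynamic port forwarding command of this kind can be assembled as sketched below. The ssh_tunnel_command helper and the key path are hypothetical placeholders; the host name is the sample value from the script output above, and the hadoop user is the one used elsewhere in this post.

```python
import shlex


def ssh_tunnel_command(key_path, master_dns, local_port=8157, user="hadoop"):
    """Build an ssh command that opens a dynamic (SOCKS) port forward."""
    return [
        "ssh",
        "-i", key_path,
        "-N",                    # do not run a remote command, only forward ports
        "-D", str(local_port),   # dynamic port forwarding on this local port
        f"{user}@{master_dns}",
    ]


command = ssh_tunnel_command(
    "/path/to/my-key-pair.pem",
    "ec2-111-22-333-44.compute-1.amazonaws.com",
)
print(shlex.join(command))
```

The list form can be passed directly to subprocess.run, while shlex.join produces a string to paste into a terminal.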

Apache Superset Sign In screen

Once signed in, you will have the ability to connect to your data sources and explore and visualize data. Below, we see an example of a SQL query executed against an Amazon RDS for PostgreSQL database, running in a separate VPC from EMR.

Sample query of an Amazon RDS PostgreSQL database using Superset’s SQL Editor

Conclusion

In this post, we learned how to install Apache Superset onto the Master Node of an Amazon EMR Cluster. If you want to install an application on all the nodes of an EMR cluster, you can add the commands to the bootstrap script, which runs when CloudFormation creates the cluster.




Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce

Introduction

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

PySpark on EMR

In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.

PySpark Application Methods

  1. Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
  2. EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
  3. Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
  4. AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
  5. Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);

Notebook Methods

  1. EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
  2. Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
  3. Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;

Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.
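As a small preview of the first method in the list above, the sketch below builds a single EMR step definition of the kind that boto3's add_job_flow_steps call accepts in its Steps list. The spark_submit_step helper, the step name, and the script location are hypothetical.

```python
def spark_submit_step(name, script_uri, script_args=None, action_on_failure="CONTINUE"):
    """Build one EMR step that runs a PySpark script with spark-submit."""
    args = ["spark-submit", "--deploy-mode", "cluster", "--master", "yarn", script_uri]
    args.extend(script_args or [])
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


step = spark_submit_step(
    "Example Step",
    "s3a://<your_work_bucket>/analyze/example_job.py",  # hypothetical script location
    ["--start-date", "2016-01-01"],
)
print(step["HadoopJarStep"]["Args"])

# With boto3 installed and AWS credentials configured, the step could be submitted with:
# import boto3
# boto3.client("emr").add_job_flow_steps(JobFlowId="<your_cluster_id>", Steps=[step])
```

Building steps from a function keeps the spark-submit boilerplate in one place when several jobs differ only in their script and arguments.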

Preliminary Tasks

To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.

  1. Download a copy of this post’s GitHub repository;
  2. Download three Kaggle datasets and organize locally;
  3. Create an Amazon EC2 key pair;
  4. Upload the EMR bootstrap script and create the CloudFormation Stack;
  5. Allow your IP address access to the EMR Master node on port 22;
  6. Upload CSV data files and PySpark applications to S3;
  7. Crawl the raw data and create a Data Catalog using AWS Glue;

Step 1: GitHub Repository

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git

Step 2: Kaggle Datasets

Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.

  1. Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
  2. Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
  3. Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones

Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in a later step.

 > tree raw_data --si -v -A

 raw_data
├── [ 128]  bakery
│   ├── [711k]  BreadBasket_DMS.csv
├── [ 320]  movie_ratings
│   ├── [190M]  credits.csv
│   ├── [6.2M]  keywords.csv
│   ├── [989k]  links.csv
│   ├── [183k]  links_small.csv
│   ├── [ 34M]  movies_metadata.csv
│   ├── [710M]  ratings.csv
│   └── [2.4M]  ratings_small.csv
└── [1.1k]  stocks
    ├── [151k]  AAPL.csv
    ├── [146k]  AXP.csv
    ├── [150k]  BA.csv
    ├── [147k]  CAT.csv
    ├── [146k]  CSCO.csv
    ├── [149k]  CVX.csv
    ├── [147k]  DIS.csv
    ├── [ 42k]  DWDP.csv
    ├── [150k]  GS.csv
    └── [...]  abridged...

In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.

Step 3: Amazon EC2 key pair

According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon EC2 key pair. If you do not have an existing key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Amazon EC2 Key pair Console

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.

chmod 0400 /path/to/my-key-pair.pem

Step 4: Bootstrap Script and CloudFormation Stack

The bulk of the resources that are used as part of this demonstration are created using the CloudFormation stack, emr-demo-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.

There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.

AWS CloudFormation stack creation

The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.

The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.

#!/bin/bash
# Purpose: EMR bootstrap script
# Author: Gary A. Stafford (2021-04-05)
# update and install some useful yum packages
sudo yum install -y jq
# set region for boto3
aws configure set region \
"$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)"
# install some useful python packages
sudo python3 -m pip install boto3 ec2-metadata

From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.

python3 ./scripts/create_cfn_stack.py \
    --environment dev \
    --ec2-key-name <my-key-pair-name>
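
The bucket-creation and upload steps described above can be sketched with boto3 as follows. This is a minimal sketch under stated assumptions, not the repository's create_cfn_stack.py: the function name and its arguments are hypothetical, and the S3 client is passed in as a parameter so the logic can be exercised without AWS credentials.

```python
def create_bucket_and_upload(s3_client, bucket, region, local_path, key):
    """Create an S3 bucket and upload one local file to it (hypothetical helper)."""
    kwargs = {"Bucket": bucket}
    # us-east-1 is the default location and must not be sent as a constraint;
    # every other Region requires an explicit LocationConstraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3_client.create_bucket(**kwargs)
    s3_client.upload_file(local_path, bucket, key)
    return f"s3://{bucket}/{key}"


# Example usage (requires boto3 and configured AWS credentials):
#   import boto3
#   create_bucket_and_upload(boto3.client("s3"), "my-bootstrap-bucket", "us-east-1",
#                            "bootstrap_actions.sh", "bootstrap_actions.sh")
```

Passing the client in, rather than creating it inside the function, keeps the Region handling easy to check in isolation.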

The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

AWS CloudFormation Console’s Stacks tab

Step 5: SSH Access to EMR

For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

Amazon EC2 Security Group Console

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Amazon EC2 Security Group Inbound rules

Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
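
For the AWS SDK route, a boto3 equivalent of the CLI commands above might look like the following sketch. The function name is hypothetical, and the caller supplies the public IP address; the EC2 client is injected so the function can be tried against a stub.

```python
def allow_ssh_from_ip(ec2_client, group_name, ip_address):
    """Authorize inbound SSH (TCP port 22) from a single IPv4 address."""
    groups = ec2_client.describe_security_groups(
        Filters=[{"Name": "group-name", "Values": [group_name]}]
    )["SecurityGroups"]
    if not groups:
        raise ValueError(f"Security group not found: {group_name}")

    group_id = groups[0]["GroupId"]
    ec2_client.authorize_security_group_ingress(
        GroupId=group_id,
        IpPermissions=[{
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 22,
            # A /32 CIDR block restricts access to exactly one address
            "IpRanges": [{"CidrIp": f"{ip_address}/32"}],
        }],
    )
    return group_id


# Example usage (requires boto3 and configured AWS credentials):
#   import boto3
#   allow_ssh_from_ip(boto3.client("ec2"), "ElasticMapReduce-master", "203.0.113.10")
```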

Step 6: Raw Data and PySpark Apps to S3

As part of the emr-demo-dev CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw, processed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.

> aws s3api list-buckets | \
    jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'

emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1

There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.
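
The relationship between the two naming conventions can be written down as a small lookup. This sketch assumes the bucket name pattern seen in the listing above (prefix, data stage, account ID, Region); the helper and its default prefix are illustrative only.

```python
# Parameter-style aliases mapped to the stage segment used in the bucket names
BUCKET_TIERS = {"bronze": "raw", "silver": "processed", "gold": "analyzed"}


def bucket_name(tier, account_id, region, prefix="emr-demo"):
    """Build a data bucket name from its bronze/silver/gold alias."""
    segment = BUCKET_TIERS[tier]
    return f"{prefix}-{segment}-{account_id}-{region}"


print(bucket_name("silver", "123456789012", "us-east-1"))
# prints "emr-demo-processed-123456789012-us-east-1"
```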

Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.

python3 ./scripts/upload_csv_files_to_s3.py
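
An upload script of this kind typically walks the local directory and mirrors its layout as S3 object keys. The following is a minimal sketch of that idea, not the repository's upload_csv_files_to_s3.py; the function names and the key layout (keys relative to the raw_data directory) are assumptions.

```python
from pathlib import Path


def plan_uploads(local_dir, suffix=".csv"):
    """Return sorted (local_path, s3_key) pairs, preserving the sub-directory layout."""
    root = Path(local_dir)
    return sorted(
        (str(path), path.relative_to(root).as_posix())
        for path in root.rglob(f"*{suffix}")
        if path.is_file()
    )


def upload_all(s3_client, bucket, local_dir):
    """Upload every planned file and return the number of files uploaded."""
    plan = plan_uploads(local_dir)
    for local_path, key in plan:
        s3_client.upload_file(local_path, bucket, key)
    return len(plan)
```

Separating the planning step from the upload makes it easy to print and review the keys before any data is transferred.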

Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 data bucket.

python3 ./scripts/upload_apps_to_s3.py

Step 7: Crawl Raw Data with Glue

The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets’ data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.

From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.

python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw
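
Starting a crawler and waiting for it to finish can be sketched with boto3's Glue client as shown here. This is an illustrative sketch rather than the repository's script: the function name, polling interval, and timeout are assumptions, and it relies on the crawler reporting the RUNNING state once started and returning to READY when the run ends.

```python
import time


def run_crawler(glue_client, crawler_name, poll_seconds=15, timeout_seconds=900):
    """Start a Glue crawler and poll until it returns to the READY state.

    Returns True if the crawler finished within the timeout, otherwise False.
    """
    glue_client.start_crawler(Name=crawler_name)
    waited = 0
    while waited <= timeout_seconds:
        state = glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"]
        if state == "READY":
            return True
        time.sleep(poll_seconds)
        waited += poll_seconds
    return False


# Example usage (requires boto3 and configured AWS credentials):
#   import boto3
#   run_crawler(boto3.client("glue"), "emr-demo-raw")
```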

Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

AWS Glue Data Catalog Database Tables Console

Alternately, we can use the glue get-tables AWS CLI command to review the tables.

> aws glue get-tables --database emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'

raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks
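
The same table listing can be produced with boto3 instead of the AWS CLI and jq. The sketch below uses the Glue client's get_tables paginator; the function name is hypothetical, and the client is passed in as a parameter.

```python
def list_tables(glue_client, database, prefix):
    """Return the sorted names of tables in a Glue database that start with a prefix."""
    paginator = glue_client.get_paginator("get_tables")
    names = []
    for page in paginator.paginate(DatabaseName=database):
        names.extend(
            table["Name"]
            for table in page["TableList"]
            if table["Name"].startswith(prefix)
        )
    return sorted(names)


# Example usage (requires boto3 and configured AWS credentials):
#   import boto3
#   print(list_tables(boto3.client("glue"), "emr_demo", "raw_"))
```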

PySpark Applications

Let’s explore four methods to run PySpark applications on EMR.

High-level architecture of this post’s data analytics platform

1. Add Job Flow Steps to an Existing EMR Cluster

We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on a [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py.

spark-submit --deploy-mode cluster --master yarn \
--conf spark.yarn.submit.waitAppCompletion=true \
s3a://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py

We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.

There are two versions of each PySpark application. Files with suffix _ssm use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, these PySpark applications are not tightly coupled to boto3 or the SSM Parameter Store. We will use _ssm versions of the scripts in this post’s demonstration.

 > tree pyspark_apps --si -v -A

 pyspark_apps
├── [ 320]  analyze
│   ├── [1.4k]  bakery_sales.py
│   ├── [1.5k]  bakery_sales_ssm.py
│   ├── [2.6k]  movie_choices.py
│   ├── [2.7k]  movie_choices_ssm.py
│   ├── [2.0k]  movies_avg_ratings.py
│   ├── [2.3k]  movies_avg_ratings_ssm.py
│   ├── [2.2k]  stock_volatility.py
│   └── [2.3k]  stock_volatility_ssm.py
└── [ 256]  process
    ├── [1.1k]  bakery_csv_to_parquet.py
    ├── [1.3k]  bakery_csv_to_parquet_ssm.py
    ├── [1.3k]  movies_csv_to_parquet.py
    ├── [1.5k]  movies_csv_to_parquet_ssm.py
    ├── [1.9k]  stocks_csv_to_parquet.py
    └── [2.0k]  stocks_csv_to_parquet_ssm.py 
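
For the non-SSM versions, the parameter values arrive as command-line arguments rather than Parameter Store lookups. A minimal sketch of that approach is shown here; the flag names are hypothetical and may differ from those used by the scripts in the repository.

```python
import argparse


def parse_args(argv=None):
    """Parse bucket names from the command line (flag names are illustrative)."""
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bronze-bucket", required=True, help="Raw data bucket name")
    parser.add_argument("--silver-bucket", required=True, help="Processed data bucket name")
    return parser.parse_args(argv)


def get_parameters(args):
    """Return the same dictionary shape the _ssm versions build from the Parameter Store."""
    return {
        "bronze_bucket": args.bronze_bucket,
        "silver_bucket": args.silver_bucket,
    }
```

Because both variants produce the same dictionary, the rest of the application can stay identical regardless of where the values come from.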

We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.

#!/usr/bin/env python3

# Process raw CSV data and output Parquet
# Author: Gary A. Stafford (November 2020)

import os

import boto3
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession

os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
ssm_client = boto3.client('ssm')


def main():
    params = get_parameters()

    spark = SparkSession \
        .builder \
        .appName("bakery-csv-to-parquet") \
        .getOrCreate()

    convert_to_parquet(spark, "bakery", params)


def convert_to_parquet(spark, file, params):
    df_bakery = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("delimiter", ",") \
        .option("inferSchema", "true") \
        .load(f"s3a://{params['bronze_bucket']}/bakery/{file}.csv")

    write_parquet(df_bakery, params)


def write_parquet(df_bakery, params):
    df_bakery.write \
        .format("parquet") \
        .save(f"s3a://{params['silver_bucket']}/bakery/", mode="overwrite")


def get_parameters():
    params = {
        'bronze_bucket': ssm_client.get_parameter(Name='/emr_demo/bronze_bucket')['Parameter']['Value'],
        'silver_bucket': ssm_client.get_parameter(Name='/emr_demo/silver_bucket')['Parameter']['Value']
    }

    return params


if __name__ == "__main__":
    main()

The three PySpark data processing applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, a snippet of which is shown below. The same goes for the four analytics applications.

[
  {
    "Name": "Bakery CSV to Parquet",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode",
        "cluster",
        "--master",
        "yarn",
        "--conf",
        "spark.yarn.submit.waitAppCompletion=true",
        "s3a://{{ work_bucket }}/process/bakery_csv_to_parquet_ssm.py"
      ]
    }
  },
  {
    "Name": "Stocks CSV to Parquet",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode",
        "cluster",
        "--master",
        "yarn",
        "--conf",
        "spark.yarn.submit.waitAppCompletion=true",
        "s3a://{{ work_bucket }}/process/stocks_csv_to_parquet_ssm.py"
      ]
    }
  }
]

Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps.py, shown below. Note the Steps argument in the call to the add_job_flow_steps method, where the Steps loaded from the JSON file are injected.

#!/usr/bin/env python3

# Purpose: Submit a variable number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')
emr_client = boto3.client('emr')


def main():
    args = parse_args()
    params = get_parameters()
    steps = get_steps(params, args.job_type)

    add_job_flow_steps(params['cluster_id'], steps)


def add_job_flow_steps(cluster_id, steps):
    """Add Steps to an existing EMR cluster"""

    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=steps
        )

        print(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def get_steps(params, job_type):
    """
    Load EMR Steps from a separate JSON-format file and substitute tags for SSM parameter values
    """

    dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

    # The context manager closes the file once the Steps are loaded
    with open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r') as file:
        steps = json.load(file)

    new_steps = []

    for step in steps:
        # Replace the {{ work_bucket }} tag in every argument of the Step
        step['HadoopJarStep']['Args'] = [
            arg.replace('{{ work_bucket }}', params['work_bucket'])
            for arg in step['HadoopJarStep']['Args']
        ]
        new_steps.append(step)

    return new_steps


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value'],
        'cluster_id': ssm_client.get_parameter(Name='/emr_demo/cluster_id')['Parameter']['Value']
    }

    return params


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-t', '--job-type', required=True, choices=['process', 'analyze'],
                        help='process or analyze')

    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()
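
To make the tag substitution concrete, here is a small, self-contained sketch that generalizes the idea to any number of tags. The function name is hypothetical, and unlike the listing above it works on a copy so the loaded Steps are left unchanged.

```python
import copy


def substitute_tags(steps, params):
    """Return a copy of the Steps with each {{ name }} tag replaced by its value."""
    new_steps = copy.deepcopy(steps)
    for step in new_steps:
        args = step["HadoopJarStep"]["Args"]
        for name, value in params.items():
            tag = "{{ " + name + " }}"
            args = [arg.replace(tag, value) for arg in args]
        step["HadoopJarStep"]["Args"] = args
    return new_steps


steps = [{
    "Name": "Bakery CSV to Parquet",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "s3a://{{ work_bucket }}/process/bakery_csv_to_parquet_ssm.py"],
    },
}]
resolved = substitute_tags(steps, {"work_bucket": "emr-demo-work-123456789012-us-east-1"})
print(resolved[0]["HadoopJarStep"]["Args"][1])
# prints "s3a://emr-demo-work-123456789012-us-east-1/process/bakery_csv_to_parquet_ssm.py"
```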

The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.

> aws ssm get-parameters-by-path --path '/emr_demo' | \
      jq -r ".Parameters[] | {Name: .Name, Value: .Value}"

{
"Name": "/emr_demo/bootstrap_bucket",
"Value": "emr-demo-bootstrap-123456789012-us-east-1"
}
{
"Name": "/emr_demo/ec2_key_name",
"Value": "emr-demo-123456789012-us-east-1"
}
{
"Name": "/emr_demo/ec2_subnet_id",
"Value": "subnet-06aa61f790a932b32"
}
{
"Name": "/emr_demo/emr_ec2_role",
"Value": "EMR_EC2_DemoRole"
}
{
"Name": "/emr_demo/emr_role",
"Value": "EMR_DemoRole"
}
{
"Name": "/emr_demo/gold_bucket",
"Value": "emr-demo-analyzed-123456789012-us-east-1"
}
{
"Name": "/emr_demo/silver_bucket",
"Value": "emr-demo-processed-123456789012-us-east-1"
}
{
"Name": "/emr_demo/sm_log_group_arn",
"Value": "arn:aws:logs:us-east-1:123456789012:log-group:EmrDemoStateMachineLogGroup:*"
}
{
"Name": "/emr_demo/sm_role_arn",
"Value": "arn:aws:iam::123456789012:role/State_ExecutionRole"
}
{
"Name": "/emr_demo/work_bucket",
"Value": "emr-demo-work-123456789012-us-east-1"
}
{
"Name": "/emr_demo/bronze_bucket",
"Value": "emr-demo-raw-123456789012-us-east-1"
}
{
"Name": "/emr_demo/cluster_id",
"Value": "j-3J44BFJXNVSCT"
}
{
"Name": "/emr_demo/glue_db_bucket",
"Value": "emr-demo-logs-123456789012-us-east-1"
}
{
"Name": "/emr_demo/logs_bucket",
"Value": "emr-demo-logs-123456789012-us-east-1"
}
{
"Name": "/emr_demo/vpc_id",
"Value": "vpc-01d4151396c119d3a"
}

From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from the JSON-format file, job_flow_steps_process.json, and run the PySpark processing applications on the existing EMR cluster.

python3 ./scripts/add_job_flow_steps.py --job-type process

While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Amazon EMR Console’s Cluster Steps tab
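
Step progress can also be checked programmatically rather than from the console. The sketch below polls the EMR describe_step API until a Step reaches a terminal state; the function name and polling interval are assumptions, and the Step IDs would come from the StepIds list in the add_job_flow_steps response.

```python
import time

# States after which a Step will not change again
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(emr_client, cluster_id, step_id, poll_seconds=30):
    """Poll an EMR Step until it reaches a terminal state and return that state."""
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)


# Example usage (requires boto3 and configured AWS credentials; IDs are placeholders):
#   import boto3
#   wait_for_step(boto3.client("emr"), "j-XXXXXXXXXXXXX", "s-XXXXXXXXXXXXX")
```

boto3 also provides a built-in step_complete waiter on the EMR client, which may be preferable when only success needs to be awaited.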

Once the three Steps have been completed, we should note three sub-directories in the processed data bucket containing Parquet-format files.

Processed CSV data converted to Parquet and organized by dataset

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.

Processed stock data converted to Parquet and partitioned by stock symbol
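
In PySpark, this kind of layout is produced with the DataFrameWriter's partitionBy method, which writes one sub-directory per distinct value of the chosen column (for example, symbol=AAPL/). The helper below is a minimal sketch and not the repository's stocks_csv_to_parquet.py; the function name and the column name used in the usage comment are assumptions.

```python
def write_partitioned(df, path, column):
    """Write a Spark DataFrame as Parquet, partitioned by the given column."""
    df.write \
        .partitionBy(column) \
        .mode("overwrite") \
        .parquet(path)


# Example usage inside a PySpark application (bucket and column names are placeholders):
#   write_partitioned(df_stocks, "s3a://my-processed-bucket/stocks/", "symbol")
```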

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Processed movie ratings data converted to Parquet and organized by schema

Crawl Processed Data with Glue

As with the raw data earlier, catalog the newly processed Parquet data into the same AWS Glue data catalog database using one of the two Glue Crawlers we created. The processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.

From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.

python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed

Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three Kaggle datasets’ contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

AWS Glue Data Catalog Database Tables Console

Alternately, we can use the glue get-tables AWS CLI command to review the tables.

> aws glue get-tables --database emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'

processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks

2. Run PySpark Jobs from EMR Master Node

Next, we will explore how to execute PySpark applications remotely on the Master node on the EMR cluster using boto3 and SSH. Although this method may be optimal for certain use cases as opposed to using the EMR SDK, remote SSH execution does not scale as well in my opinion due to a lack of automation, and it exposes some potential security risks.

There are four PySpark applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.

The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.

Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

EMR Console’s Cluster Summary tab

The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script replicates the functionality of the shell-based SSH command above, executing a remote command on the EMR Master Node. The spark-submit command is the string passed to paramiko’s exec_command method.

#!/usr/bin/env python3

# Purpose: Submit Spark job to EMR Master Node
# Author: Gary A. Stafford (December 2020)
# Usage Example: python3 ./submit_spark_ssh.py \
#                    --ec2-key-path ~/.ssh/emr-demo-123456789012-us-east-1.pem

import argparse
import logging

import boto3
from paramiko import SSHClient, AutoAddPolicy

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')


def main():
    args = parse_args()
    params = get_parameters()

    submit_job(params['master_public_dns'], 'hadoop', args.ec2_key_path, params['work_bucket'])


def submit_job(master_public_dns, username, ec2_key_path, work_bucket):
    """Submit job to EMR Master Node"""

    ssh = SSHClient()
    ssh.load_system_host_keys()
    ssh.set_missing_host_key_policy(AutoAddPolicy())

    ssh.connect(hostname=master_public_dns, username=username, key_filename=ec2_key_path)

    stdin_, stdout_, stderr_ = ssh.exec_command(
        command=f"""
            spark-submit --deploy-mode cluster --master yarn \
            --conf spark.yarn.submit.waitAppCompletion=true \
            s3a://{work_bucket}/analyze/bakery_sales_ssm.py"""
    )

    stdout_lines = ''
    while not stdout_.channel.exit_status_ready():
        if stdout_.channel.recv_ready():
            stdout_lines = stdout_.readlines()
    logging.info(' '.join(map(str, stdout_lines)))

    ssh.close()


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        'master_public_dns': ssm_client.get_parameter(Name='/emr_demo/master_public_dns')['Parameter']['Value'],
        'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value']
    }

    return params


def parse_args():
    """Parse argument values from command-line"""

    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-e', '--ec2-key-path', required=True, help='EC2 Key Path')

    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem).

python3 ./scripts/submit_spark_ssh.py \
    --ec2-key-path </path/to/my-key-pair.pem>

The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use AWS CLI emr and ssm commands.

Remote SSH submission of a Spark job

Monitoring Spark Jobs

We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

PySpark application shown running on EMR’s Master node

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

EMR Console’s Cluster Application user interfaces tab

YARN Timeline Server

Below, we see the job we submitted running on the YARN Timeline Server. The YARN Timeline Server also includes useful tools, such as access to configuration, local logs, server stacks, and server metrics.

YARN Timeline Server

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout logs.

Spark History Server

You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Spark History Server completed applications

Below, we see more details about our Spark job using the Spark History Server.

Spark History Server’s Jobs tab

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

Spark History Server’s Jobs tab

3. Run Job Flow on an Auto-Terminating EMR Cluster

The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.

We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will only deploy the Spark application to this cluster as that is the only application we will need to run the Steps.

Using the run_job_flow method, we will execute the four PySpark data analysis applications. The PySpark applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_analyze.json. As with the previous add_job_flow_steps.py script, this pattern of decoupling the Spark job command and arguments from the execution code lets us define and submit any number of Steps without changing the Python execution script. This script also retrieves parameter values from the SSM Parameter Store.

#!/usr/bin/env python3

# Purpose: Create a new EMR cluster and submits a variable
#          number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

from scripts.parameters import parameters

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

emr_client = boto3.client('emr')


def main():
    args = parse_args()
    params = parameters.get_parameters()
    steps = get_steps(params, args.job_type)

    run_job_flow(params, steps)


def run_job_flow(params, steps):
    """Create EMR cluster, run Steps, and then terminate cluster"""

    try:
        response = emr_client.run_job_flow(
            Name='demo-cluster-run-job-flow',
            LogUri=f's3n://{params["logs_bucket"]}',
            ReleaseLabel='emr-6.2.0',
            Instances={
                'InstanceFleets': [
                    {
                        'Name': 'MASTER',
                        'InstanceFleetType': 'MASTER',
                        'TargetSpotCapacity': 1,
                        'InstanceTypeConfigs': [
                            {
                                'InstanceType': 'm5.xlarge',
                            },
                        ]
                    },
                    {
                        'Name': 'CORE',
                        'InstanceFleetType': 'CORE',
                        'TargetSpotCapacity': 2,
                        'InstanceTypeConfigs': [
                            {
                                'InstanceType': 'r5.2xlarge',
                            },
                        ],
                    },
                ],
                'Ec2KeyName': params['ec2_key_name'],
                'KeepJobFlowAliveWhenNoSteps': False,
                'TerminationProtected': False,
                'Ec2SubnetId': params['ec2_subnet_id'],
            },
            Steps=steps,
            BootstrapActions=[
                {
                    'Name': 'string',
                    'ScriptBootstrapAction': {
                        'Path': f's3://{params["bootstrap_bucket"]}/bootstrap_actions.sh',
                    }
                },
            ],
            Applications=[
                {
                    'Name': 'Spark'
                },
            ],
            Configurations=[
                {
                    'Classification': 'spark-hive-site',
                    'Properties': {
                        'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
                    }
                }
            ],
            VisibleToAllUsers=True,
            JobFlowRole=params['emr_ec2_role'],
            ServiceRole=params['emr_role'],
            Tags=[
                {
                    'Key': 'Environment',
                    'Value': 'Development'
                },
                {
                    'Key': 'Name',
                    'Value': 'EMR Demo Project Cluster'
                },
                {
                    'Key': 'Owner',
                    'Value': 'Data Analytics'
                },
            ],
            EbsRootVolumeSize=32,
            StepConcurrencyLevel=5,
        )

        print(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def get_steps(params, job_type):
"""
Load EMR Steps from a separate JSON-format file and substitutes tags for SSM parameter values
"""
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
file = open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r')