Observing gRPC-based Microservices on Amazon EKS running Istio
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, Continuous Delivery, DevOps, Go, JavaScript, Kubernetes, Software Development on July 8, 2021
Observing a gRPC-based Kubernetes application using Jaeger, Zipkin, Prometheus, Grafana, and Kiali on Amazon EKS running Istio service mesh
Introduction
In the previous two-part post, Kubernetes-based Microservice Observability with Istio Service Mesh, we explored a set of popular open source observability tools easily integrated with the Istio service mesh. Tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and aggregation to Amazon CloudWatch Container Insights. We used these tools to observe a distributed, microservices-based, RESTful application deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster. The application platform, running on EKS, used Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.
In this post, we will examine those same observability tools to monitor an alternate set of Go-based microservices that use Protocol Buffers (aka Protobuf) over gRPC (gRPC Remote Procedure Calls) and HTTP/2 for client-server communications as opposed to the more common RESTful JSON over HTTP. We will learn how Kubernetes, Istio, and the observability tools work seamlessly with gRPC, just as they do with JSON over HTTP on Amazon EKS.

Technologies
gRPC
According to the gRPC project, gRPC is a modern open source high-performance Remote Procedure Call (RPC) framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking, and authentication. gRPC is also applicable in the last mile of distributed computing to connect devices, mobile applications, and browsers to backend services.
gRPC was initially created by Google, which has used a single general-purpose RPC infrastructure called Stubby to connect the large number of microservices running within and across its data centers for over a decade. In March 2015, Google decided to build the next version of Stubby and make it open source. gRPC is now used in many organizations outside of Google, including Square, Netflix, CoreOS, Docker, CockroachDB, Cisco, and Juniper Networks. gRPC currently supports over ten languages, including C#, C++, Dart, Go, Java, Kotlin, Node, Objective-C, PHP, Python, and Ruby.
According to widely-cited 2019 tests published by Ruwan Fernando, “gRPC is roughly 7 times faster than REST when receiving data & roughly 10 times faster than REST when sending data for this specific payload. This is mainly due to the tight packing of the Protocol Buffers and the use of HTTP/2 by gRPC.”
Protocol Buffers
With gRPC, you define your service using Protocol Buffers (aka Protobuf), a powerful binary serialization toolset and language. According to Google, Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data — think XML, but smaller, faster, and simpler. Google’s previous documentation claimed protocol buffers were “3 to 10 times smaller and 20 to 100 times faster than XML.”
Once you have defined your messages, you run the protocol buffer compiler for your application's language on your .proto file to generate data access classes. With the proto3 language version, protocol buffers currently support generated code in Java, Python, Objective-C, C++, Dart, Go, Ruby, and C#, with more languages to come. For this post, we have compiled our protobufs for Go. You can read more about the binary wire format of Protobuf on Google's Developers Portal.
Reference Application Platform
To demonstrate the use of the observability tools, we will deploy a reference application platform to Amazon EKS on AWS. The application platform was developed to demonstrate different Kubernetes platforms, such as EKS, GKE, AKS, and concepts such as service meshes, API management, observability, CI/CD, DevOps, and Chaos Engineering. The platform comprises a backend of eight Go-based microservices labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, one Go-based gRPC Gateway reverse proxy, four MongoDB databases, and one RabbitMQ message queue.

The reference application platform is designed to generate gRPC-based, synchronous service-to-service IPC (inter-process communication), asynchronous TCP-based service-to-queue-to-service communications, and TCP-based service-to-database communications. For example, Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message to a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. The platform’s distributed service communications can be observed using the observability tools when the application is deployed to a Kubernetes cluster running the Istio service mesh.

Converting to gRPC and Protocol Buffers
For this post, the eight Go microservices have been modified to use gRPC with protocol buffers over HTTP/2 instead of JSON over HTTP. Specifically, the services use version 3 (aka proto3) of protocol buffers. With gRPC, a gRPC client calls a gRPC server. Some of the platform’s services are gRPC servers, others are gRPC clients, while some act as both client and server.
gRPC Gateway
In the revised platform architecture diagram above, note the addition of the gRPC Gateway reverse proxy that replaces Service A at the edge of the API. The proxy, which translates a RESTful HTTP API into gRPC, sits between the Angular-based Web UI and Service A. Assuming for the sake of this demonstration that most consumers of an API require a RESTful JSON over HTTP API, we have added a gRPC Gateway reverse proxy to the platform. The gRPC Gateway proxies communications between the JSON over HTTP-based clients and the gRPC-based microservices. The gRPC Gateway helps to provide APIs with both gRPC and RESTful styles at the same time.
A diagram from the grpc-gateway GitHub project site demonstrates how the reverse proxy works.

Alternatives to gRPC Gateway
As an alternative to the gRPC Gateway reverse proxy, we could convert the TypeScript-based Angular UI client to communicate via gRPC and protobufs and communicate directly with Service A. One option to achieve this is gRPC Web, a JavaScript implementation of gRPC for browser clients. gRPC Web clients connect to gRPC services via a special proxy, which by default is Envoy. The project’s roadmap includes plans for gRPC Web to be supported in language-specific web frameworks for languages such as Python, Java, and Node.
Demonstration
To follow along with this post's demonstration, review the installation instructions detailed in part one of the previous post, Kubernetes-based Microservice Observability with Istio Service Mesh, to deploy and configure the Amazon EKS cluster, Istio, Amazon MQ, and DocumentDB. To expedite the deployment of the revised gRPC-based platform to the dev namespace, I have included a Helm chart, ref-app-grpc, in the project. Using the chart, you can ignore any instructions in the previous post that refer to deploying resources to the dev namespace. See the chart's README file for further instructions.

Source Code
The gRPC-based microservices source code, Kubernetes resources, and Helm chart are located in the k8s-istio-observe-backend project repository in the 2021-istio branch. This project repository is the only source code you will need for this demonstration.
git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-backend.git
Optionally, the Angular-based web client source code is located in the k8s-istio-observe-frontend repository on the new 2021-grpc branch. The source protobuf .proto file and the Buf-compiled protobuf files are located in the pb-greeting and protobuf project repositories. You do not need to clone any of these projects for this post's demonstration.
All Docker images for the services, UI, and the reverse proxy are pulled from Docker Hub.

Code Changes
Although this post is not specifically about writing Go for gRPC and protobuf, it is helpful to review the code changes to better understand the observability requirements and capabilities of these technologies compared to the previous JSON over HTTP-based services.
Microservices
First, compare the revised source code for Service A, shown below, to the original code in the previous post. The service's code is almost completely rewritten. For example, note the following code changes to Service A, which are representative of the other backend services:
- Import of the v3 greeting protobuf package;
- Local Greeting struct replaced with pb.Greeting struct;
- All services are now hosted on port 50051;
- The HTTP server and all API resource handler functions are removed;
- Headers used for distributed tracing have moved from the HTTP request object to metadata passed in a gRPC Context type;
- Service A is both a gRPC client and a server, which is called by the gRPC Gateway reverse proxy;
- The primary GreetingHandler function is replaced by the protobuf package's Greeting function;
- gRPC clients, such as Service A, call gRPC servers using the CallGrpcService function;
- CORS handling is offloaded from the services to Istio;
- Logging methods are largely unchanged;
Source code for revised gRPC-based Service A:
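The complete, revised source code for Service A is in the 2021-istio branch of the project repository. The condensed sketch below illustrates the structure described above; the generated package's import path, the environment variable names, and the message field usage are assumptions for illustration, so refer to the repository for the actual implementation.
package main

import (
	"context"
	"net"
	"os"

	pb "github.com/garystafford/protobuf/greeting/v3" // assumed import path of the Buf-generated package
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
)

const port = ":50051" // all services are now hosted on port 50051

type greetingServiceServer struct {
	pb.UnimplementedGreetingServiceServer
}

// Greeting replaces the original GreetingHandler. It aggregates this request's
// greeting with the greetings returned by the downstream services.
func (s *greetingServiceServer) Greeting(ctx context.Context, req *pb.GreetingRequest) (*pb.GreetingResponse, error) {
	greetings := []*pb.Greeting{req.GetGreeting()}

	// Service A calls Service B and Service C (address variables are illustrative).
	for _, addr := range []string{os.Getenv("SERVICE_B_ADDR"), os.Getenv("SERVICE_C_ADDR")} {
		downstream, err := CallGrpcService(ctx, req, addr)
		if err != nil {
			return nil, err
		}
		greetings = append(greetings, downstream...)
	}
	return &pb.GreetingResponse{Greeting: greetings}, nil
}

// CallGrpcService is the client side of the service: it forwards the request,
// along with the tracing metadata carried in ctx, to another gRPC server.
func CallGrpcService(ctx context.Context, req *pb.GreetingRequest, address string) ([]*pb.Greeting, error) {
	conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	client := pb.NewGreetingServiceClient(conn)
	resp, err := client.Greeting(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.GetGreeting(), nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreetingServiceServer(s, &greetingServiceServer{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}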
Greeting Protocol Buffers
Shown below is the greeting v3 protocol buffers .proto file. The fields within the Greeting, originally defined as a struct in the RESTful JSON-based services, remain largely unchanged; however, Greeting is now a message: an aggregate containing a set of typed fields. The GreetingRequest message is composed of a single Greeting message, while the GreetingResponse message is composed of multiple (repeated) Greeting messages. Services pass a Greeting message in their request and receive an array of one or more messages in response.
The protobuf is compiled with Buf, the popular Go-based protocol compiler tool. Using Buf, four files are generated: Go, Go gRPC, gRPC Gateway, and Swagger (OpenAPI v2).
.
├── greeting.pb.go
├── greeting.pb.gw.go
├── greeting.swagger.json
└── greeting_grpc.pb.go
Buf is configured using two files, buf.yaml:
version: v1beta1
name: buf.build/garystafford/pb-greeting
deps:
- buf.build/beta/googleapis
- buf.build/grpc-ecosystem/grpc-gateway
build:
roots:
- proto
lint:
use:
- DEFAULT
breaking:
use:
- FILE
And, buf.gen.yaml:
version: v1beta1
plugins:
- name: go
out: ../protobuf
opt:
- paths=source_relative
- name: go-grpc
out: ../protobuf
opt:
- paths=source_relative
- name: grpc-gateway
out: ../protobuf
opt:
- paths=source_relative
- generate_unbound_methods=true
- name: openapiv2
out: ../protobuf
opt:
- logtostderr=true
The compiled protobuf code is included in the protobuf project on GitHub, and the v3 version is imported into each microservice and the reverse proxy. Below is a snippet of the greeting.pb.go compiled Go file.
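The exact contents of the generated file depend on the versions of Buf and the protoc-gen-go plugin used; the abbreviated, representative excerpt below shows only the general shape of the generated message types (the Greeting field names are illustrative, not taken from the actual .proto file).
// Code generated by protoc-gen-go (via Buf). DO NOT EDIT.
package greeting

import (
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)

type Greeting struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Id      string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Service string `protobuf:"bytes,2,opt,name=service,proto3" json:"service,omitempty"`
	Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
}

// GreetingRequest carries a single Greeting message.
type GreetingRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Greeting *Greeting `protobuf:"bytes,1,opt,name=greeting,proto3" json:"greeting,omitempty"`
}

// GreetingResponse carries one or more (repeated) Greeting messages.
type GreetingResponse struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Greeting []*Greeting `protobuf:"bytes,1,rep,name=greeting,proto3" json:"greeting,omitempty"`
}

// Getters are also generated for every field, for example:
func (x *GreetingResponse) GetGreeting() []*Greeting {
	if x != nil {
		return x.Greeting
	}
	return nil
}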
Using Swagger, we can view the greeting protocol buffers' single RESTful API resource, exposed with an HTTP GET method. You can use the Docker-based version of Swagger UI for viewing protoc-generated swagger definitions.
docker run -p 8080:8080 -d --name swagger-ui \
-e SWAGGER_JSON=/tmp/greeting/v3/greeting.swagger.json \
-v ${GOPATH}/src/protobuf:/tmp swaggerapi/swagger-ui
The Angular UI makes an HTTP GET request to the /api/greeting resource, which is transformed to gRPC and proxied to Service A, where it is handled by the Greeting function.

gRPC Gateway Reverse Proxy
As explained earlier, the gRPC Gateway reverse proxy, which translates the RESTful HTTP API into gRPC, is new. In the code sample below, note the following code features:
- Import of the v3 greeting protobuf package;
- ServeMux, a request multiplexer, matches http requests to patterns and invokes the corresponding handler;
- RegisterGreetingServiceHandlerFromEndpoint registers the http handlers for service GreetingService to mux. The handlers forward requests to the gRPC endpoint;
- x-b3 request headers, used for distributed tracing, are collected from the incoming HTTP request and propagated to the upstream services in the gRPC Context type;
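A condensed sketch of the reverse proxy's main function is shown below to illustrate these features; the gRPC endpoint address, HTTP port, and generated package import path are assumptions, and the tracing-header annotator is covered in the Header Propagation section later in the post.
package main

import (
	"context"
	"log"
	"net/http"

	pb "github.com/garystafford/protobuf/greeting/v3" // assumed import path of the Buf-generated package
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"google.golang.org/grpc"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// ServeMux matches incoming http requests to patterns and invokes the
	// corresponding handler. In the actual proxy, a runtime.WithMetadata
	// option is also passed here to propagate the x-b3 tracing headers.
	mux := runtime.NewServeMux()

	// Register the http handlers for GreetingService to mux; the handlers
	// forward incoming RESTful requests to Service A's gRPC endpoint.
	opts := []grpc.DialOption{grpc.WithInsecure()}
	if err := pb.RegisterGreetingServiceHandlerFromEndpoint(ctx, mux, "service-a:50051", opts); err != nil {
		log.Fatal(err)
	}

	// Serve the RESTful JSON over HTTP API.
	log.Fatal(http.ListenAndServe(":8080", mux))
}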
Istio VirtualService and CORS
With the RESTful services in the previous post, CORS was handled by Service A. Service A allowed the UI to make cross-origin requests to the backend API's domain. Since the gRPC Gateway does not directly support Cross-Origin Resource Sharing (CORS) policy, we have offloaded the CORS responsibility to Istio using the reverse proxy's VirtualService resource's CorsPolicy configuration. Moving this responsibility makes CORS much easier to manage as YAML-based configuration and part of the Helm chart. See lines 20–28 below.
Pillar One: Logs
To paraphrase Jay Kreps on the LinkedIn Engineering Blog, a log is an append-only, totally ordered sequence of records ordered by time. The ordering of records defines a notion of “time” since entries to the left are defined to be older than entries to the right. Logs are a historical record of events that happened in the past. Logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.
Go-based Microservice Logging
An effective logging strategy starts with what you log, when you log, and how you log. As part of the platform's logging strategy, the eight Go-based microservices use Logrus, a popular structured logger for Go, first released in 2014. The platform's services also implement Banzai Cloud's logrus-runtime-formatter. These two logging packages give us greater control over what we log, when we log, and how we log information about the services. The recommended configuration of the packages is minimal. Logrus' JSONFormatter provides for easy parsing by third-party systems and injects additional contextual data fields into the log entries.
Logrus provides several advantages over Go's simple logging package, log. For example, not every log entry represents a fatal error, nor should all verbose log entries be output in a Production environment. Logrus can log at seven levels: Trace, Debug, Info, Warning, Error, Fatal, and Panic. The log level of the platform's microservices can be changed at runtime using an environment variable.
Banzai Cloud’s logrus-runtime-formatter automatically tags log messages with runtime and stack information, including function name and line number — extremely helpful when troubleshooting. There is an excellent post on the Banzai Cloud (now part of Cisco) formatter, Golang runtime Logrus Formatter.
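A minimal sketch of how the two packages are typically wired together in each service follows; the LOG_LEVEL environment variable name is an assumption, and the services' actual configuration is in the project repository.
package main

import (
	"os"

	runtime "github.com/banzaicloud/logrus-runtime-formatter"
	log "github.com/sirupsen/logrus"
)

func init() {
	// JSONFormatter output is easily parsed by third-party systems; the
	// runtime formatter wraps it and tags entries with function and line number.
	formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
	formatter.Line = true
	log.SetFormatter(&formatter)
	log.SetOutput(os.Stdout)

	// The log level is changed at runtime using an environment variable.
	level, err := log.ParseLevel(os.Getenv("LOG_LEVEL"))
	if err != nil {
		level = log.InfoLevel
	}
	log.SetLevel(level)
}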

In 2020, Logrus entered maintenance mode. The author, Simon Eskildsen (Principal Engineer at Shopify), stated they would not be introducing new features. This does not mean Logrus is dead. With over 18,000 GitHub Stars, Logrus will continue to be maintained for security, bug fixes, and performance. The author states that many fantastic alternatives to Logrus now exist, such as Zerolog, Zap, and Apex.
Client-side Angular UI Logging
Likewise, I have enhanced the logging of the Angular UI using NGX Logger. NGX Logger is a simple logging module for Angular (currently supporting Angular 6+). It allows "pretty print" to the console and can POST log messages to a URL for server-side logging. For this demo, the UI will only log to the web browser's console. Similar to Logrus, NGX Logger supports multiple log levels: Trace, Debug, Info, Warning, Error, Fatal, and Off. However, instead of just outputting messages, NGX Logger allows us to output properly formatted log entries to the browser's console.
The level of log output is configured based on the environment, Production or non-Production. Below is an example of the log output from the Angular UI in Chrome. Since the UI's Docker Image was built with the Production configuration, the log level is set to INFO. You would not want to expose potentially sensitive information in verbose log output to end-users in Production.

Controlling logging levels is accomplished by adding the following ternary operator to the app.module.ts file.
Platform Logs
Based on the platform built, configured, and deployed in part one, you now have access to logs from multiple sources.
- Amazon DocumentDB: Amazon CloudWatch Audit and Profiler logs;
- Amazon MQ: Amazon CloudWatch logs;
- Amazon EKS: API server, Audit, Authenticator, Controller manager, and Scheduler CloudWatch logs;
- Kubernetes Dashboard: Individual EKS Pod and Replica Set logs;
- Kiali: Individual EKS Pod and Container logs;
- Fluent Bit: EKS performance, host, dataplane, and application CloudWatch logs;
Fluent Bit
According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch Logs for log storage and analytics.
With Fluent Bit running, the EKS cluster's performance, host, dataplane, and application logs will also be available in Amazon CloudWatch.

Within the application log groups, you can access the individual log streams for each reference application’s components.

Within each CloudWatch log stream, you can view individual log entries.

CloudWatch Logs Insights enables you to interactively search and analyze your log data in Amazon CloudWatch Logs. You can perform queries to help you more efficiently and effectively respond to operational issues. If an issue occurs, you can use CloudWatch Logs Insights to identify potential causes and validate deployed fixes.

CloudWatch Logs Insights supports CloudWatch Logs Insights query syntax, a query language you can use to perform queries on your log groups. Each query can include one or more query commands separated by Unix-style pipe characters (|). For example:
fields @timestamp, @message
| filter kubernetes.container_name = "service-f"
and @message like "error"
| sort @timestamp desc
| limit 20
Pillar Two: Metrics
For metrics, we will examine CloudWatch Container Insights, Prometheus, and Grafana. Prometheus and Grafana are industry-leading tools you installed as part of the Istio deployment.
Prometheus
Prometheus is an open source system monitoring and alerting toolkit originally built at SoundCloud circa 2012. Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second hosted project, after Kubernetes.

According to Istio, the Prometheus addon is a Prometheus server that comes preconfigured to scrape Istio endpoints to collect metrics. You can use Prometheus with Istio to record metrics that track the health of Istio and applications within the service mesh. You can visualize metrics using tools like Grafana and Kiali. The Istio Prometheus addon is intended for demonstration only and is not tuned for performance or security.
The istioctl dashboard command provides access to all of the Istio web UIs. With the EKS cluster running, Istio installed, and the reference application platform deployed, access Prometheus using the istioctl dashboard prometheus command from your terminal. You must be logged into AWS from your terminal to connect to Prometheus successfully. If you are not logged in to AWS, you will often see the following error: Error: not able to locate <tool_name> pod: Unauthorized. Since we used the non-production demonstration versions of the Istio Addons, there is no authentication and authorization required to access Prometheus.
According to Prometheus, users select and aggregate time-series data in real-time using a functional query language called PromQL (Prometheus Query Language). The result of an expression can either be shown as a graph, viewed as tabular data in Prometheus’s expression browser, or consumed by external systems through Prometheus’ HTTP API. The expression browser includes a drop-down menu with all available metrics as a starting point for building queries. Shown below are a few PromQL examples that were developed as part of writing this post.
istio_agent_go_info{kubernetes_namespace="dev"}
istio_build{kubernetes_namespace="dev"}
up{alpha_eksctl_io_cluster_name="istio-observe-demo", job="kubernetes-nodes"}
sum by (pod) (rate(container_network_transmit_packets_total{stack="reference-app",namespace="dev",pod=~"service-.*"}[5m]))
sum by (instance) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code="200"})
sum by (response_code) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code!~"200|0"})
Prometheus APIs
Prometheus has both an HTTP API and a Management API. There are many useful endpoints in addition to the Prometheus UI, available at http://localhost:9090/graph. For example, the Prometheus HTTP API endpoint that lists all the command-line configuration flags is available at http://localhost:9090/api/v1/status/flags. The endpoint that lists all the available Prometheus metrics is available at http://localhost:9090/api/v1/label/__name__/values; over 951 metrics in this demonstration.

The Prometheus endpoint that lists many available metrics with HELP and TYPE to explain their function can be found at http://localhost:9090/metrics.

Understanding Metrics
In addition to these endpoints, the standard service level metrics exported by Istio and available via Prometheus can be found in the Istio Standard Metrics documentation. An explanation of many of the metrics available in Prometheus is also found in the cAdvisor README on their GitHub site. As mentioned in this AWS Blog Post, the cAdvisor metrics are also available from the command line using the following commands:
export NODE=$(kubectl get nodes | sed -n '2 p' | awk {'print $1'})
kubectl get --raw "/api/v1/nodes/${NODE}/proxy/metrics/cadvisor"
Observing Metrics
Below is an example graph of the backend microservice containers deployed to EKS. The graph PromQL expression returns the amount of working set memory, including recently accessed memory, dirty memory, and kernel memory (container_memory_working_set_bytes), summed by pod, in megabytes (MB). There was no load on the services during the period displayed.
sum by (pod) (container_memory_working_set_bytes{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"}) / (1024^2)

The container_memory_working_set_bytes metric is the same metric used by the kubectl top command (not container_memory_usage_bytes). Omitting the --containers=true flag will output pod stats rather than per-container stats.
> kubectl top pod -n dev --containers=true | \
grep -v istio-proxy | sort -k 4 -r
POD NAME CPU(cores) MEMORY(bytes)
service-d-69d7469cbf-ts4t7 service-d 135m 13Mi
service-d-69d7469cbf-6thmz service-d 156m 13Mi
service-d-69d7469cbf-nl7th service-d 118m 12Mi
service-d-69d7469cbf-fz5bh service-d 118m 12Mi
service-d-69d7469cbf-89995 service-d 136m 11Mi
service-d-69d7469cbf-g4pfm service-d 106m 10Mi
service-h-69576c4c8c-x9ccl service-h 33m 9Mi
service-h-69576c4c8c-gtjc9 service-h 33m 9Mi
service-h-69576c4c8c-bjgfm service-h 45m 9Mi
service-h-69576c4c8c-8fk6z service-h 38m 9Mi
service-h-69576c4c8c-55rld service-h 36m 9Mi
service-h-69576c4c8c-4xpb5 service-h 41m 9Mi
...
In another Prometheus example, the PromQL query expression returns the per-second rate of CPU resources measured in CPU units (1 CPU = 1 AWS vCPU), as measured over the last 5 minutes, per time series in the range vector, summed by the pod. During this period, the backend services were under a consistent, simulated load of 15 concurrent users using hey. Four instances of Service D pods were consuming the most CPU units during this time period.
sum by (pod) (rate(container_cpu_usage_seconds_total{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"}[5m])) * 1000

The container_cpu_usage_seconds_total metric is the same metric used by the kubectl top command. The above PromQL expression multiplies the query results by 1,000 to match the results from kubectl top, shown below.
> kubectl top pod -n dev --sort-by=cpu
NAME CPU(cores) MEMORY(bytes)
service-d-69d7469cbf-6thmz 159m 60Mi
service-d-69d7469cbf-89995 143m 61Mi
service-d-69d7469cbf-ts4t7 140m 59Mi
service-d-69d7469cbf-fz5bh 135m 58Mi
service-d-69d7469cbf-nl7th 132m 61Mi
service-d-69d7469cbf-g4pfm 119m 62Mi
service-g-c7d68fd94-w5t66 59m 58Mi
service-f-7dc8f64799-qj8qv 56m 55Mi
service-c-69fbc964db-knggt 56m 58Mi
service-h-69576c4c8c-8fk6z 55m 58Mi
service-h-69576c4c8c-4xpb5 55m 58Mi
service-g-c7d68fd94-5cdc2 54m 58Mi
...
Limits
Prometheus also exposes container resource limits. For example, the memory limits set on the reference platform's backend services can be displayed in megabytes (MB) using the container_spec_memory_limit_bytes metric. When viewed alongside the real-time resources consumed by the services, these metrics are useful to properly configure and monitor Kubernetes management features such as the Horizontal Pod Autoscaler.
sum by (container) (container_spec_memory_limit_bytes{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"}) / (1024^2) / count by (container) (container_spec_memory_limit_bytes{namespace="dev", container=~"service-.*|rev-proxy|angular-ui"})
Or, memory limits by Pod:
sum by (pod) (container_spec_memory_limit_bytes{namespace="dev"}) / (1024^2)

Cluster Metrics
Prometheus also contains metrics about Istio components, Kubernetes components, and the EKS cluster. For example, the following query returns the total available memory, in gigabytes (GB), of each of the five m5.large EC2 worker nodes in the istio-observe-demo EKS cluster's managed-ng-1 Managed Node Group.
machine_memory_bytes{alpha_eksctl_io_cluster_name="istio-observe-demo", alpha_eksctl_io_nodegroup_name="managed-ng-1"} / (1024^3)

For total physical cores, use the machine_cpu_physical_cores metric, and for vCPU cores, use the machine_cpu_cores metric.
Grafana
Grafana describes itself as the leading open source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also allows users to define alert rules for their most important metrics visually. Grafana will continuously evaluate rules and can send notifications.
If you deployed Grafana using the Istio addons process demonstrated in part one of the previous post, access Grafana similar to the other tools:
istioctl dashboard grafana

According to Istio, Grafana is an open source monitoring solution used to configure dashboards for Istio. You can use Grafana to monitor the health of Istio and applications within the service mesh. While you can build your own dashboards, Istio offers a set of preconfigured dashboards for all of the most important metrics for the mesh and the control plane. The preconfigured dashboards use Prometheus as the data source.
- Mesh Dashboard provides an overview of all services in the mesh.
- Service Dashboard provides a detailed breakdown of metrics for a service.
- Workload Dashboard provides a detailed breakdown of metrics for a workload.
- Performance Dashboard monitors the resource usage of the mesh.
- Control Plane Dashboard monitors the health and performance of the control plane.
Below is an example of the Istio Mesh Dashboard, filtered to show the eight backend service workloads running in the dev namespace. During this period, the backend services were under a consistent simulated load of approximately 20 concurrent users using hey. You can observe the p50, p90, and p99 latency of requests to these workloads.

Dashboards are built from Panels, the basic visualization building blocks in Grafana. Each panel has a query editor specific to the data source (Prometheus in this case) selected. The query editor allows you to write your (PromQL) query. For example, below is the PromQL expression query responsible for the p50 latency Panel displayed in the Istio Mesh Dashboard.
label_join((histogram_quantile(0.50, sum(rate(istio_request_duration_milliseconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)) / 1000) or histogram_quantile(0.50, sum(rate(istio_request_duration_seconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)), "destination_workload_var", ".", "destination_workload", "destination_workload_namespace")
Below is an example of the Istio Workload Dashboard. The dashboard contains three sections: General, Inbound Workloads, and Outbound Workloads. We have filtered outbound traffic from the reference platform's backend services in the dev namespace.

Below is a different view of the Istio Workload Dashboard, the dashboard’s Inbound Workloads section filtered to a single workload, the gRPC Gateway. The gRPC Gateway accepts incoming traffic from the Istio Ingress Gateway, as shown in the dashboard’s panels.

Grafana provides the ability to Explore a Panel. Explore strips away the dashboard and panel options so that you can focus on the query. Below is an example of the Panel showing a steady stream of TCP-based egress traffic for Service F, based on the istio_tcp_sent_bytes_total metric. Service F consumes messages from the RabbitMQ queue (Amazon MQ) and writes messages to MongoDB (DocumentDB).

Istio Performance
You can monitor the resource usage of Istio with the Istio Performance Dashboard.

Additional Dashboards
Grafana provides a site containing official and community-built dashboards, including the above-mentioned Istio dashboards. Importing dashboards into your Grafana instance is as simple as copying the dashboard URL or the ID provided from the Grafana dashboard site and pasting it into the dashboard import option of your Grafana instance. However, be aware that not every Kubernetes dashboard on Grafana's site is compatible with your specific version of Kubernetes, Istio, or EKS, nor does every dashboard rely on Prometheus as a data source. As a result, you might have to test and tweak imported dashboards to get them working.

Below is an example of an imported community dashboard, Kubernetes cluster monitoring (via Prometheus) by Instrumentisto Team (dashboard ID 315).

Alerting
An effective observability strategy must include more than just the ability to visualize results. An effective strategy must also detect anomalies and notify (alert) the appropriate resources or directly resolve incidents. Grafana, like Prometheus, is capable of alerting and notification. You visually define alert rules for your critical metrics. Then, Grafana will continuously evaluate metrics against the rules and send notifications when pre-defined thresholds are breached.
Grafana supports multiple popular notification channels, including PagerDuty, HipChat, Email, Kafka, and Slack. Below is an example of a Grafana notification channel that sends alert notifications to a Slack support channel.

Below is an example of an alert based on an arbitrarily high CPU usage of 300 millicpu or millicores (m). When the CPU usage of a single pod goes above that value for more than 3 minutes, an alert is sent. The high CPU usage could be caused by the Horizontal Pod Autoscaler not functioning, the HPA having reached its maxReplicas limit, or a lack of resources within the cluster's existing worker nodes to schedule additional pods.

Triggered by the alert, Grafana sends detailed notifications to the designated Slack channel.

Amazon CloudWatch Container Insights
Lastly, in the category of Metrics, Amazon CloudWatch Container Insights collects, aggregates, summarizes, and visualizes metrics and logs from your containerized applications and microservices. CloudWatch alarms can be set on metrics that Container Insights collects. Container Insights is available for Amazon Elastic Container Service (Amazon ECS), including Fargate, Amazon EKS, and Kubernetes platforms on Amazon EC2.

In Amazon EKS, Container Insights uses a containerized version of the CloudWatch agent to discover all running containers in a cluster. It then collects performance data at every layer of the performance stack. Container Insights collects data as performance log events using the embedded metric format. These performance log events are entries that use a structured JSON schema that enables high-cardinality data to be ingested and stored at scale.
In the previous post, we also installed CloudWatch Container Insights monitoring for Prometheus, which automates the discovery of Prometheus metrics from containerized systems and workloads.

Below is an example of a basic Performance Monitoring CloudWatch Container Insights Dashboard. The dashboard is filtered to the dev namespace of the EKS cluster, where the reference application platform is running. During this period, the backend services were put under a simulated load using hey. As the load on the application increased, the 'Number of Pods' increased from 20 pods to 56 pods based on the containers' requested resources and HPA configurations. There is also a CloudWatch Alarm, shown on the right of the screen. An alarm was triggered for an arbitrarily high level of network transmission activity.

Next is an example of Container Insights' Container Map view in CPU mode. You see a visual representation of the dev namespace, with each of the backend services' Service and Deployment resources shown.

Below, there is a warning icon indicating an Alarm on the cluster was triggered.

Lastly, Container Insights allows you to jump from the Container Insights console to the CloudWatch Logs Insights console. Container Insights will also write the CloudWatch Logs Insights query for you. Below, we went from the Service D container metrics view in the Container Insights Performance Monitoring console directly to the CloudWatch Logs Insights console, with a query ready to run.

Pillar Three: Traces
According to the OpenTracing website, distributed tracing, also called distributed request tracing, is used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.
Header Propagation
According to Istio, header propagation may be accomplished through client libraries, such as Zipkin or Jaeger. Header propagation may also be accomplished manually, referred to as trace context propagation, documented in the Distributed Tracing Task. Alternately, Istio proxies can automatically send spans. Applications need to propagate the appropriate HTTP headers so that when the proxies send span information, the spans can be correlated correctly into a single trace. To accomplish this, an application needs to collect and propagate the following headers from the incoming request to any outgoing requests.
x-request-id
x-b3-traceid
x-b3-spanid
x-b3-parentspanid
x-b3-sampled
x-b3-flags
x-ot-span-context
The x-b3 headers originated as part of the Zipkin project. The B3 portion of the header is named for the original name of Zipkin, BigBrotherBird. Passing these headers across service calls is known as B3 propagation. According to Zipkin, these attributes are propagated in-process and eventually downstream (often via HTTP headers) to ensure all activity originating from the same root are collected together.
To demonstrate distributed tracing with Jaeger and Zipkin, the gRPC Gateway passes the b3 headers. While the RESTful JSON-based services passed these headers in the HTTP request object, with gRPC, the headers are passed in the gRPC Context object. The following code has been added to the gRPC Gateway. The Istio sidecar proxy (Envoy) generates the initial headers, which are then propagated throughout the service call chain. It is critical only to propagate the headers present in the downstream request with values, as the code below does.
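A hedged sketch of that addition is shown below; the function and variable names are assumptions, but registering a metadata annotator with the gateway's ServeMux is the standard grpc-gateway mechanism for this.
package main

import (
	"context"
	"net/http"

	"google.golang.org/grpc/metadata"
)

// The tracing headers generated by the Istio sidecar proxy (Envoy).
var tracingHeaders = []string{
	"x-request-id",
	"x-b3-traceid",
	"x-b3-spanid",
	"x-b3-parentspanid",
	"x-b3-sampled",
	"x-b3-flags",
	"x-ot-span-context",
}

// injectHeadersIntoMetadata copies the tracing headers from the incoming HTTP
// request into the outgoing gRPC metadata. Only headers present in the
// downstream request, with values, are propagated.
func injectHeadersIntoMetadata(ctx context.Context, req *http.Request) metadata.MD {
	pairs := make([]string, 0, len(tracingHeaders)*2)
	for _, h := range tracingHeaders {
		if v := req.Header.Get(h); v != "" {
			pairs = append(pairs, h, v)
		}
	}
	return metadata.Pairs(pairs...)
}

// The annotator is registered when the gateway's ServeMux is created:
// mux := runtime.NewServeMux(runtime.WithMetadata(injectHeadersIntoMetadata))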
Below, in the CloudWatch logs, we see an example of the HTTP request headers recorded in a log message for Service A. The b3 headers are propagated from the gRPC Gateway reverse proxy to gRPC-based Go services. Header propagation ensures a complete distributed trace across the entire service call chain.

Headers propagated from Service A are shown below. Note the b3 headers propagated from the gRPC Gateway reverse proxy.
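Within the Go services themselves, forwarding works along the same lines; a minimal sketch, assuming a small helper that wraps the incoming context before each outgoing gRPC call (the helper name is an assumption):
package main

import (
	"context"

	"google.golang.org/grpc/metadata"
)

// forwardTraceContext copies the incoming request's gRPC metadata, which
// carries the b3 headers, onto the context used for outgoing gRPC calls so
// that Envoy can correlate all of the spans into a single trace.
func forwardTraceContext(ctx context.Context) context.Context {
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		return metadata.NewOutgoingContext(ctx, md.Copy())
	}
	return ctx
}
A client call then uses the returned context, for example, client.Greeting(forwardTraceContext(ctx), req).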
Jaeger
According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains a helpful overview of Jaeger’s architecture and general tracing-related terminology.
If you deployed Jaeger using the Istio addons process demonstrated in part one of the previous post, access Jaeger similar to the other tools:
istioctl dashboard jaeger
Below are examples of the Jaeger UI’s Search view, displaying the results of a search for the Angular UI and the Istio Ingress Gateway services over a period of time. We see a timeline of traces across the top with a list of trace results below. As discussed on the Jaeger website, a trace is composed of spans. A span represents a logical unit of work in Jaeger that has an operation name. A trace is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with the concept of DAGs.


Below is a detailed view of a single trace in Jaeger’s Trace Timeline mode. The 16 spans encompass nine of the reference platform’s components: seven backend services, gRPC Gateway, and Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 195.49 ms. The root span in the trace is the Istio Ingress Gateway. The Angular UI, loaded in the end user’s web browser, calls gRPC Gateway via the Istio Ingress Gateway. From there, we see the expected flow of our service-to-service IPC. Service A calls Services B and Service C. Service B calls Service E, which calls Service G and Service H.
In this demonstration, traces are not instrumented to span the RabbitMQ message queue nor MongoDB. You will not see a trace that includes a call from Service D to Service F via the RabbitMQ.

The visualization of the trace’s timeline demonstrates the synchronous nature of the reference platform’s service-to-service IPC instead of the asynchronous nature of the decoupled communications using the RabbitMQ messaging queue. Service A waits for each service in its call chain to respond before returning its response to the requester.
Within Jaeger’s Trace Timeline view, you have the ability to drill into a single span, which contains additional metadata. The span’s metadata includes the API endpoint URL being called, HTTP method, response status, and several other headers.

A Trace Statistics view is also available.

Additionally, Jaeger has an experimental Trace Graph mode that displays a graph view of the same trace.

Jaeger also includes a Compare Trace feature and two dependency views: Force-Directed Graph and DAG. I find both views rather primitive compared to Kiali. If you lack access to Kiali, the views are marginally useful as a dependency graph.

Zipkin
Zipkin is a distributed tracing system, which helps gather timing data needed to troubleshoot latency problems in service architectures. According to a 2012 post on Twitter’s Engineering Blog, Zipkin started as a project during Twitter’s first Hack Week. During that week, they implemented a basic version of the Google Dapper paper for Thrift.

Zipkin and Jaeger are very similar in terms of capabilities. I have chosen to focus on Jaeger in this post as I prefer it over Zipkin. If you want to try Zipkin instead of Jaeger, you can use the following commands to remove Jaeger and install Zipkin from the Istio addons extras directory. In part one of the post, we did not install Zipkin by default when we deployed the Istio addons. Be aware that running both tools simultaneously in the same Kubernetes cluster will cause unpredictable tracing results.
kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/jaeger.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/extras/zipkin.yaml
Access Zipkin similar to the other observability tools:
istioctl dashboard zipkin
Below is an example of a distributed trace visualized in Zipkin’s UI, containing 16 spans, similar to the trace visualized in Jaeger, shown above. The spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of ~221 ms.

Zipkin can also visualize a dependency graph based on the distributed trace. Below is an example of a traffic simulation over a 24-hour period, showing network traffic flowing between the reference platform’s components, illustrated as a dependency graph.

Kiali: Microservice Observability
According to their website, Kiali is a management console for an Istio-based service mesh. It provides dashboards and observability, and lets you operate your mesh with robust configuration and validation capabilities. It shows the structure of a service mesh by inferring traffic topology and displaying the mesh’s health. Kiali provides detailed metrics, powerful validation, Grafana access, and strong integration for distributed tracing with Jaeger.
If you deployed Kiali using the Istio addons process demonstrated in part one of the previous post, access Kiali similar to the other tools:
istioctl dashboard kiali
For improved security, install the latest version of Kiali using the customizable install mentioned in Istio's documentation. Using Kiali's Install via Kiali Server Helm Chart option adds token-based authentication, similar to the Kubernetes Dashboard.

Kiali’s Overview tab provides a global view of all namespaces within the Istio service mesh and the number of applications within each namespace.

The Graph tab in the Kiali UI represents the components running in the Istio service mesh. Below, filtering on the cluster's dev Namespace, we can observe that Kiali has mapped 11 applications (workloads), 11 services, and 24 edges (a graph term). Specifically, we see the Istio Ingress Gateway at the edge of the service mesh, the gRPC Gateway, the Angular UI, and the eight backend services, all with their respective Envoy proxy sidecars that are taking traffic (Service F did not take any direct traffic from another service in this example), the external DocumentDB egress point, and the external Amazon MQ egress point. Note how service-to-service traffic flows with Istio, from the service to its sidecar proxy, to the other service's sidecar proxy, and finally to the service.

Kiali allows you to zoom in and focus on a single component in the graph and its individual metrics.

Kiali can also display average request times and other metrics for each edge in the graph (communication between two components). Kiali can even show those metrics over a given period of time, using Kiali's Replay feature, shown below.

The Applications tab lists all the applications, their namespace, and labels.

You can drill into an individual component on both the Applications and Workloads tabs and view additional details. Details include the overall health, Pods, and Istio Config status. Below is an overview of the Service A workload in the dev Namespace.

The Workloads detailed view also includes inbound and outbound network metrics. Below is an example of the outbound metrics for Service A in the dev Namespace.

Kiali also gives you access to the individual pod's container logs. Although log access is not as user-friendly as the other log sources discussed previously, having logs available alongside metrics (integration with Grafana), traces (integration with Jaeger), and mesh visualization makes Kiali a very effective single pane of glass for observability.

Kiali also has an Istio Config tab. The Istio Config tab displays a list of all of the available Istio configuration objects that exist in the user’s environment.

You can use Kiali to configure and manage the Istio service mesh and its installed resources. Using Kiali, you can actually modify the deployed resources, similar to using the kubectl edit command.

Oftentimes, I find Kiali to be my first stop when troubleshooting platform issues. Once I identify the specific components or communication paths having issues, I then review the specific application logs and Prometheus metrics through the Grafana dashboard.
Tear Down
To tear down the EKS cluster, DocumentDB cluster, and Amazon MQ broker, use the following commands:
# EKS cluster
eksctl delete cluster --name $CLUSTER_NAME
# Amazon MQ
aws mq list-brokers | jq -r '.BrokerSummaries[] | .BrokerId'
aws mq delete-broker --broker-id {{ your_broker_id }}
# DocumentDB
aws docdb describe-db-clusters \
  | jq -r '.DBClusters[] | .DbClusterResourceId'
aws docdb delete-db-cluster \
  --db-cluster-identifier {{ your_cluster_id }}
Conclusion
In this post, we explored a set of popular open source observability tools, easily integrated with the Istio service mesh. These tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset using Fluent Bit for log processing and forwarding to Amazon CloudWatch Container Insights. Using these tools, we successfully observed a gRPC-based, distributed reference application platform deployed to Amazon EKS.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, Python, Software Development on December 2, 2020
Introduction
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
PySpark on EMR
In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.
PySpark Application Methods
- Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
- EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster's Master node;
- Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
- AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
- Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);
Notebook Methods
- EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
- Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
- Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;
Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.
Preliminary Tasks
To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.
- Download a copy of this post’s GitHub repository;
- Download three Kaggle datasets and organize locally;
- Create an Amazon EC2 key pair;
- Upload the EMR bootstrap script and create the CloudFormation Stack;
- Allow your IP address access to the EMR Master node on port 22;
- Upload CSV data files and PySpark applications to S3;
- Crawl the raw data and create a Data Catalog using AWS Glue;
Step 1: GitHub Repository
Using this git clone command, download a copy of this post's GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git
Step 2: Kaggle Datasets
Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.
- Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
- Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
- Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones
Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in the following step.
> tree raw_data --si -v -A
raw_data
├── [ 128] bakery
│ ├── [711k] BreadBasket_DMS.csv
├── [ 320] movie_ratings
│ ├── [190M] credits.csv
│ ├── [6.2M] keywords.csv
│ ├── [989k] links.csv
│ ├── [183k] links_small.csv
│ ├── [ 34M] movies_metadata.csv
│ ├── [710M] ratings.csv
│ └── [2.4M] ratings_small.csv
└── [1.1k] stocks
├── [151k] AAPL.csv
├── [146k] AXP.csv
├── [150k] BA.csv
├── [147k] CAT.csv
├── [146k] CSCO.csv
├── [149k] CVX.csv
├── [147k] DIS.csv
├── [ 42k] DWDP.csv
├── [150k] GS.csv
└── [...] abridged...
In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.
Step 3: Amazon EC2 key pair
According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.
chmod 0400 /path/to/my-key-pair.pem
Step 4: Bootstrap Script and CloudFormation Stack
The bulk of the resources that are used as part of this demonstration are created using the CloudFormation stack, emr-demo-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.
There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket's name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.
The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.
The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.
python3 ./scripts/create_cfn_stack.py \
  --environment dev \
  --ec2-key-name <my-key-pair-name>
The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

Step 5: SSH Access to EMR
For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master
.

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.
export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
Step 6: Raw Data and PySpark Apps to S3
As part of the emr-demo-dev
CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw
, processed
, and analyzed
data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.
> aws s3api list-buckets | \
    jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'

emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1
There is a raw
data bucket (aka bronze) that will contain the original CSV files. There is a processed
data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed
data bucket (aka gold) that has the results of the data analysis. We also have a work
bucket that holds the PySpark applications, a logs
bucket that holds EMR logs, and a glue-db
bucket to hold the Glue Data Catalog metadata.
Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw
S3 data bucket.
python3 ./scripts/upload_csv_files_to_s3.py
Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work
S3 data bucket.
python3 ./scripts/upload_apps_to_s3.py
Step 7: Crawl Raw Data with Glue
The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets' data will reside in Amazon S3, while their schemas and metadata will reside within tables in the Glue data catalog database, emr_demo
. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo
.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw
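The script presumably wraps the AWS Glue API; a minimal sketch of starting a crawler and polling until it returns to the READY state might look like the following.

# Hedged sketch of starting a Glue Crawler and waiting for it to finish,
# similar in spirit to crawl_raw_data.py.
import time

import boto3

glue_client = boto3.client('glue')
crawler_name = 'emr-demo-raw'

glue_client.start_crawler(Name=crawler_name)

# Poll until the crawler returns to the READY state
while True:
    state = glue_client.get_crawler(Name=crawler_name)['Crawler']['State']
    print(f'Crawler state: {state}')
    if state == 'READY':
        break
    time.sleep(30)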
Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo
, all prefixed with raw_
. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

Alternately, we can use the glue get-tables
AWS CLI command to review the tables.
> aws glue get-tables --database-name emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'

raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks
PySpark Applications
Let’s explore four methods to run PySpark applications on EMR.

1. Add Job Flow Steps to an Existing EMR Cluster
We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit
command. According to Spark’s documentation, the spark-submit
script, located in Spark’s bin
directory, is used to launch applications on an [EMR] cluster. A typical spark-submit
command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py
.
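The exact commands are defined in the project's JSON-format step files, discussed below. As a hedged sketch, such a command, expressed as an EMR Step definition suitable for boto3, might resemble the following; the bucket name and S3 path are placeholders.

# Hedged sketch of an EMR Step wrapping a typical spark-submit command;
# the bucket name and S3 path are placeholders, not the project's exact values.
spark_step = {
    'Name': 'Bakery Sales Analysis',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': [
            'spark-submit',
            '--deploy-mode', 'cluster',
            '--master', 'yarn',
            's3://<emr-demo-work-bucket>/analyze/bakery_sales_ssm.py',
        ],
    },
}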
We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.
There are two versions of each PySpark application. Files with suffix _ssm
use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, these PySpark applications are not tightly coupled to boto3
or the SSM Parameter Store. We will use _ssm
versions of the scripts in this post’s demonstration.
> tree pyspark_apps --si -v -A
pyspark_apps
├── [ 320] analyze
│ ├── [1.4k] bakery_sales.py
│ ├── [1.5k] bakery_sales_ssm.py
│ ├── [2.6k] movie_choices.py
│ ├── [2.7k] movie_choices_ssm.py
│ ├── [2.0k] movies_avg_ratings.py
│ ├── [2.3k] movies_avg_ratings_ssm.py
│ ├── [2.2k] stock_volatility.py
│ └── [2.3k] stock_volatility_ssm.py
└── [ 256] process
├── [1.1k] bakery_csv_to_parquet.py
├── [1.3k] bakery_csv_to_parquet_ssm.py
├── [1.3k] movies_csv_to_parquet.py
├── [1.5k] movies_csv_to_parquet_ssm.py
├── [1.9k] stocks_csv_to_parquet.py
└── [2.0k] stocks_csv_to_parquet_ssm.py
We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py
. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.
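The full application is included in the project's GitHub repository. The following is only a minimal sketch of the approach, assuming hypothetical SSM parameter names such as /emr_demo/bronze_bucket and /emr_demo/silver_bucket.

# Hedged sketch of a CSV-to-Parquet PySpark application similar in spirit to
# bakery_csv_to_parquet_ssm.py; SSM parameter names and S3 prefixes are assumptions.
import boto3
from pyspark.sql import SparkSession


def get_parameters():
    # Parameters were placed in the Parameter Store under /emr_demo by CloudFormation
    ssm_client = boto3.client('ssm')
    return {
        'bronze_bucket': ssm_client.get_parameter(
            Name='/emr_demo/bronze_bucket')['Parameter']['Value'],
        'silver_bucket': ssm_client.get_parameter(
            Name='/emr_demo/silver_bucket')['Parameter']['Value'],
    }


def main():
    params = get_parameters()
    spark = SparkSession.builder.appName('bakery-csv-to-parquet').getOrCreate()

    # Read the raw (bronze) CSV data and write it back out as Parquet (silver)
    df_bakery = spark.read \
        .format('csv') \
        .option('header', 'true') \
        .option('delimiter', ',') \
        .option('inferSchema', 'true') \
        .load(f"s3://{params['bronze_bucket']}/bakery/")

    df_bakery.write \
        .format('parquet') \
        .save(f"s3://{params['silver_bucket']}/bakery/", mode='overwrite')


if __name__ == '__main__':
    main()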
The three PySpark data processing applications' spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, included in the project repository. The same goes for the four analytics applications.
Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps_process.py
, included in the project repository. Note how the Steps are injected into the add_job_flow_steps method's parameters.
The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo
path, by CloudFormation. We will reference these parameters in several scripts throughout the post.
> aws ssm get-parameters-by-path --path '/emr_demo' | \
    jq -r ".Parameters[] | {Name: .Name, Value: .Value}"
From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit
commands from JSON-format file, job_flow_steps_process.json
, and run the PySpark processing applications on the existing EMR cluster.
python3 ./scripts/add_job_flow_steps.py --job-type process
While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Once the three Steps have been completed, we should note three sub-directories in the processed
data bucket containing Parquet-format files.

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning your data, we can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Crawl Processed Data with Glue
Similar to the raw data earlier, we catalog the newly processed Parquet data into the same AWS Glue data catalog database, using the other of the two Glue Crawlers we created. The processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo
.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed
Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo
, all prefixed with processed_
. The tables represent the three Kaggle datasets' contents converted to Parquet and correspond to the equivalent tables with the raw_
prefix.

Alternately, we can use the glue get-tables
AWS CLI command to review the tables.
> aws glue get-tables --database-name emr_demo | \
    jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'

processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks
2. Run PySpark Jobs from EMR Master Node
Next, we will explore how to execute PySpark applications remotely on the Master node of the EMR cluster using boto3 and SSH. Although this method may suit certain use cases better than the EMR SDK, in my opinion remote SSH execution does not scale as well due to its lack of automation, and it exposes some potential security risks.
There are four PySpark applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py
application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py
application reads data directly from the processed
data S3 bucket.
The application writes its results into the analyzed
data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.
Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

The Python script, submit_spark_ssh.py
, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script replicates the same functionality as the shell-based SSH command above, executing a remote spark-submit command on the EMR Master Node.
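A rough sketch of the paramiko-based approach follows; the Master node address, key path, and S3 path are placeholders, and the actual script resolves its values from environment variables and the Parameter Store.

# Hedged sketch of remotely executing spark-submit over SSH with paramiko,
# similar in spirit to submit_spark_ssh.py; host, key path, and S3 path are placeholders.
import paramiko

emr_master_dns = 'ec2-111-22-333-44.compute-1.amazonaws.com'  # placeholder
ec2_key_path = '/path/to/my-key-pair.pem'                     # placeholder
work_bucket = 'emr-demo-work-123456789012-us-east-1'          # placeholder

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=emr_master_dns, username='hadoop', key_filename=ec2_key_path)

# Remote command, replicating the shell-based spark-submit over SSH
command = ('spark-submit --deploy-mode cluster --master yarn '
           f's3://{work_bucket}/analyze/bakery_sales_ssm.py')

stdin, stdout, stderr = ssh.exec_command(command)
print(stdout.read().decode())
print(stderr.read().decode())
ssh.close()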
From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem
).
python3 ./scripts/submit_spark_ssh.py \
--ec2-key-path </path/to/my-key-pair.pem>
The spark-submit
command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use AWS CLI emr
and ssm
commands.

Monitoring Spark Jobs
We set spark.yarn.submit.waitAppCompletion
to true
. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true
, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

YARN Timeline Server
Below, we see that the job we submitted running on the YARN Timeline Server also includes useful tools like access to configuration, local logs, server stacks, and server metrics.

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout
logs.

Spark History Server
You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Below, we see more details about our Spark job using the Spark History Server.

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

3. Run Job Flow on an Auto-Terminating EMR Cluster
The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow
method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.
We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will only deploy the Spark application to this cluster as that is the only application we will need to run the Steps.
Using the run_job_flow
method, we will execute the four PySpark data analysis applications. The PySpark application’s spark-submit
commands are defined in a separate JSON-format file, job_flow_steps_analyze.json
. Similar to the previous add_job_flow_steps.py
script, this pattern of decoupling the Spark job command and arguments from the execution code allows us to define and submit any number of Steps without changing the Python execution script. This script also retrieves its parameter values from the SSM Parameter Store.
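As a hedged sketch, the heart of such a script is a single run_job_flow call; the instance types, counts, roles, and bucket names below are assumptions, not the script's exact values.

# Hedged sketch of creating a short-lived, auto-terminating EMR cluster with
# run_job_flow; values shown are assumptions, not the project's exact settings.
import json

import boto3

emr_client = boto3.client('emr')

# Load the four analysis Steps from the JSON-format file
with open('job_flow_steps_analyze.json') as f:
    steps = json.load(f)

response = emr_client.run_job_flow(
    Name='emr-demo-analyze',
    ReleaseLabel='emr-6.2.0',
    LogUri='s3://emr-demo-logs-123456789012-us-east-1/',  # placeholder
    Applications=[{'Name': 'Spark'}],  # Spark is the only application deployed
    Instances={
        'InstanceGroups': [
            {'Name': 'Master', 'Market': 'SPOT', 'InstanceRole': 'MASTER',
             'InstanceType': 'm5.xlarge', 'InstanceCount': 1},
            {'Name': 'Core', 'Market': 'SPOT', 'InstanceRole': 'CORE',
             'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
        ],
        'Ec2KeyName': 'my-key-pair-name',      # placeholder
        'KeepJobFlowAliveWhenNoSteps': False,  # auto-terminate after the Steps
    },
    Steps=steps,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])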
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create a new cluster, run the two PySpark applications, and then auto-terminate.
python3 ./scripts/run_job_flow.py --job-type analyze
As shown below, we see the short-lived EMR cluster in the process of terminating after successfully running the PySpark applications as EMR Steps.


4. Using AWS Step Functions
According to AWS, AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services. Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team. Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. Using AWS Step Functions, we define our workflows as state machines, which transform complex code into easy to understand statements and diagrams.

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.
We will create two state machines for this demo, one for the PySpark data processing applications and one for the PySpark data analysis applications. To create state machines, we first need to create JSON-based state machine definition files. The files are written in Amazon States Language. According to AWS, Amazon States Language is a JSON-based, structured language used to define a state machine, a collection of states that can do work (Task states), determine which states to transition to next (Choice states), stop execution with an error (Fail states), and so on.
The definition files contain specific references to AWS resources deployed to your AWS account, originally created by CloudFormation. The state machine definition file, step_function_emr_analyze.json, contains the configuration of the EMR cluster. Note its parameterized key/value pairs (e.g., “Ec2KeyName.$”: “$.InstancesEc2KeyName”). The values come from a JSON-formatted inputs file and are dynamically replaced upon the state machine's execution.
Python Templating
To automate the process of adding dynamic resource references to the state machine's inputs files, we will use Jinja, the modern and designer-friendly templating language for Python, modeled after Django's templates. We will render the Jinja template to a JSON-based state machine inputs file, replacing the template's resource tags (keys) with values from the SSM Parameter Store's parameters. The inputs file Jinja template, step_function_inputs_analyze.j2, is included in the project repository.
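As a rough sketch, the rendering step might look like the following, assuming the /emr_demo parameter path; the template location is a placeholder.

# Hedged sketch of rendering the Jinja inputs template with values from the
# SSM Parameter Store, similar in spirit to create_inputs_files.py;
# file locations are assumptions.
import boto3
from jinja2 import Environment, FileSystemLoader

ssm_client = boto3.client('ssm')

# Build a dict of parameter name (last path element) to value from /emr_demo
params = {}
paginator = ssm_client.get_paginator('get_parameters_by_path')
for page in paginator.paginate(Path='/emr_demo'):
    for param in page['Parameters']:
        params[param['Name'].split('/')[-1]] = param['Value']

# Render the template, replacing the Jinja tags (keys) with the parameter values
env = Environment(loader=FileSystemLoader('.'))
template = env.get_template('step_function_inputs_analyze.j2')

with open('step_function_inputs_analyze.json', 'w') as f:
    f.write(template.render(params))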
First, install Jinja2, then create two JSON-based state machine inputs files from the Jinja templates using the included Python file.
# Install Jinja2
python3 -m pip install Jinja2

python3 ./scripts/create_inputs_files.py
In the rendered inputs file, the Jinja tags have been replaced with values from the SSM Parameter Store.
Using the definition files, create two state machines using the included Python files.
python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_process.json \
    --state-machine EMR-Demo-Process

python3 ./scripts/create_state_machine.py \
    --definition-file step_function_emr_analyze.json \
    --state-machine EMR-Demo-Analysis
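The script presumably wraps the Step Functions create_state_machine API; a minimal sketch, with a placeholder IAM role ARN, might look like this.

# Hedged sketch of creating a Step Functions state machine from an Amazon
# States Language definition file; the role ARN is a placeholder assumption.
import boto3

sfn_client = boto3.client('stepfunctions')

with open('step_function_emr_analyze.json') as f:
    definition = f.read()

response = sfn_client.create_state_machine(
    name='EMR-Demo-Analysis',
    definition=definition,
    roleArn='arn:aws:iam::123456789012:role/EMR-Demo-StepFunctionsRole',  # placeholder
)
print(response['stateMachineArn'])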
Both state machines should appear in the AWS Step Functions Console’s State Machines tab. Below, we see the ‘EMR-Demo-Analysis’ state machine’s definition both as JSON and rendered visually to a layout.

To execute either of the state machines, use the included Python file, passing in the exact name of the state machine to execute, either ‘EMR-Demo-Process’ or ‘EMR-Demo-Analysis’, and the name of the inputs file. I suggest running the EMR-Demo-Analysis version so as not to re-process all the raw data.
python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Process \
    --inputs-file step_function_inputs_process.json

python3 ./scripts/execute_state_machine.py \
    --state-machine EMR-Demo-Analysis \
    --inputs-file step_function_inputs_analyze.json
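Under the hood, the script presumably looks up the state machine's ARN and calls start_execution with the contents of the inputs file; a hedged sketch follows.

# Hedged sketch of executing a state machine with a JSON inputs file, similar
# in spirit to execute_state_machine.py.
import boto3

sfn_client = boto3.client('stepfunctions')

# Find the ARN of the named state machine
machines = sfn_client.list_state_machines()['stateMachines']
state_machine_arn = next(
    m['stateMachineArn'] for m in machines if m['name'] == 'EMR-Demo-Analysis')

# Pass the rendered inputs file as the execution input
with open('step_function_inputs_analyze.json') as f:
    execution_input = f.read()

response = sfn_client.start_execution(
    stateMachineArn=state_machine_arn,
    input=execution_input,
)
print(response['executionArn'])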
When the PySpark analysis application’s Step Function state machine is executed, a new EMR cluster is created, the PySpark applications are run, and finally, the cluster is auto-terminated. Below, we see a successfully executed state machine, which successfully ran the four PySpark analysis applications in parallel, on a new auto-terminating EMR cluster.

Conclusion
This post explored four methods for running PySpark applications on Amazon Elastic MapReduce (Amazon EMR). The key to scaling data analytics with PySpark on EMR is the use of automation. Therefore, we looked at ways to automate the deployment of EMR resources, create and submit PySpark jobs, and terminate EMR resources when the jobs are complete. Furthermore, we were able to decouple references to dynamic AWS resources within our PySpark applications using parameterization. This allows us to deploy and run PySpark resources across multiple AWS Accounts and AWS Regions without code changes.
In part two of the series, we will explore the use of the recently announced service, Amazon Managed Workflows for Apache Airflow (MWAA), and in part three, the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Streaming Analytics with Data Warehouses, using Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight
Posted by Gary A. Stafford in AWS, Cloud, Python, Software Development, SQL on March 5, 2020
Introduction
Databases are ideal for storing and organizing data that requires a high volume of transaction-oriented query processing while maintaining data integrity. In contrast, data warehouses are designed for performing data analytics on vast amounts of data from one or more disparate sources. In our fast-paced, hyper-connected world, those sources often take the form of continuous streams of web application logs, e-commerce transactions, social media feeds, online gaming activities, financial trading transactions, and IoT sensor readings. Streaming data must be analyzed in near real-time, while often first requiring cleansing, transformation, and enrichment.
In the following post, we will demonstrate the use of Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight to analyze streaming data. We will simulate time-series data, streaming from a set of IoT sensors to Kinesis Data Firehose. Kinesis Data Firehose will write the IoT data to an Amazon S3 Data Lake, where it will then be copied to Redshift in near real-time. In Amazon Redshift, we will enhance the streaming sensor data with data contained in the Redshift data warehouse, which has been gathered and denormalized into a star schema.
In Redshift, we can analyze the data, asking questions such as: what are the minimum, maximum, mean, and median temperatures over a given time period at each sensor location? Finally, we will use Amazon QuickSight to visualize the Redshift data using rich interactive charts and graphs, including displays of geospatial sensor data.
Featured Technologies
The following AWS services are discussed in this post.
Amazon Kinesis Data Firehose
According to Amazon, Amazon Kinesis Data Firehose can capture, transform, and load streaming data into data lakes, data stores, and analytics tools. Direct Kinesis Data Firehose integrations include Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. Kinesis Data Firehose enables near real-time analytics with existing business intelligence (BI) tools and dashboards.
Amazon Redshift
According to Amazon, Amazon Redshift is the most popular and fastest cloud data warehouse. With Redshift, users can query petabytes of structured and semi-structured data across your data warehouse and data lake using standard SQL. Redshift allows users to query and export data to and from data lakes. Redshift can federate queries of live data from Redshift, as well as across one or more relational databases.
Amazon Redshift Spectrum
According to Amazon, Amazon Redshift Spectrum can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum tables are created by defining the structure for data files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue or an Apache Hive metastore. While Redshift Spectrum is an alternative to copying the data into Redshift for analysis, we will not be using Redshift Spectrum in this post.
Amazon QuickSight
According to Amazon, Amazon QuickSight is a fully managed business intelligence service that makes it easy to deliver insights to everyone in an organization. QuickSight lets users easily create and publish rich, interactive dashboards that include Amazon QuickSight ML Insights. Dashboards can then be accessed from any device and embedded into applications, portals, and websites.
What is a Data Warehouse?
According to Amazon, a data warehouse is a central repository of information that can be analyzed to make better-informed decisions. Data flows into a data warehouse from transactional systems, relational databases, and other sources, typically on a regular cadence. Business analysts, data scientists, and decision-makers access the data through business intelligence tools, SQL clients, and other analytics applications.
Demonstration
Source Code
All the source code for this post can be found on GitHub. Use the following command to git clone a local copy of the project.
git clone \
    --branch master --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/kinesis-redshift-streaming-demo.git
CloudFormation
Use the two AWS CloudFormation templates, included in the project, to build two CloudFormation stacks. Please review the two templates and understand the costs of the resources before continuing.
The first CloudFormation template, redshift.yml, provisions a new Amazon VPC with associated network and security resources, a single-node Redshift cluster, and two S3 buckets.
The second CloudFormation template, kinesis-firehose.yml, provisions an Amazon Kinesis Data Firehose delivery stream, associated IAM Policy and Role, and an Amazon CloudWatch log group and two log streams.
Change the REDSHIFT_PASSWORD
value to ensure your security. Optionally, change the REDSHIFT_USERNAME
value. Make sure that the first stack completes successfully, before creating the second stack.
export AWS_DEFAULT_REGION=us-east-1

REDSHIFT_USERNAME=awsuser
REDSHIFT_PASSWORD=5up3r53cr3tPa55w0rd

# Create resources
aws cloudformation create-stack \
    --stack-name redshift-stack \
    --template-body file://cloudformation/redshift.yml \
    --parameters ParameterKey=MasterUsername,ParameterValue=${REDSHIFT_USERNAME} \
        ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
        ParameterKey=InboundTraffic,ParameterValue=$(curl ifconfig.me -s)/32 \
    --capabilities CAPABILITY_NAMED_IAM

# Wait for first stack to complete
aws cloudformation create-stack \
    --stack-name kinesis-firehose-stack \
    --template-body file://cloudformation/kinesis-firehose.yml \
    --parameters ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
    --capabilities CAPABILITY_NAMED_IAM
Review AWS Resources
To confirm all the AWS resources were created correctly, use the AWS Management Console.
Kinesis Data Firehose
In the Amazon Kinesis Dashboard, you should see the new Amazon Kinesis Data Firehose delivery stream, redshift-delivery-stream.
The Details tab of the new Amazon Kinesis Firehose delivery stream should look similar to the following. Note the IAM Role, FirehoseDeliveryRole, which was created and associated with the delivery stream by CloudFormation.
We are not performing any transformations of the incoming messages. Note the new S3 bucket that was created and associated with the stream by CloudFormation. The bucket name was randomly generated. This bucket is where the incoming messages will be written.
Note the buffer conditions of 1 MB and 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written in JSON format, using GZIP compression, to S3. These are the minimal buffer conditions, and as close to real-time streaming to Redshift as we can get.
Note the COPY
command, which is used to copy the messages from S3 to the message
table in Amazon Redshift. Kinesis uses the IAM Role, ClusterPermissionsRole, created by CloudFormation, for credentials. We are using a Manifest to copy the data to Redshift from S3. According to Amazon, a Manifest ensures that the COPY
command loads all of the required files, and only the required files, for a data load. The Manifests are automatically generated and managed by the Kinesis Firehose delivery stream.
Redshift Cluster
In the Amazon Redshift Console, you should see a new single-node Redshift cluster consisting of one Redshift dc2.large Dense Compute node type.
Note the new VPC, Subnet, and VPC Security Group created by CloudFormation. Also, observe that the Redshift cluster is publicly accessible from outside the new VPC.
Redshift Ingress Rules
The single-node Redshift cluster is assigned to an AWS Availability Zone in the US East (N. Virginia) us-east-1 AWS Region. The cluster is associated with a VPC Security Group. The Security Group contains three inbound rules, all for Redshift port 5439. The IP addresses associated with the three inbound rules provide access to the following: 1) a /27 CIDR block for Amazon QuickSight in us-east-1, 2) a /27 CIDR block for Amazon Kinesis Firehose in us-east-1, and 3) a /32 CIDR block with your current IP address. If your IP address changes or you do not use the us-east-1 Region, you will need to change one or all of these IP addresses. The list of Kinesis Firehose IP addresses is here. The list of QuickSight IP addresses is here.
If you cannot connect to Redshift from your local SQL client, most often, your IP address has changed and is incorrect in the Security Group’s inbound rule.
Redshift SQL Client
You can choose to use the Redshift Query Editor to interact with Redshift or use a third-party SQL client for greater flexibility. To access the Redshift Query Editor, use the user credentials specified in the redshift.yml CloudFormation template.
There is a lot of useful functionality in the Redshift Console and within the Redshift Query Editor. However, a notable limitation of the Redshift Query Editor, in my opinion, is the inability to execute multiple SQL statements at the same time, whereas most SQL clients allow multiple SQL queries to be executed at once.
I prefer to use JetBrains PyCharm IDE. PyCharm has out-of-the-box integration with Redshift. Using PyCharm, I can edit the project’s Python, SQL, AWS CLI shell, and CloudFormation code, all from within PyCharm.
If you use any of the common SQL clients, you will need to set up a JDBC (Java Database Connectivity) or ODBC (Open Database Connectivity) connection to Redshift. The ODBC and JDBC connection strings can be found in the Redshift cluster's Properties tab or in the Outputs tab from the CloudFormation stack, redshift-stack
.
You will also need the Redshift database username and password you included in the aws cloudformation create-stack
AWS CLI command you executed previously. Below, we see PyCharm’s Project Data Sources window containing a new data source for the Redshift dev
database.
Database Schema and Tables
When CloudFormation created the Redshift cluster, it also created a new database, dev
. Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL commands to create a new database schema, sensor
, and six tables in the sensor
schema.
-- Create new schema in Redshift DB
DROP SCHEMA IF EXISTS sensor CASCADE;
CREATE SCHEMA sensor;
SET search_path = sensor;

-- Create (6) tables in Redshift DB
CREATE TABLE message -- streaming data table
(
    id      BIGINT IDENTITY (1, 1),                                     -- message id
    guid    VARCHAR(36)   NOT NULL,                                     -- device guid
    ts      BIGINT        NOT NULL DISTKEY SORTKEY,                     -- epoch in seconds
    temp    NUMERIC(5, 2) NOT NULL,                                     -- temperature reading
    created TIMESTAMP DEFAULT ('now'::text)::timestamp with time zone   -- row created at
);

CREATE TABLE location -- dimension table
(
    id          INTEGER        NOT NULL DISTKEY SORTKEY, -- location id
    long        NUMERIC(10, 7) NOT NULL,                 -- longitude
    lat         NUMERIC(10, 7) NOT NULL,                 -- latitude
    description VARCHAR(256)                             -- location description
);

CREATE TABLE history -- dimension table
(
    id            INTEGER     NOT NULL DISTKEY SORTKEY, -- history id
    serviced      BIGINT      NOT NULL,                 -- service date
    action        VARCHAR(20) NOT NULL,                 -- INSTALLED, CALIBRATED, FIRMWARE UPGRADED, DECOMMISSIONED, OTHER
    technician_id INTEGER     NOT NULL,                 -- technician id
    notes         VARCHAR(256)                          -- notes
);

CREATE TABLE sensor -- dimension table
(
    id     INTEGER     NOT NULL DISTKEY SORTKEY, -- sensor id
    guid   VARCHAR(36) NOT NULL,                 -- device guid
    mac    VARCHAR(18) NOT NULL,                 -- mac address
    sku    VARCHAR(18) NOT NULL,                 -- product sku
    upc    VARCHAR(12) NOT NULL,                 -- product upc
    active BOOLEAN DEFAULT TRUE,                 -- active status
    notes  VARCHAR(256)                          -- notes
);

CREATE TABLE manufacturer -- dimension table
(
    id      INTEGER      NOT NULL DISTKEY SORTKEY, -- manufacturer id
    name    VARCHAR(100) NOT NULL,                 -- company name
    website VARCHAR(100) NOT NULL,                 -- company website
    notes   VARCHAR(256)                           -- notes
);

CREATE TABLE sensors -- fact table
(
    id              BIGINT IDENTITY (1, 1) DISTKEY SORTKEY, -- fact id
    sensor_id       INTEGER     NOT NULL,                   -- sensor id
    manufacturer_id INTEGER     NOT NULL,                   -- manufacturer id
    location_id     INTEGER     NOT NULL,                   -- location id
    history_id      BIGINT      NOT NULL,                   -- history id
    message_guid    VARCHAR(36) NOT NULL                    -- sensor guid
);
Star Schema
The tables represent denormalized data, taken from one or more relational database sources. The tables form a star schema. The star schema is widely used to develop data warehouses. The star schema consists of one or more fact tables referencing any number of dimension tables. The location
, manufacturer
, sensor
, and history
tables are dimension tables. The sensors
table is a fact table.
In the diagram below, the foreign key relationships are virtual, not physical. The diagram was created using PyCharm’s schema visualization tool. Note the schema’s star shape. The message
table is where the streaming IoT data will eventually be written. The message
table is related to the sensors
fact table through the common guid
field.
Sample Data to S3
Next, copy the sample data, included in the project, to the S3 data bucket created with CloudFormation. Each CSV-formatted data file corresponds to one of the tables we previously created. Since the bucket name is semi-random, we can use the AWS CLI and jq to get the bucket name, then use it to perform the copy commands.
# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
    --stack-name redshift-stack \
    | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
echo $DATA_BUCKET

# Copy data
aws s3 cp data/history.csv s3://${DATA_BUCKET}/history/history.csv
aws s3 cp data/location.csv s3://${DATA_BUCKET}/location/location.csv
aws s3 cp data/manufacturer.csv s3://${DATA_BUCKET}/manufacturer/manufacturer.csv
aws s3 cp data/sensor.csv s3://${DATA_BUCKET}/sensor/sensor.csv
aws s3 cp data/sensors.csv s3://${DATA_BUCKET}/sensors/sensors.csv
The output from the AWS CLI should look similar to the following.
Sample Data to Redshift
Whereas a relational database, such as Amazon RDS is designed for online transaction processing (OLTP), Amazon Redshift is designed for online analytic processing (OLAP) and business intelligence applications. To write data to Redshift we typically use the COPY
command versus frequent, individual INSERT
statements, as with OLTP, which would be prohibitively slow. According to Amazon, the Redshift COPY
command leverages the Amazon Redshift massively parallel processing (MPP) architecture to read and load data in parallel from files on Amazon S3, from a DynamoDB table, or from text output from one or more remote hosts.
In the following series of SQL statements, replace the placeholder, your_bucket_name
, in five places with your S3 data bucket name. The bucket name will start with the prefix, redshift-stack-databucket
. The bucket name can be found in the Outputs tab of the CloudFormation stack, redshift-stack
. Next, replace the placeholder, cluster_permissions_role_arn
, with the ARN (Amazon Resource Name) of the ClusterPermissionsRole. The ARN is formatted as follows, arn:aws:iam::your-account-id:role/ClusterPermissionsRole
. The ARN can be found in the Outputs tab of the CloudFormation stack, redshift-stack
.
Using the Redshift Query Editor or your SQL client of choice, execute the SQL statements to copy the sample data from S3 to each of the corresponding tables in the Redshift dev
database. The TRUNCATE
commands guarantee there is no previous sample data present in the tables.
-- ** MUST FIRST CHANGE your_bucket_name and cluster_permissions_role_arn **

-- sensor schema
SET search_path = sensor;

-- Copy sample data to tables from S3
TRUNCATE TABLE history;
COPY history (id, serviced, action, technician_id, notes)
    FROM 's3://your_bucket_name/history/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;

TRUNCATE TABLE location;
COPY location (id, long, lat, description)
    FROM 's3://your_bucket_name/location/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;

TRUNCATE TABLE sensor;
COPY sensor (id, guid, mac, sku, upc, active, notes)
    FROM 's3://your_bucket_name/sensor/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;

TRUNCATE TABLE manufacturer;
COPY manufacturer (id, name, website, notes)
    FROM 's3://your_bucket_name/manufacturer/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;

TRUNCATE TABLE sensors;
COPY sensors (sensor_id, manufacturer_id, location_id, history_id, message_guid)
    FROM 's3://your_bucket_name/sensors/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;

SELECT COUNT(*) FROM history;      -- 30
SELECT COUNT(*) FROM location;     -- 6
SELECT COUNT(*) FROM sensor;       -- 6
SELECT COUNT(*) FROM manufacturer; -- 1
SELECT COUNT(*) FROM sensors;      -- 30
Database Views
Next, create four Redshift database Views. These views may be used to analyze the data in Redshift, and later, in Amazon QuickSight.
- sensor_msg_detail: Returns aggregated sensor details, using the sensors fact table and all five dimension tables in a SQL Join.
- sensor_msg_count: Returns the number of messages received by Redshift, for each sensor.
- sensor_avg_temp: Returns the average temperature from each sensor, based on all the messages received from each sensor.
- sensor_avg_temp_current: Identical to the previous view, but limited to the last 30 minutes.
Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL statements.
-- sensor schema
SET search_path = sensor;

-- View 1: Sensor details
DROP VIEW IF EXISTS sensor_msg_detail;
CREATE OR REPLACE VIEW sensor_msg_detail AS
SELECT ('1970-01-01'::date + e.ts * interval '1 second') AS recorded,
       e.temp,
       s.guid,
       s.sku,
       s.mac,
       l.lat,
       l.long,
       l.description AS location,
       ('1970-01-01'::date + h.serviced * interval '1 second') AS installed,
       e.created AS redshift
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN manufacturer m ON (f.manufacturer_id = m.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
ORDER BY f.id;

-- View 2: Message count per sensor
DROP VIEW IF EXISTS sensor_msg_count;
CREATE OR REPLACE VIEW sensor_msg_count AS
SELECT count(e.temp) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY msg_count, s.guid;

-- View 3: Average temperature per sensor (all data)
DROP VIEW IF EXISTS sensor_avg_temp;
CREATE OR REPLACE VIEW sensor_avg_temp AS
SELECT avg(e.temp) AS avg_temp,
       count(s.guid) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;

-- View 4: Average temperature per sensor (last 30 minutes)
DROP VIEW IF EXISTS sensor_avg_temp_current;
CREATE OR REPLACE VIEW sensor_avg_temp_current AS
SELECT avg(e.temp) AS avg_temp,
       count(s.guid) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN (SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
                            guid,
                            temp
                     FROM message
                     WHERE DATEDIFF(minute, recorded_time, GETDATE()) <= 30) e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;
At this point, you should have a total of six tables and four views in the sensor
schema of the dev
database in Redshift.
Test the System
With all the necessary AWS resources and Redshift database objects created and sample data in the Redshift database, we can test the system. The included Python script, kinesis_put_test_msg.py, will generate a single test message and send it to Kinesis Data Firehose. If everything is working, the message should be delivered from Kinesis Data Firehose to S3, then copied to Redshift, and appear in the message
table.
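The script itself is in the project repository; a minimal sketch of what a single-message test might look like follows, reusing one of the existing sample sensor GUIDs.

# Hedged sketch of a single test message, similar in spirit to
# kinesis_put_test_msg.py; the GUID is one of the sample sensor GUIDs.
import json
from datetime import datetime

import boto3

client = boto3.client('firehose')

payload = {
    'guid': '03e39872-e105-4be4-83c0-9ade818465dc',
    'ts': int(datetime.now().strftime('%s')),
    'temp': 72.15,
}

response = client.put_record(
    DeliveryStreamName='redshift-delivery-stream',
    Record={'Data': json.dumps(payload)}
)
print(response['RecordId'])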
Install the required Python packages and then execute the Python script.
# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt

# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1

# Execute script in foreground
python3 ./scripts/kinesis_put_test_msg.py
Run the following SQL query to confirm the record is in the message
table of the dev
database. It will take at least one minute for the message to appear in Redshift.
SELECT COUNT(*) FROM message;
Once the message is confirmed to be present in the message
table, delete the record by truncating the table.
TRUNCATE TABLE message;
Streaming Data
Assuming the test message worked, we can proceed with simulating the streaming IoT sensor data. The included Python script, kinesis_put_streaming_data.py, creates six concurrent threads, representing six temperature sensors.
#!/usr/bin/env python3

# Simulated multiple streaming time-series iot sensor data
# Author: Gary A. Stafford
# Date: Revised October 2020

import json
import random
from datetime import datetime

import boto3
import time as tm
import numpy as np
import threading

STREAM_NAME = 'redshift-delivery-stream'

client = boto3.client('firehose')


class MyThread(threading.Thread):
    def __init__(self, thread_id, sensor_guid, temp_max):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.sensor_id = sensor_guid
        self.temp_max = temp_max

    def run(self):
        print("Starting Thread: " + str(self.thread_id))
        self.create_data()
        print("Exiting Thread: " + str(self.thread_id))

    def create_data(self):
        start = 0
        stop = 20
        step = 0.1  # step size (e.g 0 to 20, step .1 = 200 steps in cycle)
        repeat = 2  # how many times to repeat cycle
        freq = 60  # frequency of temperature reading in seconds
        max_range = int(stop * (1 / step))
        time = np.arange(start, stop, step)
        amplitude = np.sin(time)
        for x in range(0, repeat):
            for y in range(0, max_range):
                temperature = round((((amplitude[y] + 1.0) * self.temp_max) + random.uniform(-5, 5)) + 60, 2)
                payload = {
                    'guid': self.sensor_id,
                    'ts': int(datetime.now().strftime('%s')),
                    'temp': temperature
                }
                print(json.dumps(payload))
                self.send_to_kinesis(payload)
                tm.sleep(freq)

    @staticmethod
    def send_to_kinesis(payload):
        _ = client.put_record(
            DeliveryStreamName=STREAM_NAME,
            Record={
                'Data': json.dumps(payload)
            }
        )


def main():
    sensor_guids = [
        "03e39872-e105-4be4-83c0-9ade818465dc",
        "fa565921-fddd-4bfb-a7fd-d617f816df4b",
        "d120422d-5789-435d-9dc6-73d8489b04c2",
        "93238559-4d55-4b2a-bdcb-6aa3be0f3908",
        "dbc05806-6872-4f0a-aca2-f794cc39bd9b",
        "f9ade639-f936-4954-aa5a-1f2ed86c9bcf"
    ]
    timeout = 300  # arbitrarily offset the start of threads (60 / 5 = 12)

    # Create new threads
    thread1 = MyThread(1, sensor_guids[0], 25)
    thread2 = MyThread(2, sensor_guids[1], 10)
    thread3 = MyThread(3, sensor_guids[2], 7)
    thread4 = MyThread(4, sensor_guids[3], 30)
    thread5 = MyThread(5, sensor_guids[4], 5)
    thread6 = MyThread(6, sensor_guids[5], 12)

    # Start new threads
    thread1.start()
    tm.sleep(timeout * 1)
    thread2.start()
    tm.sleep(timeout * 2)
    thread3.start()
    tm.sleep(timeout * 1)
    thread4.start()
    tm.sleep(timeout * 3)
    thread5.start()
    tm.sleep(timeout * 2)
    thread6.start()

    # Wait for threads to terminate
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    print("Exiting Main Thread")


if __name__ == '__main__':
    main()
The simulated data uses an algorithm that follows an oscillating sine wave or sinusoid, representing rising and falling temperatures. In the script, I have configured each thread to start with an arbitrary offset to add some randomness to the simulated data.
The variables within the script can be adjusted to shorten or lengthen the time it takes to stream the simulated data. By default, each of the six threads creates 400 messages per sensor, in one-minute increments. Including the offset start of each successive thread, the total runtime of the script is about 7.5 hours to generate 2,400 simulated IoT sensor temperature readings and push them to Kinesis Data Firehose. Make sure you can maintain a connection to the Internet for the entire runtime of the script. I normally run the script in the background, from a small EC2 instance.
To use the Python script, execute either of the two following commands. Using the first command will run the script in the foreground. Using the second command will run the script in the background.
# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt

# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1

# Option #1: Execute script in foreground
python3 ./scripts/kinesis_put_streaming_data.py

# Option #2: Execute script in background
nohup python3 -u ./scripts/kinesis_put_streaming_data.py > output.log 2>&1 </dev/null &

# Check that the process is running
ps -aux | grep 'python3 -u ./scripts/kinesis_put_streaming_data.py'

# Wait 1-2 minutes, then check output to confirm script is working
cat output.log
Viewing the output.log file, you should see messages being generated on each thread and sent to Kinesis Data Firehose. Each message contains the GUID of the sensor, a timestamp, and a temperature reading.
The messages are sent to Kinesis Data Firehose, which in turn writes the messages to S3. The messages are written in JSON format using GZIP compression. Below, we see an example of the GZIP compressed JSON files in S3. The JSON files are partitioned by year, month, day, and hour.
Confirm Data Streaming to Redshift
From the Amazon Kinesis Firehose Console Metrics tab, you should see incoming messages flowing to S3 and on to Redshift.
Executing the following SQL query should show an increasing number of messages.
SELECT COUNT(*) FROM message;
How Near Real-time?
Earlier, we saw how the Amazon Kinesis Data Firehose delivery stream was configured to buffer data at the rate of 1 MB or 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written to S3. Each record in the message
table has two timestamps. The first timestamp, ts, is when the temperature reading was recorded. The second timestamp, created, is when the message was written to Redshift, using the COPY
command. We can calculate the delta in seconds between the two timestamps using the following SQL query in Redshift.
SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
       created AS redshift_time,
       DATEDIFF(seconds, recorded_time, redshift_time) AS diff_seconds
FROM message
ORDER BY diff_seconds;
We can visualize the results of the Redshift query in Amazon QuickSight. In my own tests, for 2,400 messages over approximately 7.5 hours, the minimum delay was 1 second and the maximum delay was 64 seconds. Hence, near real-time, in this case, is about one minute or less, with an average latency of roughly 30 seconds.
Analyzing the Data with Redshift
I suggest waiting at least thirty minutes for a significant number of messages to be copied into Redshift. With the data streaming into Redshift, execute each of the database views we created earlier. You should see the streaming message data joined to the existing static data in Redshift. As data continues to stream into Redshift, the views will display different results based on the current message
table contents.
Here, we see the first ten results of the sensor_msg_detail
view.
recorded | temp | guid | sku | mac | lat | long | location | installed | redshift
---|---|---|---|---|---|---|---|---|---
2020-03-04 03:31:59.000000 | 105.56 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:33:01.580147
2020-03-04 03:29:59.000000 | 95.93 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:31:01.388887
2020-03-04 03:26:58.000000 | 91.93 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:28:01.099796
2020-03-04 03:25:58.000000 | 88.70 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:26:00.196113
2020-03-04 03:22:58.000000 | 87.65 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:23:01.558514
2020-03-04 03:20:58.000000 | 77.35 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:21:00.691347
2020-03-04 03:16:57.000000 | 71.84 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:17:59.307510
2020-03-04 03:15:57.000000 | 72.35 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:15:59.813656
2020-03-04 03:14:57.000000 | 67.95 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:15:59.813656
Next, we see the results of the sensor_avg_temp
view.
avg_temp | guid | lat | long | location
---|---|---|---|---
65.25 | dbc05806-6872-4f0a-aca2-f794cc39bd9b | 37.7066541 | -122.4181399 | Wafer Inspection Lab #0210A
67.23 | d120422d-5789-435d-9dc6-73d8489b04c2 | 37.7072686 | -122.4187016 | Zone 4 Wafer Processing Area B3
70.23 | fa565921-fddd-4bfb-a7fd-d617f816df4b | 37.7071763 | -122.4190397 | Research Lab #2209
72.22 | f9ade639-f936-4954-aa5a-1f2ed86c9bcf | 37.7067618 | -122.4186191 | Wafer Inspection Lab #0211C
85.48 | 03e39872-e105-4be4-83c0-9ade818465dc | 37.7068476 | -122.4191599 | Research Lab #2203
90.69 | 93238559-4d55-4b2a-bdcb-6aa3be0f3908 | 37.7070334 | -122.4184393 | Zone 2 Semiconductor Assembly Area A2
Amazon QuickSight
In a recent post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2, I detailed getting started with Amazon QuickSight. In this post, I will assume you are familiar with QuickSight.
Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. However, for this part of the demonstration, we will work directly in the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation.
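That said, if you prefer to explore the new APIs, below is a minimal boto3 sketch that lists the QuickSight data sets in an account. The us-east-1 region matches this demonstration, and the script assumes AWS credentials are already configured locally.
# Minimal sketch: list existing QuickSight data sets using boto3.
import boto3

# QuickSight APIs require the AWS account ID; look it up via STS.
account_id = boto3.client("sts").get_caller_identity()["Account"]
quicksight = boto3.client("quicksight", region_name="us-east-1")

response = quicksight.list_data_sets(AwsAccountId=account_id)
for summary in response["DataSetSummaries"]:
    print(f'{summary["Name"]} ({summary["DataSetId"]})')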
Redshift Data Sets
To visualize the data from Amazon Redshift, we start by creating Data Sets in QuickSight. QuickSight supports a large number of data sources for creating data sets. We will use the Redshift data source. If you recall, we added an inbound rule for QuickSight, allowing us to connect to our Redshift cluster in us-east-1.
We will select the sensor schema, which is where the tables and views for this demonstration are located.
We can choose any of the tables or views in the Redshift dev database that we want to use for visualization.
Below, we see examples of two new data sets, shown in the QuickSight Data Prep Console. Note how QuickSight automatically recognizes field types, including dates, latitude, and longitude.
Visualizations
Using the data sets, QuickSight allows us to create a wide variety of rich visualizations. Below, we see the simulated time-series data from the six temperature sensors.
Next, we see an example of QuickSight’s ability to show geospatial data. The Map shows the location of each sensor and the average temperature recorded by that sensor.
Cleaning Up
To remove the resources created for this post, use the following series of AWS CLI commands.
# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
    --stack-name redshift-stack \
    | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
echo ${DATA_BUCKET}

# Get log bucket name
LOG_BUCKET=$(aws cloudformation describe-stacks \
    --stack-name redshift-stack \
    | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "LogBucket") | .OutputValue')
echo ${LOG_BUCKET}

# Delete demonstration resources
python3 ./scripts/delete_buckets.py
aws cloudformation delete-stack --stack-name kinesis-firehose-stack

# Wait for first stack to be deleted
aws cloudformation delete-stack --stack-name redshift-stack
Conclusion
In this brief post, we have learned how streaming data can be analyzed in near real-time in Amazon Redshift using Amazon Kinesis Data Firehose. Further, we explored how the results of those analyses can be visualized in Amazon QuickSight. For customers who depend on a data warehouse for data analytics but also have streaming data sources, Amazon Kinesis Data Firehose or Amazon Redshift Spectrum is an excellent choice.
This blog represents my own viewpoints and not of my employer, Amazon Web Services.
Getting Started with PostgreSQL using Amazon RDS, CloudFormation, pgAdmin, and Python
Posted by Gary A. Stafford in AWS, Cloud, Python, Software Development on August 9, 2019
Introduction
In the following post, we will explore how to get started with Amazon Relational Database Service (RDS) for PostgreSQL. CloudFormation will be used to build a PostgreSQL master database instance and a single read replica in a new VPC. AWS Systems Manager Parameter Store will be used to store our CloudFormation configuration values. Amazon RDS Event Notifications will send text messages to our mobile device to let us know when the RDS instances are ready for use. Once running, we will examine a variety of methods to interact with our database instances, including pgAdmin, Adminer, and Python.
Technologies
The primary technologies used in this post include the following.
PostgreSQL
According to its website, PostgreSQL, commonly known as Postgres, is the world’s most advanced Open Source relational database. Originating at UC Berkeley in 1986, PostgreSQL has more than 30 years of active core development. PostgreSQL has earned a strong reputation for its proven architecture, reliability, data integrity, robust feature set, and extensibility. PostgreSQL runs on all major operating systems and has been ACID-compliant since 2001.
Amazon RDS for PostgreSQL
According to Amazon, Amazon Relational Database Service (RDS) provides six familiar database engines to choose from, including Amazon Aurora, PostgreSQL, MySQL, MariaDB, Oracle Database, and SQL Server. RDS is available on several database instance types - optimized for memory, performance, or I/O.
Amazon RDS for PostgreSQL makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. Amazon RDS supports the latest PostgreSQL version 11, which includes several enhancements to performance, robustness, transaction management, query parallelism, and more.
AWS CloudFormation
According to Amazon, CloudFormation provides a common language to describe and provision all the infrastructure resources within AWS-based cloud environments. CloudFormation allows you to use a JSON- or YAML-based template to model and provision all the resources needed for your applications across all AWS regions and accounts, in an automated and secure manner.
Demonstration
Architecture
Below, we see an architectural representation of what will be built in the demonstration. This is not a typical three-tier AWS architecture, wherein the RDS instances would be placed in private subnets (data tier) and accessible only by the application tier, running on AWS. The architecture for the demonstration is designed for interacting with RDS through external database clients such as pgAdmin, and applications like our local Python scripts, detailed later in the post.
Source Code
All source code for this post is available on GitHub in a single public repository, postgres-rds-demo.
.
├── LICENSE.md
├── README.md
├── cfn-templates
│   ├── event.template
│   ├── rds.template
├── parameter_store_values.sh
├── python-scripts
│   ├── create_pagila_data.py
│   ├── database.ini
│   ├── db_config.py
│   ├── query_postgres.py
│   ├── requirements.txt
│   └── unit_tests.py
├── sql-scripts
│   ├── pagila-insert-data.sql
│   └── pagila-schema.sql
└── stack.yml
To clone the GitHub repository, execute the following command.
git clone --branch master --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-rds-postgres.git
Prerequisites
For this demonstration, I will assume you already have an AWS account, as well as the latest copies of the AWS CLI and Python 3 installed on your development machine. Optionally, for pgAdmin and Adminer, you will also need to have Docker installed.
Steps
In this demonstration, we will perform the following steps.
- Put CloudFormation configuration values in Parameter Store;
- Execute CloudFormation templates to create AWS resources;
- Execute SQL scripts using Python to populate the new database with sample data;
- Configure pgAdmin and Python connections to RDS PostgreSQL instances;
AWS Systems Manager Parameter Store
With AWS, it is typical to use services like AWS Systems Manager Parameter Store and AWS Secrets Manager to store overt, sensitive, and secret configuration values. These values can be consumed by your code or by AWS services such as CloudFormation. Parameter Store allows us to follow the proper twelve-factor, cloud-native practice of separating configuration from code.
To demonstrate the use of Parameter Store, we will place a few of our CloudFormation configuration items into Parameter Store. The demonstration’s GitHub repository includes a shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.
Below, we see several of the demo’s configuration values, which have been put into Parameter Store.
SecureString
Whereas our other parameters are stored in Parameter Store as String data types, the database’s master user password is stored as a SecureString data type. Parameter Store uses an AWS Key Management Service (KMS) customer master key (CMK) to encrypt the SecureString parameter value.
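When your code needs the password back, Parameter Store can decrypt it transparently. Below is a minimal boto3 sketch; the parameter name matches the one created by the demonstration’s script, and the caller is assumed to have kms:Decrypt permission on the CMK.
# Minimal sketch: retrieve the SecureString parameter, decrypted by KMS.
import boto3

ssm = boto3.client("ssm", region_name="us-east-1")

response = ssm.get_parameter(
    Name="/rds_demo/master_password",
    WithDecryption=True,
)
# Print metadata only; avoid echoing the secret value itself.
print(response["Parameter"]["Name"], response["Parameter"]["Version"])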
SMS Text Alert Option
Before running the Parameter Store script, you will need to change the /rds_demo/alert_phone parameter value in the script (shown below) to your mobile device number, including country code, such as ‘+12038675309’. Amazon SNS will use it to send SMS messages, using Amazon RDS Event Notification. If you don’t want to use this messaging feature, simply ignore this parameter and do not execute the event.template CloudFormation template in the subsequent step.
aws ssm put-parameter \
    --name /rds_demo/alert_phone \
    --type String \
    --value "your_phone_number_here" \
    --description "RDS alert SMS phone number" \
    --overwrite
Run the following command to execute the shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.
sh ./parameter_store_values.sh
CloudFormation Templates
The GitHub repository includes two CloudFormation templates, cfn-templates/event.template and cfn-templates/rds.template. The event template contains two resources, an AWS SNS Topic and an AWS RDS Event Subscription. The RDS template includes several resources, including a VPC, Internet Gateway, VPC security group, two public subnets, one RDS master database instance, and an AWS RDS Read Replica database instance.
The resources are split into two CloudFormation templates so we can create the notification resources first, independently of creating or deleting the RDS instances. This ensures we get all our SMS alerts about both the creation and deletion of the databases.
Template Parameters
The two CloudFormation templates contain a total of approximately fifteen parameters. For most, you can use the default values I have set or choose to override them. Four of the parameters will be fulfilled from Parameter Store. Of these, the master database password is treated slightly differently because it is secure (encrypted in Parameter Store). Below is a snippet of the template showing both types of parameters. The last two are fulfilled from Parameter Store.
DBInstanceClass:
  Type: String
  Default: "db.t3.small"
DBStorageType:
  Type: String
  Default: "gp2"
DBUser:
  Type: String
  Default: "{{resolve:ssm:/rds_demo/master_username:1}}"
DBPassword:
  Type: String
  Default: "{{resolve:ssm-secure:/rds_demo/master_password:1}}"
  NoEcho: True
Choosing the default CloudFormation parameter values will result in two minimally-configured RDS instances running the PostgreSQL 11.4 database engine on a db.t3.small instance with 10 GiB of General Purpose (SSD) storage. The db.t3 DB instance is part of the latest generation burstable performance instance class. The master instance is not configured for Multi-AZ high availability. However, the master and read replica each run in a different Availability Zone (AZ) within the same AWS Region.
Parameter Versioning
When placing parameters into Parameter Store, subsequent updates to a parameter result in the version number of that parameter being incremented. Note in the examples above that CloudFormation requires the parameter’s version, here ‘1’. If you choose to update a value in Parameter Store, thus incrementing the parameter’s version, you will also need to update the corresponding version number in the CloudFormation template’s parameter.
{ "Parameter": { "Name": "/rds_demo/rds_username", "Type": "String", "Value": "masteruser", "Version": 1, "LastModifiedDate": 1564962073.774, "ARN": "arn:aws:ssm:us-east-1:1234567890:parameter/rds_demo/rds_username" } }
Validating Templates
Although I have tested both templates, I suggest validating the templates yourself, as you usually would for any CloudFormation template you are creating. You can use the AWS CLI validate-template command to validate the templates. Alternately, or better yet additionally, you can use the CloudFormation Linter, cfn-lint.
aws cloudformation validate-template \
    --template-body file://cfn-templates/rds.template

cfn-lint -t cfn-templates/rds.template
Create the Stacks
To execute the first CloudFormation template and create a CloudFormation Stack containing the two event notification resources, run the following create-stack CLI command.
aws cloudformation create-stack \
    --template-body file://cfn-templates/event.template \
    --stack-name RDSEventDemoStack
The first stack takes less than one minute to create. Using the AWS CloudFormation Console, make sure the first stack completes successfully before creating the second stack with the command below.
aws cloudformation create-stack \
    --template-body file://cfn-templates/rds.template \
    --stack-name RDSDemoStack
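If you would rather script the wait than watch the console, a boto3 waiter can block until the first stack is complete before creating the second. This is a minimal sketch; the stack names and template path match this demonstration.
# Minimal sketch: wait for the event stack, then create the RDS stack.
import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")

# Block until the first (event) stack has finished creating.
cfn.get_waiter("stack_create_complete").wait(StackName="RDSEventDemoStack")

with open("cfn-templates/rds.template") as template:
    cfn.create_stack(StackName="RDSDemoStack", TemplateBody=template.read())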
Wait for my Text
In my tests, the CloudFormation RDS stack takes an average of 25–30 minutes to create and 15–20 minutes to delete, which can seem like an eternity. You could use the AWS CloudFormation console (shown below) or continue to use the CLI to follow the progress of the RDS stack creation.
However, if you recall, the CloudFormation event template creates an AWS RDS Event Subscription. This resource will notify us when the databases are ready by sending text messages to our mobile device.
In the CloudFormation events template, the RDS Event Subscription is configured to generate Amazon Simple Notification Service (SNS) notifications for several specific event types, including RDS instance creation and deletion.
MyEventSubscription:
  Properties:
    Enabled: true
    EventCategories:
      - availability
      - configuration change
      - creation
      - deletion
      - failover
      - failure
      - recovery
    SnsTopicArn:
      Ref: MyDBSNSTopic
    SourceType: db-instance
  Type: AWS::RDS::EventSubscription
Amazon SNS will send SMS messages to the mobile number you placed into Parameter Store. Below, we see messages generated during the creation of the two instances, displayed on an Apple iPhone.
Amazon RDS Dashboard
Once the RDS CloudFormation stack has successfully been built, the easiest way to view the results is using the Amazon RDS Dashboard, as shown below. Here we see both the master and read replica instances have been created and are available for our use.
The RDS dashboard offers CloudWatch monitoring of each RDS instance.
The RDS dashboard also provides detailed configuration information about each RDS instance.
The RDS dashboard’s Connection & security tab is where we can obtain connection information about our RDS instances, including each instance’s endpoint. Endpoint information will be required in the next part of the demonstration.
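The endpoints can also be retrieved programmatically. Here is a minimal boto3 sketch that lists each instance’s endpoint, assuming the same us-east-1 region used throughout this demonstration.
# Minimal sketch: list each RDS instance's endpoint address and port.
import boto3

rds = boto3.client("rds", region_name="us-east-1")

for db in rds.describe_db_instances()["DBInstances"]:
    endpoint = db.get("Endpoint", {})
    print(f'{db["DBInstanceIdentifier"]}: {endpoint.get("Address")}:{endpoint.get("Port")}')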
Sample Data
Now that we have our PostgreSQL database instance and read replica successfully provisioned and configured on AWS, with an empty database, we need some test data. There are several sources of sample PostgreSQL databases available on the internet to explore. We will use the Pagila sample movie rental database by pgFoundry. Although the database is several years old, it provides a relatively complex relational schema (table relationships shown below) and plenty of sample data to query, about 100 database objects and 46K rows of data.
In the GitHub repository, I have included the two Pagila database SQL scripts required to install the sample database’s data structures (DDL), sql-scripts/pagila-schema.sql, and the data itself (DML), sql-scripts/pagila-insert-data.sql.
To execute the Pagila SQL scripts and install the sample data, I have included a Python script. If you do not want to use Python, you can skip to the Adminer section of this post. Adminer also has the capability to import SQL scripts.
Before running any of the included Python scripts, you will need to install the required Python packages and configure the database.ini file.
Python Packages
To install the required Python packages using the supplied python-scripts/requirements.txt file, run the commands below.
cd python-scripts
pip3 install --upgrade -r requirements.txt
We are using two packages, psycopg2 and configparser, for the scripts. Psycopg is a PostgreSQL database adapter for Python. According to their website, Psycopg is the most popular PostgreSQL database adapter for the Python programming language. The configparser module allows us to read configuration from files similar to Microsoft Windows INI files. The unittest package is required for a set of unit tests included with the project, which are not discussed as part of the demo.
Database Configuration
The python-scripts/database.ini file, read by configparser, provides the required connection information to our RDS master and read replica instances’ databases. Use the input parameters and output values from the CloudFormation RDS template, or the Amazon RDS Dashboard, to obtain the required connection information, as shown in the example below. Your host values will be unique for your master and read replica. The host values are the instances’ endpoints, listed in the RDS Dashboard’s Configuration tab.
[docker]
host=localhost
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[master]
host=demo-instance.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[replica]
host=demo-replica.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd
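For reference, here is a rough sketch of the pattern the demonstration’s scripts follow: configparser reads one section of database.ini, and the resulting key/value pairs are passed to psycopg2 as connection keyword arguments. The ‘master’ section name is the only assumption; swap in ‘replica’ or ‘docker’ as needed.
# Minimal sketch: read one section of database.ini and open a connection.
import configparser

import psycopg2

config = configparser.ConfigParser()
config.read("database.ini")

params = dict(config["master"])  # keys: host, port, database, user, password
conn = psycopg2.connect(**params)
print(conn.get_dsn_parameters()["host"])
conn.close()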
With the INI file configured, run the following command, which executes the supplied Python script, python-scripts/create_pagila_data.py, to create the data structure and insert sample data into the master RDS instance’s Pagila database. The database will be automatically replicated to the RDS read replica instance. From my local laptop, I found the Python script takes approximately 40 seconds to create all 100 database objects and insert 46K rows of movie rental data, compared to about 13 seconds against a local Docker-based instance of PostgreSQL.
python3 ./create_pagila_data.py --instance master
The Python script’s primary function, create_pagila_db(), reads and executes the two external SQL scripts.
def create_pagila_db():
    """
    Creates Pagila database by running DDL and DML scripts
    """
    try:
        global conn
        with conn:
            with conn.cursor() as curs:
                curs.execute(open("../sql-scripts/pagila-schema.sql", "r").read())
                curs.execute(open("../sql-scripts/pagila-insert-data.sql", "r").read())
                conn.commit()
                print('Pagila SQL scripts executed')
    except (psycopg2.OperationalError, psycopg2.DatabaseError, FileNotFoundError) as err:
        print(create_pagila_db.__name__, err)
        close_conn()
        exit(1)
If the Python script executes correctly, you should see output indicating there are now 28 tables in our master RDS instance’s database.
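If you would like to verify the import from Python rather than from a database client, a short sketch such as the following counts the tables on the master instance, assuming the [master] section of database.ini is configured.
# Minimal sketch: count the Pagila tables created on the master instance.
import configparser

import psycopg2

config = configparser.ConfigParser()
config.read("database.ini")

with psycopg2.connect(**dict(config["master"])) as conn:
    with conn.cursor() as curs:
        curs.execute(
            "SELECT count(*) FROM information_schema.tables "
            "WHERE table_schema = 'public' AND table_type = 'BASE TABLE';"
        )
        print("Tables:", curs.fetchone()[0])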
pgAdmin
pgAdmin is a favorite tool for interacting with and managing PostgreSQL databases. According to its website, pgAdmin is the most popular and feature-rich Open Source administration and development platform for PostgreSQL.
The project includes an optional Docker Swarm stack.yml file. The stack will create a set of three Docker containers, including a local copy of PostgreSQL 11.4, Adminer, and pgAdmin 4. Having a local copy of PostgreSQL, using the official Docker image, is helpful for development and for troubleshooting RDS issues.
Use the following commands to deploy the Swarm stack.
# create stack
docker swarm init
docker stack deploy -c stack.yml postgres

# get status of new containers
docker stack ps postgres --no-trunc
docker container ls
If you do not want to spin up the whole Docker Swarm stack, you could use the docker run command to create just a single pgAdmin Docker container. The pgAdmin 4 Docker image being used is the image recommended by pgAdmin.
docker pull dpage/pgadmin4

docker run -p 81:80 \
    -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" \
    -e "PGADMIN_DEFAULT_PASSWORD=SuperSecret" \
    -d dpage/pgadmin4

docker container ls | grep pgadmin4
Database Server Configuration
Once pgAdmin is up and running, we can configure the master and read replica database servers (RDS instances) using the connection string information from your database.ini file or from the Amazon RDS Dashboard. Below, I am configuring the master RDS instance (server).
With that task complete, below, we see the master RDS instance and the read replica, as well as my local Docker instance configured in pgAdmin (left side of screengrab). Note how the Pagila database has been replicated automatically, from the RDS master to the read replica instance.
Building SQL Queries
Switching to the Query tab, we can run regular SQL queries against any of the database instances. Below, I have run a simple SELECT query against the master RDS instance’s Pagila database, returning the complete list of movie titles, along with their genre and release date.
The pgAdmin Query tool even includes an Explain tab to view a graphical representation of the same query, very useful for optimization. Here we see the same query, showing an analysis of the execution order. A popup window displays information about the selected object.
Query the Read Replica
To demonstrate the use of the read replica, below I’ve run the same query against the RDS read replica’s copy of the Pagila database. Any schema and data changes against the master instance are replicated to the read replica(s).
Adminer
Adminer is another good general-purpose database management tool, similar to pgAdmin, but with a few different capabilities. According to its website, with Adminer, you get a tidy user interface, rich support for multiple databases, performance, and security, all from a single PHP file. Adminer is my preferred tool for database admin tasks. Amazingly, Adminer works with MySQL, MariaDB, PostgreSQL, SQLite, MS SQL, Oracle, SimpleDB, Elasticsearch, and MongoDB.
Below, we see the Pagila database’s tables and views displayed in Adminer, along with some useful statistical information about each database object.
Similar to pgAdmin, we can also run queries, along with other common development and management tasks, from within the Adminer interface.
Import Pagila with Adminer
Another great feature of Adminer is the ability to easily import and export data. As an alternative to Python, you could import the Pagila data using Adminer’s SQL file import function. Below, you see an example of importing the Pagila database objects into the Pagila database, using the file upload function.
IDE
For writing my AWS infrastructure as code files and Python scripts, I prefer JetBrains PyCharm Professional Edition (v19.2). PyCharm, like all the JetBrains IDEs, has the ability to connect to and manage PostgreSQL databases. You can write and run SQL queries, including the Pagila SQL import scripts. Microsoft Visual Studio Code is another excellent, free choice, available on multiple platforms.
Python and RDS
Although our IDE, pgAdmin, and Adminer are useful to build and test our queries, ultimately, we still need to connect to the Amazon RDS PostgreSQL instances and perform data manipulation from our application code. The GitHub repository includes a sample Python script, python-scripts/query_postgres.py. This script uses the same Python packages and connection functions as the Pagila data creation script we ran earlier. This time, we will perform the same SELECT query using Python as we did previously with pgAdmin and Adminer.
cd python-scripts
python3 ./query_postgres.py --instance master
With a successful database connection established, the script’s primary function, get_movies(return_count), performs the SELECT query. The function accepts an integer representing the desired number of movies to return from the SELECT query. A separate function within the script handles closing the database connection when the query is finished.
def get_movies(return_count=100):
    """
    Queries for all films, by genre and year
    """
    try:
        global conn
        with conn:
            with conn.cursor() as curs:
                curs.execute("""
                    SELECT title AS title, name AS genre, release_year AS released
                    FROM film f
                        JOIN film_category fc ON f.film_id = fc.film_id
                        JOIN category c ON fc.category_id = c.category_id
                    ORDER BY title
                    LIMIT %s;
                """, (return_count,))

                movies = []
                row = curs.fetchone()
                while row is not None:
                    movies.append(row)
                    row = curs.fetchone()

                return movies
    except (psycopg2.OperationalError, psycopg2.DatabaseError) as err:
        print(get_movies.__name__, err)
    finally:
        close_conn()


def main():
    set_connection('docker')

    for movie in get_movies(10):
        print('Movie: {0}, Genre: {1}, Released: {2}'
              .format(movie[0], movie[1], movie[2]))
Below, we see an example of the Python script’s formatted output, limited to only the first ten movies.
Using the Read Replica
For better application performance, it may be optimal to redirect some or all of the database reads to the read replica, while leaving writes, updates, and deletes to hit the master instance. The script can be easily modified to execute the same query against the read replica rather than the master RDS instance by merely passing the desired section, ‘replica’ versus ‘master’, in the call to the set_connection(section) function. The section parameter refers to one of the two sections in the database.ini file. The configparser module will handle retrieving the correct connection information.
set_connection('replica')
Cleaning Up
When you are finished with the demonstration, the easiest way to clean up all the AWS resources and stop getting billed is to delete the two CloudFormation stacks using the AWS CLI, in the following order.
aws cloudformation delete-stack \
    --stack-name RDSDemoStack

# wait until the above resources are completely deleted
aws cloudformation delete-stack \
    --stack-name RDSEventDemoStack
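As with stack creation, the wait between the two deletions can be automated with a boto3 waiter, as in this minimal sketch using the demonstration’s stack names.
# Minimal sketch: delete the RDS stack, wait, then delete the event stack.
import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")

cfn.delete_stack(StackName="RDSDemoStack")
cfn.get_waiter("stack_delete_complete").wait(StackName="RDSDemoStack")
cfn.delete_stack(StackName="RDSEventDemoStack")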
You should receive the following SMS notifications as the first CloudFormation stack is being deleted.
You can delete the running Docker stack using the following command. Note, you will lose all your pgAdmin server connection information, along with your local Pagila database.
docker stack rm postgres
Conclusion
In this brief post, we have just scratched the surface of the many benefits and capabilities of Amazon RDS for PostgreSQL. The best way to learn PostgreSQL and the benefits of Amazon RDS is to set up your own RDS instance, insert some sample data, and start writing queries in your favorite database client or programming language.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Building Asynchronous, Serverless Alexa Skills with AWS Lambda, DynamoDB, S3, and Node.js
Posted by Gary A. Stafford in AWS, Cloud, JavaScript, Serverless, Software Development on July 24, 2018
Introduction
In the following post, we will use the new version 2 of the Alexa Skills Kit, AWS Lambda, Amazon DynamoDB, Amazon S3, and the latest LTS version Node.js, to create an Alexa Custom Skill. According to Amazon, a custom skill allows you to define the requests the skill can handle (intents) and the words users say to invoke those requests (utterances).
If you want to compare the development of an Alexa Custom Skill with those of Google and Azure, in addition to this post, please read my previous two posts in this series, Building and Integrating LUIS-enabled Chatbots with Slack, using Azure Bot Service, Bot Builder SDK, and Cosmos DB and Building Serverless Actions for Google Assistant with Google Cloud Functions, Cloud Datastore, and Cloud Storage. All three articles’ demonstrations are written in Node.js, all three leverage their cloud platform’s machine learning-based Natural Language Understanding services, and all three take advantage of NoSQL database and storage services available on their respective cloud platforms.
AWS Technologies
The final high-level architecture of our Alexa Custom Skill will look as follows.
Here is a brief overview of the key AWS technologies we will incorporate into our Skill’s architecture.
Alexa Skills Kit
According to Amazon, the Alexa Skills Kit (ASK) is a collection of self-service APIs, tools, documentation, and code samples that makes it possible to add skills to Alexa. The Alexa Skills Kit supports building different types of skills. Currently, Alexa skill types include Custom, Smart Home, Video, Flash Briefing, and List Skills. Each skill type makes use of a different Alexa Skill API.
AWS Serverless Platform
To create a custom skill for Alexa, you currently have the choice of using an AWS Lambda function or a web service. AWS Lambda is part of an ecosystem of cloud services and developer tools that Amazon refers to as the AWS Serverless Platform. The platform’s services are designed to support the development and hosting of highly performant, enterprise-grade serverless applications.
In this post, we will leverage three of the AWS Serverless Platform’s services, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and AWS Lambda.
Node.js
AWS Lambda supports multiple programming languages, including Node.js (JavaScript), Python, Java (Java 8 compatible), C# (.NET Core), and Go. All are excellent choices for writing modern serverless functions. For this post, we will use Node.js. According to the Node.js Foundation, Node.js is an asynchronous event-driven JavaScript runtime built on Chrome’s V8 JavaScript engine.
In April 2018, AWS Lambda announced support for the Node.js 8.10 runtime, which is the current Long Term Support (LTS) version of Node.js. Node 8, also known as Project Carbon, was the first LTS version of Node to support async/await with Promises. Async/await is the new way of handling asynchronous operations in Node.js. We will make use of async/await and Promises with the custom skill.
Demonstration
To demonstrate Alexa Custom Skills we will build an informational skill that responds to the user with interesting facts about Azure¹, Microsoft’s Cloud computing platform (Alexa talking about Azure, ironic, I know). This is not an official Microsoft skill; it is only used for this demonstration and has not been published.
Source Code
All open-source code for this post can be found on GitHub. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Important, this post and the associated source code were updated from v1.0 to v2.0 on 13 August 2018. You should clone the GitHub project again, to correspond with this revised post, if you originally cloned the project before 14 August 2018. Code changes were significant.
Objectives
This objective of the fact-based skill will be to demonstrate the following.
- Build, deploy, and test an Alexa Custom Skill using AWS Lambda and Node.js;
- Use DynamoDB to store and retrieve Alexa voice responses;
- Maintain a count of user’s questions in DynamoDB using atomic counters;
- Use Amazon S3 to store and retrieve images, used in Display Cards;
- Log Alexa Skill activities using Amazon CloudWatch Logs;
Steps to Build
Building the Azure fact skill will involve the following steps.
- Design the Alexa skill’s voice interaction model;
- Design the skill’s Display Cards for Alexa-enabled products, to enhance the voice experience;
- Create the skill’s DynamoDB table and import the responses the skill will return;
- Create an S3 bucket and upload the images used for the Display Cards;
- Write the Alexa Skill, which involves mapping the user’s spoken input to the intents your cloud-based service can handle;
- Write the Lambda function, which involves responding to the user’s utterances, by building and returning appropriate voice and display card responses, from DynamoDB and S3;
- Extend the default ASK-generated AWS IAM Role, to allow the Lambda to update DynamoDB (a rough sketch of this step follows the list);
- Deploy the skill;
- Test the skill;
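Regarding the IAM step, the role generated by the ASK CLI only permits basic Lambda execution; it needs an additional policy granting access to the AzureFacts table. The following Python sketch is purely illustrative of that idea; the role name and account ID are placeholders, not values from this project.
# Illustrative sketch only: attach an inline policy so the skill's Lambda can
# read and update the AzureFacts table. Role name and account ID are placeholders.
import json

import boto3

iam = boto3.client("iam")

policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["dynamodb:GetItem", "dynamodb:UpdateItem", "dynamodb:Scan"],
        "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/AzureFacts",
    }],
}

iam.put_role_policy(
    RoleName="ask-lambda-azure-tech-facts",  # placeholder role name
    PolicyName="AzureFactsDynamoDBAccess",
    PolicyDocument=json.dumps(policy),
)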
Let’s explore each step in detail.
Voice Interaction Model
First, we must design the fact skill’s voice interaction model. We need to consider the way we want the user to interact with the skill. What is the user’s conversational journey? How do they invoke your skill? How will the user provide intent?
This skill will require two intent slot values, the fact the user is interested in (i.e. ‘global infrastructure’) and the user’s first name (i.e. ‘Susan’). We will train the skill to allow Alexa to query the user for each slot value, but also allow the user to provide either or both values in the initial intent invocation. We will also allow the user to request a random fact.
Shown below in the Alexa Skills Kit Development Console Test tab are three examples of interactions the skill is trained to understand and handle:
- The first example on the left invokes the skill with no intent (‘Alexa, load Azure Tech Facts’). The user is led through a series of three questions to obtain the full intent.
- The center example is similar, however, the initial invocation contains a partial intent (‘Alexa, ask Azure Tech Facts for a fact about certifications’). Alexa must still ask for the user’s name.
- Lastly, the example on the right is a so-called ‘one-shot’ invocation (‘Alexa, ask Azure Tech Facts about Azure’s platforms for Gary’). The user’s invocation of the skill contains a complete intent, allowing Alexa to respond immediately with a fact about Azure platforms.
In all cases, our skill has the ability to continue to provide the user with additional facts if they chose, or they may cancel at any time.
We also need to design how Alexa will respond. What persona will Alexa assume through her words, phrases, and use of Speech Synthesis Markup Language (SSML)?
User Interaction Previews
Here are a few examples of interactions with the final Alexa skill using an iPhone 8 and the Alexa App. They are intended to show the rich conversational capabilities of custom skills, more so than the display, which is pretty poor on the Alexa App as compared to the Echo Show or even the Echo Spot.
Example 1: Indirect Invocation
The first example shows a basic interaction with our Alexa skill. It demonstrates an indirect invocation, a user utterance without initial intent. It also illustrates several variations of user utterances (YouTube).
Example 2: Direct Invocation
The second example of an interaction with our skill demonstrates a direct invocation, in which the initial user utterance contains the intent. It also demonstrates the user following up with additional requests (YouTube).
Example 3: Direct Invocation, Help, Problem
Lastly, another direct invocation demonstrates the use of the Help Intent. You also see an example of when Alexa does not understand the user’s utterance. The user is able to repeat their request, more clearly (YouTube).
Visual Interaction Model
Many Alexa-enabled devices are capable of both vocal and visual responses. Designing for a multimodal user experience is important. The instructional skill will provide vocal responses, as well as Display Cards optimized for the Amazon Echo Show. The skill contains a basic design for the Display Card shown during the initial invocation, where there is no intent uttered by the user.
The fact skill also contains a Display Card, designed to present the final Alexa response to the user’s intent. The content of the vocal and visual response is returned from DynamoDB via the Lambda function. The random Azure icons, available from Microsoft, are hosted in an S3 bucket. Each fact response is unique, as well as the icon associated with the fact.
The Display Cards will also work on other Alexa-enabled screen-based products. Shown below is the same card on an iPhone 8 using the Amazon Alexa app. This is the same app shown in the videos, above.
DynamoDB
Next, we create the DynamoDB table used to store the facts the Alexa skill will respond with when invoked by the user. DynamoDB is Amazon’s non-relational database that delivers reliable performance at any scale. DynamoDB consists of three basic components: tables, items, and attributes.
There are numerous ways to create a DynamoDB table. For simplicity, I created the AzureFacts DynamoDB table using the AWS CLI (gist). You could also choose CloudFormation, or create the table using any of nine or more programming languages with an AWS SDK.
aws dynamodb create-table \
    --table-name AzureFacts \
    --attribute-definitions \
        AttributeName=Fact,AttributeType=S \
    --key-schema AttributeName=Fact,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=3,WriteCapacityUnits=3
The AzureFacts table’s schema has four key/value pair attributes per item: Fact, Response, Image, and Hits. The Fact attribute, a string, contains the name of the fact the user is seeking. The Fact attribute also serves as the table’s unique partition key. The Response attribute, a string, contains the conversational response Alexa will return. The Image attribute, a string, contains the name of the image in the S3 bucket displayed by Alexa. Lastly, the Hits attribute, a number, stores the number of user requests for a particular fact.
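The Hits attribute is what the skill increments with a DynamoDB atomic counter each time a fact is requested. The skill’s Lambda does this with the Node.js AWS SDK; purely for illustration, here is the equivalent pattern as a Python boto3 sketch, using ‘certifications’ as an example fact name (the actual key values live in AzureFacts.json).
# Illustrative sketch: atomically increment the Hits counter for one fact.
import boto3

table = boto3.resource("dynamodb", region_name="us-east-1").Table("AzureFacts")

table.update_item(
    Key={"Fact": "certifications"},          # example fact name
    UpdateExpression="ADD Hits :inc",        # ADD on a number is an atomic counter
    ExpressionAttributeValues={":inc": 1},
)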
Importing Table Items
After the DynamoDB table is created, the pre-defined facts are imported into the empty table using AWS CLI (gist). The JSON-formatted data file, AzureFacts.json, is included with the source code on GitHub.
aws dynamodb batch-write-item \
    --request-items file://data/AzureFacts.json
The resulting table should appear as follows in the AWS Management Console.
Note the imported items shown below. The Hits counts reflect the number of times each fact has been requested.
Shown below is a detailed view of a single item that was imported into the DynamoDB table.
Amazon S3 Image Bucket
Next, we create the Amazon S3 bucket, which will house the images, actually Azure icons as PNGs, returned by Alexa with each fact. Again, I used the AWS CLI for simplicity (gist).
aws s3api create-bucket \
    --bucket <my_bucket_name> \
    --region us-east-1
The images can be uploaded manually to the bucket through a web browser, or programmatically, using the AWS CLI or SDKs. You will need to ensure the images are made public so they can be displayed by Alexa.
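Programmatic upload is straightforward; here is a minimal boto3 sketch that uploads one icon and marks it public, reusing the bucket placeholder from above. The file name is only an example, and the bucket must allow public-read ACLs.
# Minimal sketch: upload one icon and make it publicly readable.
import boto3

s3 = boto3.client("s3", region_name="us-east-1")

s3.upload_file(
    "images/image-01.png",   # example local file name
    "<my_bucket_name>",      # same placeholder bucket as above
    "image-01.png",
    ExtraArgs={"ACL": "public-read", "ContentType": "image/png"},
)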
Alexa Skill
Next, we create the actual Alexa custom skill. I have used version 2 of the Alexa Skills Kit (ASK) Software Development Kit (SDK) for Node.js and the new ASK Command Line Interface (ASK CLI) to create the skill. The ASK SDK v2 for Node.js was recently released in April 2018. If you have previously written Alexa skills using version 1 of the Node.js SDK, the creation of a new project and the format of the Lambda Node.js code is somewhat different. I strongly suggest reviewing the example skills provided by Amazon on GitHub.
With version 1, I would have likely used the Alexa Skills Kit Development Console to develop and deploy the skill, and a separate IDE, like JetBrains WebStorm, to write the Lambda. The JSON-format skill would live in the Alexa Skills Kit Development Console, and my Lambda in source control. I would have used AWS Serverless Application Model (AWS SAM) or Claudia.js to handle the deployment of Lambda functions.
With version 2 of ASK, you can easily create and manage the Alexa skill’s JSON-formatted code, as well as the Lambda, all from the command-line and a single IDE or text editor. All components that comprise the skill can be kept together in source control. I now only use the Alexa Skills Kit Development Console to preview my deployed skill and for testing. I am not going to go into detail about creating a new project using the ASK CLI, I suggest reviewing Amazon’s instructional guides.
Below, I have initiated a new AWS profile for the Alexa skill using the ask init command.
There are three main parts to the new skill project created by the ASK CLI: the skill’s manifest (skill.json), model(s) (en-US.json), and API endpoint, the Lambda (index.js). The skill’s manifest, skill.json, contains information (metadata) about the skill. This is the same information you find in the Distribution tab of the Alexa Skills Kit Development Console. The manifest includes publishing information, example phrases to invoke the skill, the skill’s category, distribution locales, privacy information, and the location of the skill’s API endpoint, the Lambda. An end-user would most commonly see this information in the Amazon Alexa app when adding skills to their Alexa-enabled devices.
Next, the skill’s model, en-US.json, is located in the models sub-directory. This file defines the skill’s custom interaction model, written in JSON, which includes the invocation name, intents, standard and custom slots, sample utterances, slot values, and synonyms of those values. This is the same information you would find in the Build tab of the Alexa Skills Kit Development Console. Amazon has an excellent guide to creating your custom skill’s interaction model.
Intents and Intent Slots
The skill’s custom interaction model contains the AzureFactsIntent intent, along with the boilerplate Cancel, Help, and Stop intents. The AzureFactsIntent intent contains two intent slots, myName and myQuestion. The myName intent slot is a standard AMAZON.US_FIRST_NAME slot type. According to Amazon, this slot type understands thousands of popular first names commonly used by speakers in the United States. Shown below, I have included a short list of sample utterances in the intent model, which helps improve voice recognition for Alexa (gist).
{
  "name": "AzureFactsIntent",
  "slots": [{
    "name": "myName",
    "type": "AMAZON.US_FIRST_NAME",
    "samples": [
      "{myName}",
      "my name is {myName}",
      "my name's {myName}",
      "name is {myName}",
      "the name is {myName}",
      "name's {myName}",
      "they call me {myName}"
    ]
  }]
}
Custom Slot Types and Entities
The myQuestion intent slot is a custom slot type. According to Amazon, a custom slot type defines a list of representative values for the slot. The myQuestion slot contains all the available facts the custom instructional skill understands and can retrieve from DynamoDB. Like myName, the user can provide the fact intent in various ways (gist).