Kubernetes-based Microservice Observability with Istio Service Mesh: Part 2 of 2

In part two of this two-part post, we will continue to explore the set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.”

Reference Application Platform

To demonstrate Istio’s observability tools, we deployed a reference application platform to EKS on AWS. I have developed the application platform to demonstrate different Kubernetes platforms, such as EKS, GKE, AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform comprises a backend containing eight Go-based microservices, labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). For example, Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message to a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. The platform’s distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of Reference Application Platform

Part Two

In part one of the post, we configured and deployed the reference application platform to an Amazon EKS development-grade cluster on AWS. The reference application, running on EKS, communicates with two external systems, Amazon DocumentDB (with MongoDB compatibility) and Amazon MQ.

Deployed Reference Application Platform as seen from Argo CD

In part two of the post, we will explore each of the observability tools we installed in greater detail. We will understand how each tool contributes to the three pillars of observability: logs, metrics, and traces.

Logs, metrics, and traces are often known as the three pillars of observability.
 — Cindy Sridharan

Pillar One: Logs

To paraphrase Jay Kreps on the LinkedIn Engineering Blog, a log is an append-only, totally-ordered sequence of records ordered by time. The ordering of records defines a notion of “time” since entries to the left are defined to be older than entries to the right. Logs are a historical record of events that happened in the past. Logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.

Go-based Microservice Logging

An effective logging strategy starts with what you log, when you log, and how you log. As part of our logging strategy, the eight Go-based microservices use Logrus, a popular structured logger for Go first released in 2014. The microservices also implement Banzai Cloud’s logrus-runtime-formatter. There is an excellent article on the formatter, Golang runtime Logrus Formatter. These two logging packages give us greater control over what we log, when we log, and how we log information about our microservices. The recommended configuration of the packages is minimal.

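import (
	"os"

	runtime "github.com/banzaicloud/logrus-runtime-formatter"
	log "github.com/sirupsen/logrus"
)

func init() {
	// Add function name and line number to each JSON log entry.
	formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
	formatter.Line = true
	log.SetFormatter(&formatter)
	log.SetOutput(os.Stdout)
	// logLevel is set elsewhere in the service.
	level, err := log.ParseLevel(logLevel)
	if err != nil {
		log.Error(err)
		level = log.InfoLevel // fall back instead of the zero value, PanicLevel
	}
	log.SetLevel(level)
}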

Logrus provides several advantages over Go’s simple logging package, log. For example, log entries are not only for Fatal errors, nor should all verbose log entries be output in a Production environment. The post’s microservices are taking advantage of Logrus’ ability to log at seven levels: Trace, Debug, Info, Warning, Error, Fatal, and Panic. I have also variabilized the log level, allowing it to be easily changed in the Kubernetes Deployment resource at deploy-time.

The microservices also take advantage of Banzai Cloud’s logrus-runtime-formatter. The Banzai formatter automatically tags log messages with runtime and stack information, including function name and line number, which is extremely helpful when troubleshooting. I am also using Logrus’ JSON formatter.

Service A log entries in CloudWatch Insights

In 2020, Logrus entered maintenance mode. The author, Simon Eskildsen (Principal Engineer at Shopify), stated they will not be introducing new features. This does not mean Logrus is dead. With over 18,000 GitHub Stars, Logrus will continue to be maintained for security, bug fixes, and performance. The author states that many fantastic alternatives to Logrus now exist, such as Zerolog, Zap, and Apex.

Client-side Angular UI Logging

Likewise, I have enhanced the logging of the Angular UI using NGX Logger. NGX Logger is a simple logging module for Angular (currently supports Angular 6+). It allows “pretty print” to the console and allows log messages to be POSTed to a URL for server-side logging. For this demo, the UI will only log to the web browser’s console. Similar to Logrus, NGX Logger supports multiple log levels: Trace, Debug, Info, Warning, Error, Fatal, and Off. However, instead of just outputting messages, NGX Logger allows us to output properly formatted log entries to the browser’s console.

The level of log output depends on the environment, Production or not Production. Below is an example of the log output from the Angular UI in Chrome. Since the UI’s Docker Image was built with the Production configuration, the log level is set to INFO. We would not want to expose potentially sensitive information in verbose log output to our end users in Production.

Controlling logging levels is accomplished by adding the following ternary operator to the app.module.ts file.

imports: [
  BrowserModule,
  HttpClientModule,
  FormsModule,
  LoggerModule.forRoot({
    level: !environment.production ?
      NgxLoggerLevel.DEBUG : NgxLoggerLevel.INFO,
    serverLogLevel: NgxLoggerLevel.INFO
  })
],

Platform Logs

Based on the platform built, configured, and deployed in part one, you now have access to logs from multiple sources.

  1. Amazon DocumentDB: Amazon CloudWatch Audit and Profiler logs;
  2. Amazon MQ: Amazon CloudWatch logs;
  3. Amazon EKS: API server, Audit, Authenticator, Controller manager, and Scheduler CloudWatch logs;
  4. Kubernetes Dashboard: Individual EKS Pod and Replica Set logs;
  5. Kiali: Individual EKS Pod and Container logs;
  6. Fluent Bit: EKS performance, host, dataplane, and application CloudWatch logs;

Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch Logs for log storage and analytics.

With Fluent Bit, deployed in part one, the EKS cluster’s performance, host, dataplane, and application logs will also be available in Amazon CloudWatch.

Within the application log groups, you have access to the individual log streams for each reference application’s components.

Within each CloudWatch log stream, you can view individual log entries.

CloudWatch Logs Insights enables you to interactively search and analyze your log data in Amazon CloudWatch Logs. You can perform queries to help you more efficiently and effectively respond to operational issues. If an issue occurs, you can use CloudWatch Logs Insights to identify potential causes and validate deployed fixes.

CloudWatch Logs Insights has its own query syntax, which you can use to query your log groups. Each query can include one or more query commands separated by Unix-style pipe characters (|). For example:

fields @timestamp, @message
| filter kubernetes.container_name = "service-f"
and @message like "error"
| sort @timestamp desc
| limit 20

Pillar Two: Metrics

For metrics, we will examine CloudWatch Container Insights, Prometheus, and Grafana. Prometheus and Grafana are industry-leading tools you installed as part of the Istio deployment.

Prometheus

Prometheus is an open-source systems monitoring and alerting toolkit originally built at SoundCloud circa 2012. Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second project hosted after Kubernetes.

According to Istio, the Prometheus addon is a Prometheus server that comes preconfigured to scrape Istio endpoints to collect metrics. You can use Prometheus with Istio to record metrics that track the health of Istio and applications within the service mesh. You can visualize metrics using tools like Grafana and Kiali. The Istio Prometheus addon is intended for demonstration only and is not tuned for performance or security.

The istioctl dashboard command provides access to all of the Istio web UIs. With the EKS cluster running, Istio installed, and the reference application platform deployed, access Prometheus using the istioctl dashboard prometheus command from your terminal. You must be logged into AWS from your terminal to connect to Prometheus successfully. If you are not logged in to AWS, you will often see the following error: Error: not able to locate <tool_name> pod: Unauthorized. Since we used the non-production demonstration versions of the Istio Addons, there is no authentication and authorization required to access Prometheus.

According to Prometheus, users select and aggregate time-series data in real-time using a functional query language called PromQL (Prometheus Query Language). The result of an expression can either be shown as a graph, viewed as tabular data in Prometheus’s expression browser, or consumed by external systems through Prometheus’ HTTP API. The expression browser includes a drop-down menu with all available metrics as a starting point for building queries. Shown below are a few PromQL examples that were developed as part of writing this post.

istio_agent_go_info{kubernetes_namespace="dev"}
istio_build{kubernetes_namespace="dev"}
up{alpha_eksctl_io_cluster_name="istio-observe-demo", job="kubernetes-nodes"}
sum by (pod) (rate(container_network_transmit_packets_total{stack="reference-app",namespace="dev",pod=~"service-.*"}[5m]))
sum by (instance) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code="200"})
sum by (response_code) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code!~"200|0"})

Prometheus APIs

Prometheus has both an HTTP API and a Management API. There are many useful endpoints in addition to the Prometheus UI, available at http://localhost:9090/graph. For example, the Prometheus HTTP API endpoint that lists all the command-line configuration flags is available at http://localhost:9090/api/v1/status/flags. The endpoint that lists all the available Prometheus metrics is available at http://localhost:9090/api/v1/label/__name__/values; a total of 951 metrics in this demonstration!
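As a rough illustration of consuming the HTTP API programmatically, the following Go sketch calls the label values endpoint mentioned above and counts the metric names it returns. It assumes istioctl dashboard prometheus is forwarding Prometheus to localhost:9090; the function and type names are invented for the example, and only the standard library is used.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// labelValuesResponse models the JSON envelope returned by
// /api/v1/label/<label_name>/values.
type labelValuesResponse struct {
	Status string   `json:"status"`
	Data   []string `json:"data"`
}

// parseMetricNames decodes a response body and returns the metric names.
func parseMetricNames(body []byte) ([]string, error) {
	var resp labelValuesResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	if resp.Status != "success" {
		return nil, fmt.Errorf("unexpected status %q", resp.Status)
	}
	return resp.Data, nil
}

// fetchMetricNames calls the label values endpoint on the given base URL.
func fetchMetricNames(baseURL string) ([]string, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(baseURL + "/api/v1/label/__name__/values")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return parseMetricNames(body)
}

func main() {
	// Assumes Prometheus is reachable on localhost:9090.
	names, err := fetchMetricNames("http://localhost:9090")
	if err != nil {
		fmt.Println("could not reach Prometheus:", err)
		return
	}
	fmt.Printf("%d metrics available\n", len(names))
}
```

Keeping the parsing separate from the HTTP call makes the decoding logic easy to exercise without a running cluster.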

The Prometheus endpoint that lists many available metrics with HELP and TYPE to explain their function is found at http://localhost:9090/metrics.
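The HELP and TYPE annotations are comment lines in Prometheus’ plain-text exposition format. The sketch below shows one simple way to collect them per metric name. It is an illustration only: the sample text is hand-written rather than captured from the demo cluster, the names metricInfo and parseMetadata are made up, and escaping rules in HELP text are ignored.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// metricInfo holds the HELP text and TYPE declared for one metric.
type metricInfo struct {
	Help string
	Type string
}

// parseMetadata scans text in the Prometheus exposition format and
// collects the "# HELP" and "# TYPE" comment lines per metric name.
func parseMetadata(text string) map[string]metricInfo {
	result := make(map[string]metricInfo)
	scanner := bufio.NewScanner(strings.NewReader(text))
	for scanner.Scan() {
		// Expected shape: "# HELP <name> <text>" or "# TYPE <name> <type>".
		parts := strings.SplitN(scanner.Text(), " ", 4)
		if len(parts) < 4 || parts[0] != "#" {
			continue // a sample line or an unrelated comment
		}
		info := result[parts[2]]
		switch parts[1] {
		case "HELP":
			info.Help = parts[3]
		case "TYPE":
			info.Type = parts[3]
		default:
			continue
		}
		result[parts[2]] = info
	}
	return result
}

func main() {
	// A hand-written sample, not output from the demo cluster.
	sample := `# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 42
`
	for name, info := range parseMetadata(sample) {
		fmt.Printf("%s (%s): %s\n", name, info.Type, info.Help)
	}
}
```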

Understanding Metrics

In addition to these endpoints, the standard service level metrics exported by Istio and available via Prometheus are found in the Istio Standard Metrics documentation. An explanation of many of the metrics available via Prometheus is also found in the cAdvisor README on GitHub. As mentioned in this AWS Blog Post, the cAdvisor metrics are also available from the command line using the following commands:

export NODE=$(kubectl get nodes | sed -n '2 p' | awk '{print $1}')
kubectl get --raw "/api/v1/nodes/${NODE}/proxy/metrics/cadvisor"

Observing Metrics

Below is an example graph of the backend microservice containers deployed to EKS. The graph PromQL expression returns the amount of working set memory, including recently accessed memory, dirty memory, and kernel memory (container_memory_working_set_bytes), summed by pod, in megabytes (MB). There was no load on the services during the period displayed.

sum by (pod) (container_memory_working_set_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

The container_memory_working_set_bytes metric is the same metric used by the kubectl top command (not container_memory_usage_bytes).

> kubectl top pod -n dev --containers=true --use-protocol-buffers
POD                          NAME        CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm   service-a   1m           6Mi
service-a-546fbd558d-2lcsg   service-a   1m           6Mi
service-b-545c85df9-dl9h8    service-b   1m           6Mi
service-b-545c85df9-q99xm    service-b   1m           5Mi
service-c-58996574-58wd8     service-c   1m           7Mi
service-c-58996574-6q7n4     service-c   1m           7Mi
service-d-867796bb47-87ps5   service-d   1m           6Mi
service-d-867796bb47-fh6wl   service-d   1m           6Mi
...

In another Prometheus example, the PromQL query expression returns the per-second rate of CPU resources measured in CPU units (1 CPU = 1 AWS vCPU), as measured over the last 5 minutes, per time series in the range vector, summed by pod. During this period, the backend services were under a consistent, simulated load of 25 concurrent users using hey. The four Service D pods were consuming the most CPU units during this time period.

sum by (pod) (rate(container_cpu_usage_seconds_total{image=~"registry.hub.docker.com/garystafford/.*"}[5m])) * 1000

The container_cpu_usage_seconds_total metric is the same metric used by the kubectl top command. The above PromQL expression multiplies the query results by 1,000 to match the results from kubectl top, shown below.

> kubectl top pod -n dev --containers=true --use-protocol-buffers
POD                          NAME        CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm   service-a   25m          9Mi
service-a-546fbd558d-2lcsg   service-a   27m          8Mi
service-b-545c85df9-dl9h8    service-b   29m          11Mi
service-b-545c85df9-q99xm    service-b   23m          8Mi
service-c-58996574-c8hkn     service-c   62m          9Mi
service-c-58996574-kx895     service-c   55m          8Mi
service-d-867796bb47-87ps5   service-d   285m         12Mi
service-d-867796bb47-9ln7p   service-d   226m         11Mi
...

Limits

Prometheus also exposes container resource limits. For example, the expression below returns the memory limits set on the reference platform’s backend services, displayed in megabytes (MB), using the container_spec_memory_limit_bytes metric. When viewed alongside the real-time resources consumed by the services, these metrics are useful to properly configure and monitor Kubernetes management features such as the Horizontal Pod Autoscaler.

sum by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2) / count by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"})

Or, memory limits by Pod:

sum by (pod) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

Cluster Metrics

Prometheus also contains metrics about Istio components, Kubernetes components, and the EKS cluster. For example, the expression below returns the total memory, in gigabytes (GB), of each m5.large EC2 worker node in the istio-observe-demo EKS cluster’s managed-ng-1 Managed Node Group.

machine_memory_bytes{alpha_eksctl_io_cluster_name="istio-observe-demo", alpha_eksctl_io_nodegroup_name="managed-ng-1"} / (1024^3)

For total physical cores, use the machine_cpu_physical_cores metric, and for vCPU cores use the machine_cpu_cores metric.

Grafana

Grafana describes itself as the leading open-source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also allows users to visually define alert rules for their most important metrics. Grafana will continuously evaluate rules and can send notifications.

If you deployed Grafana using the Istio addons process demonstrated in part one of the post, access Grafana similar to the other tools:

istioctl dashboard grafana

According to Istio, Grafana is an open-source monitoring solution used to configure dashboards for Istio. You can use Grafana to monitor the health of Istio and applications within the service mesh. While you can build your own dashboards, Istio offers a set of preconfigured dashboards for all of the most important metrics for the mesh and the control plane. The preconfigured dashboards use Prometheus as the data source.

Below is an example of the Istio Mesh Dashboard, filtered to show the eight backend service workloads running in the dev namespace. During this period, the backend services were under a consistent simulated load of approximately 20 concurrent users using hey. You can observe the p50, p90, and p99 latency of requests to these workloads.

Dashboards are built from Panels, the basic visualization building blocks in Grafana. Each panel has a query editor specific to the data source (Prometheus in this case) selected. The query editor allows you to write your (PromQL) query. Below is the PromQL expression query responsible for the p50 latency Panel displayed in the Istio Mesh Dashboard.

 label_join((histogram_quantile(0.50, sum(rate(istio_request_duration_milliseconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)) / 1000) or histogram_quantile(0.50, sum(rate(istio_request_duration_seconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)), "destination_workload_var", ".", "destination_workload", "destination_workload_namespace")

Below is an example of the Outbound Workloads section of the Istio Workload Dashboard. The complete dashboard contains three sections: General, Inbound Workloads, and Outbound Workloads. Here we have filtered on the reference platform’s backend services in the dev namespace.

Here is a different view of the Istio Workload Dashboard, the dashboard’s Inbound Workloads section filtered to a single workload, Service A, the backend’s edge service. Service A accepts incoming traffic from the Istio Ingress Gateway as shown in the dashboard’s panels.

Grafana provides the ability to Explore a Panel. Explore strips away the dashboard and panel options so that you can focus on the query. It helps you iterate until you have a working query and then think about building a dashboard. Below is an example of the Panel showing the egress TCP traffic, based on the istio_tcp_sent_bytes_total metric, for Service F. Service F consumes messages from the RabbitMQ queue (Amazon MQ) and writes messages to MongoDB (DocumentDB).

You can monitor the resource usage of Istio with the Performance Dashboard.

Additional Dashboards

Grafana provides a site containing official and community-built dashboards, including the above-mentioned Istio dashboards. Importing dashboards into your Grafana instance is as simple as copying the dashboard URL or the ID provided from the Grafana dashboard site and pasting it into the dashboard import option of your Grafana instance. Be aware that not every Kubernetes dashboard on Grafana’s site is compatible with your specific version of Kubernetes, Istio, or EKS, or relies on Prometheus as a data source. As a result, you might have to test and tweak imported dashboards to get them working.

Below is an example of an imported community dashboard, Kubernetes cluster monitoring (via Prometheus) by Instrumentisto Team (dashboard ID 315).

Alerting

An effective observability strategy must include more than just the ability to visualize results. An effective strategy must also detect anomalies and notify (alert) the appropriate resources or directly resolve incidents. Grafana, like Prometheus, is capable of alerting and notification. You visually define alert rules for your critical metrics. Grafana will continuously evaluate metrics against the rules and send notifications when pre-defined thresholds are breached.

Grafana supports multiple popular notification channels, including PagerDuty, HipChat, Email, Kafka, and Slack. Below is an example of a Grafana notification channel that sends alert notifications to a Slack support channel.

Below is an example of an alert based on an arbitrarily high CPU usage of 300 milliCPUs (m). When the CPU usage of a single pod stays above that value for more than 3 minutes, an alert is sent. The high CPU usage could be caused by the Horizontal Pod Autoscaler (HPA) not functioning, the HPA having reached its maxReplicas limit, or insufficient resources within the cluster to schedule additional pods.

Triggered by the alert, Grafana sends detailed notifications to the designated Slack channel.

Amazon CloudWatch Container Insights

Lastly in the category of Metrics, Amazon CloudWatch Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications and microservices. CloudWatch alarms can be set on metrics that Container Insights collects. Container Insights is available for Amazon Elastic Container Service (Amazon ECS) including Fargate, Amazon EKS, and Kubernetes platforms on Amazon EC2.

In Amazon EKS, Container Insights uses a containerized version of the CloudWatch agent to discover all running containers in a cluster. It then collects performance data at every layer of the performance stack. Container Insights collects data as performance log events using the embedded metric format. These performance log events are entries that use a structured JSON schema that enables high-cardinality data to be ingested and stored at scale.

In part one of the post, we also installed CloudWatch Container Insights monitoring for Prometheus, which automates the discovery of Prometheus metrics from containerized systems and workloads.

Below is an example of a basic Performance Monitoring CloudWatch Container Insights Dashboard. The dashboard is filtered to the dev namespace of the EKS cluster, where the reference application platform is running. During this period, the backend services were put under a simulated load using hey. As the load on the application increases, observe that the Number of Pods increases from 19 to 34, based on the Deployment resources and HPA configurations. There is also an Alert, shown on the right of the screen. An alarm was triggered for an arbitrarily high level of network transmission activity.

Next is an example of Container Insights’ Container Map view in Memory mode. You see a visual representation of the dev namespace, with each backend service’s Service and Deployment resources shown.

There is a warning icon indicating an Alarm on the cluster was triggered.

Lastly, CloudWatch Container Insights allows you to jump from the Container Insights console to the CloudWatch Logs Insights console. Container Insights will also write the CloudWatch Logs Insights query for you. Below, we went from the Service D container metrics view in the Container Insights Performance Monitoring console directly to the CloudWatch Logs Insights console with a query ready to run.

Pillar Three: Traces

According to the OpenTracing website, distributed tracing, also called distributed request tracing, is used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.

According to Istio, header propagation may be accomplished through client libraries, such as Zipkin or Jaeger. It may also be accomplished manually, referred to as trace context propagation, documented in the Distributed Tracing Task. Istio proxies can automatically send spans. Applications need to propagate the appropriate HTTP headers so that when the proxies send span information, the spans can be correlated correctly into a single trace. To accomplish this, an application needs to collect and propagate the following headers from the incoming request to any outgoing requests.

  • x-request-id
  • x-b3-traceid
  • x-b3-spanid
  • x-b3-parentspanid
  • x-b3-sampled
  • x-b3-flags
  • x-ot-span-context

The x-b3 headers originated as part of the Zipkin project. The B3 portion of the header is named for the original name of Zipkin, BigBrotherBird. Passing these headers across service calls is known as B3 propagation. According to Zipkin, these attributes are propagated in-process and eventually downstream (often via HTTP headers) to ensure all activity originating from the same root are collected together.

To demonstrate distributed tracing with Jaeger and Zipkin, Service A, Service B, and Service E have been modified to pass the b3 headers. These are the three services that make HTTP requests to other upstream services. The following code has been added to propagate the headers from one service to the next. The Istio sidecar proxy (Envoy) generates the first headers. It is critical to only propagate the headers that are present in the downstream request and have a value, as the code below does. Propagating an empty header will break the distributed tracing.

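// Here, r is assumed to be the incoming request and req the outgoing request.
incomingHeaders := []string{
	"x-b3-flags",
	"x-b3-parentspanid",
	"x-b3-sampled",
	"x-b3-spanid",
	"x-b3-traceid",
	"x-ot-span-context",
	"x-request-id",
}
for _, header := range incomingHeaders {
	// Forward only headers that are present and have a value.
	if r.Header.Get(header) != "" {
		req.Header.Add(header, r.Header.Get(header))
	}
}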

Below, the highlighted section of the response payload from a call to Service A’s /api/request-echo endpoint reveals the b3 headers originating from the Istio proxy and passed to Service A.

Jaeger

According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains a helpful overview of Jaeger’s architecture and general tracing-related terminology.

If you deployed Jaeger using the Istio addons process demonstrated in part one of the post, access Jaeger similar to the other tools:

istioctl dashboard jaeger

Below is an example of the Jaeger UI’s Search view, displaying the results of a search for the Istio Ingress Gateway service over a period of time. We see a timeline of traces across the top with a list of trace results below. As discussed on the Jaeger website, a trace is composed of spans. A span represents a logical unit of work in Jaeger that has an operation name. A trace is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with the concept of DAGs.
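The relationship between a trace and its spans can be pictured with a small data structure. The sketch below is a deliberately simplified model, not Jaeger’s data model: each span references at most one parent, so the result is a tree (a special case of a DAG), and the span IDs and operation names are hypothetical.

```go
package main

import "fmt"

// span is a simplified unit of work within a trace.
type span struct {
	id        string
	parentID  string // empty for the root span
	operation string
}

// childrenByParent indexes spans by the ID of their parent span.
func childrenByParent(spans []span) map[string][]span {
	index := make(map[string][]span)
	for _, s := range spans {
		index[s.parentID] = append(index[s.parentID], s)
	}
	return index
}

// printTree writes the spans as an indented call tree, starting with
// the children of parentID.
func printTree(index map[string][]span, parentID, indent string) {
	for _, s := range index[parentID] {
		fmt.Println(indent + s.operation)
		printTree(index, s.id, indent+"  ")
	}
}

func main() {
	// Hypothetical spans loosely modeled on the reference platform.
	spans := []span{
		{"1", "", "istio-ingressgateway"},
		{"2", "1", "service-a"},
		{"3", "2", "service-b"},
		{"4", "2", "service-c"},
		{"5", "3", "service-e"},
	}
	printTree(childrenByParent(spans), "", "")
}
```

Printing the index from the root reproduces the nesting that the Jaeger UI shows in its trace views.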

Below is a detailed view of a single trace in Jaeger’s Trace Timeline mode. The 14 spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 160 ms. The root span in the trace is the Istio Ingress Gateway. The Angular UI, loaded in the end user’s web browser, calls Service A via the Istio Ingress Gateway. From there, we see the expected flow of our service-to-service IPC. Service A calls Service B and Service C. Service B calls Service E, which calls Service G and Service H.

In this demonstration, traces are not instrumented to span the RabbitMQ message queue or MongoDB. This means you would not see a trace that includes a call from Service D to Service F via RabbitMQ.

The visualization of the trace’s timeline demonstrates the synchronous nature of the reference platform’s service-to-service IPC instead of the asynchronous nature of the decoupled communications using the RabbitMQ messaging queue. Note how Service A waits for each service in its call chain to respond before returning its response to the requester.

Within Jaeger’s Trace Timeline view, you have the ability to drill into a single span, which contains additional metadata. The span’s metadata includes the API endpoint URL being called, HTTP method, response status, and several other headers.

Jaeger also has an experimental Trace Graph mode, which displays a graph view of the same trace.

Jaeger also includes a Compare Trace feature and two Dependencies views: Force-Directed Graph and DAG. I find both views rather primitive compared to Kiali. Lacking access to Kiali, the views are marginally useful as a dependency graph.

Zipkin

Zipkin is a distributed tracing system, which helps gather timing data needed to troubleshoot latency problems in service architectures. According to a 2012 post on Twitter’s Engineering Blog, Zipkin started as a project during Twitter’s first Hack Week. During that week, they implemented a basic version of the Google Dapper paper for Thrift.

Zipkin and Jaeger are very similar in terms of capabilities. I have chosen to focus on Jaeger in this post as I prefer it over Zipkin. If you want to try Zipkin instead of Jaeger, you can use the following commands to remove Jaeger and install Zipkin from the Istio addons extras directory. In part one of the post, we did not install Zipkin by default when we deployed the Istio addons. Be aware that running both tools at the same time in the same Kubernetes cluster will cause unpredictable tracing results.

kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/jaeger.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/extras/zipkin.yaml

Access Zipkin similar to the other observability tools:

istioctl dashboard zipkin

Below is an example of a distributed trace visualized in Zipkin’s UI, containing 14 spans. This is very similar to the trace visualized in Jaeger, shown above. The spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 154 ms.

Zipkin can also visualize a dependency graph based on the distributed trace. Below is an example of a traffic simulation over a two-minute period, showing network traffic flowing between the reference platform’s components, illustrated as a dependency graph.

Kiali: Microservice Observability

According to their website, Kiali is a management console for an Istio-based service mesh. It provides dashboards, observability, and lets you operate your mesh with robust configuration and validation capabilities. It shows the structure of a service mesh by inferring traffic topology and displaying the mesh’s health. Kiali provides detailed metrics, powerful validation, Grafana access, and strong integration for distributed tracing with Jaeger.

If you deployed Kiali using the Istio addons process demonstrated in part one of the post, access Kiali the same way as the other tools:

istioctl dashboard kiali

For improved security, I optionally chose to install the latest version of Kiali using the customizable install mentioned in Istio’s documentation. Using Kiali’s Install via Kiali Server Helm Chart option adds token-based authentication, similar to the Kubernetes Dashboard.

Logging into Kiali, we see the Overview tab, which provides a global view of all namespaces within the Istio service mesh and the number of applications within each namespace.

The Graph tab in the Kiali UI represents the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we can observe that Kiali has mapped 8 applications (Workloads), 10 services, and 22 edges (a graph term). Specifically, we see the Istio Ingress Proxy at the edge of the service mesh, the Angular UI and eight backend services all with their respective Envoy proxy sidecars that are taking traffic (Service F did not take any direct traffic from another service in this example), the external DocumentDB egress point, and the external Amazon MQ egress point. Finally, note how service-to-service traffic flows, with Istio, from the service to its sidecar proxy, to the other service’s sidecar proxy, and finally to the service.

Below is a similar view of the service mesh, but this time, there are failures between the Istio Ingress Gateway and Service A, shown in red. We can also observe overall metrics for the HTTP traffic, such as inbound and outbound requests per second, total requests, success and error rates, and HTTP status codes.

Kiali allows you to zoom in and focus on a single component in the graph and its individual metrics.

Kiali can also display average request times and other metrics for each edge in the graph (communication between two components). Kiali can even show those metrics over a given period of time, using Kiali’s Replay feature, shown below.

Focusing on the external DocumentDB cluster, Kiali also allows us to view TCP traffic between the four services within the service mesh that connect to the external cluster.

The Applications tab lists all the applications, their namespace, and labels.

You can drill into an individual component on both the Applications and Workloads tabs and view additional details. Details include the overall health, Pods, and Istio Config status. Below is an overview of the Service A workload in the dev Namespace.

The Workloads detailed view also includes inbound and outbound metrics. Below is an example of the outbound request volume, duration, throughput, and size metrics, for Service A in the dev Namespace.

Kiali also gives you access to the individual pod’s container logs. Although log access is not as user-friendly as other log sources discussed previously, having logs available alongside metrics (integration with Grafana), traces (integration with Jaeger), and mesh visualization, all in Kiali, can be very effective as a single source for observability.

Kiali also has an Istio Config tab. The Istio Config tab displays a list of all of the available Istio configuration objects that exist in the user’s environment.

You can use Kiali to configure and manage the Istio service mesh and its installed resources. Using Kiali, you can actually modify the deployed resources, similar to using the kubectl edit command.

Oftentimes, I find Kiali to be my first stop when troubleshooting platform issues. Once I identify the specific components or communication paths having issues, I can query the CloudWatch logs and Prometheus metrics through the Grafana dashboard.

Conclusion

In this two-part post, we explored a set of popular open-source observability tools, easily integrated with the Istio service mesh. These tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and forwarding to Amazon CloudWatch Container Insights. Using these tools, we successfully observed a microservices-based, distributed reference application platform deployed to Amazon EKS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Kubernetes-based Microservice Observability with Istio Service Mesh: Part 1 of 2

This two-part post explores a set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

Similar to quantum computing, big data, artificial intelligence, machine learning, and 5G, observability is currently a hot buzzword in the IT industry. According to Wikipedia, observability is a measure of how well the internal states of a system can be inferred from its external outputs. The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.”

Logs, metrics, and traces are often known as the three pillars of observability.

Cindy Sridharan

Honeycomb is a developer of observability tools for production systems. The honeycomb.io site includes articles, blog posts, whitepapers, and podcasts on observability. According to Honeycomb, “Observability is achieved when a system is understandable — which is difficult with complex systems, where most problems are the convergence of many things failing at once.”

As modern distributed systems grow ever more complex, the ability to observe those systems demands equally modern tooling designed with this level of complexity in mind. Traditional logging and monitoring tools struggle with today’s polyglot, distributed, event-driven, ephemeral, containerized and serverless application environments. Tools like the Istio service mesh attempt to solve the observability challenge by offering easy integration with several popular open-source telemetry tools. Istio’s integrations include Jaeger for distributed tracing, Kiali for Istio service mesh-based microservice visualization, and Prometheus and Grafana for metric collection, monitoring, and alerting. Combined with cloud-native monitoring and logging tools such as Fluent Bit and Amazon CloudWatch Container Insights, we have a complete observability platform for modern distributed applications running on Amazon Elastic Kubernetes Service (Amazon EKS).

Traditional logging and monitoring tools struggle with today’s polyglot, distributed, event-driven, ephemeral, containerized and serverless application environments.

Gary Stafford

Reference Application Platform

To demonstrate Istio’s observability tools, we will deploy a reference application platform, written in Go and TypeScript with Angular, to EKS on AWS. The reference application platform was developed to demonstrate different Kubernetes platforms, such as EKS, GKE, and AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform currently comprises a backend containing eight Go-based microservices, labeled generically as Service A through Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue for event-based communications. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message on a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. Distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.
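To make the call chain above easier to picture, here is a minimal Go sketch that models it as a directed graph and walks it breadth-first from Service A. It includes only the edges spelled out in the paragraph above; the node names "greeting queue" and "MongoDB" are labels I chose for illustration, and the remaining services are left out rather than guessed.

```go
package main

import "fmt"

// callGraph lists only the edges named in the text. "greeting queue" stands in
// for the RabbitMQ queue between Service D and Service F (illustrative label).
var callGraph = map[string][]string{
	"Service A":      {"Service B", "Service C"},
	"Service B":      {"Service D", "Service E"},
	"Service D":      {"greeting queue"},
	"greeting queue": {"Service F"},
	"Service F":      {"MongoDB"},
}

// reachable returns every node reachable from start, in breadth-first order.
func reachable(graph map[string][]string, start string) []string {
	seen := map[string]bool{start: true}
	order := []string{start}
	for i := 0; i < len(order); i++ {
		for _, next := range graph[order[i]] {
			if !seen[next] {
				seen[next] = true
				order = append(order, next)
			}
		}
	}
	return order
}

func main() {
	for _, node := range reachable(callGraph, "Service A") {
		fmt.Println(node)
	}
}
```

A dependency graph like this is essentially what Kiali and Jaeger infer from live traffic, except that they build it from telemetry rather than from a hand-written map.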

High-level architecture of Reference Application Platform

Service Responses

Each Go microservice contains a /greeting, /health, and /metrics endpoint. The service’s /health endpoint is used to configure Kubernetes Liveness, Readiness, and Startup Probes. The /metrics endpoint exposes metrics that Prometheus scrapes. Lastly, upstream services respond to requests from downstream services when calling their /greeting endpoint by returning a small informational JSON payload, a greeting.

{
"id": "1f077127-2f9f-4a90-ad88-da52327c2620",
"service": "Service C",
"message": "Konnichiwa (こんにちは), from Service C!",
"created": "2021-06-04T04:34:02.901726709Z",
"hostname": "service-c-6d5cc8fdfd-stsq9"
}

The responses are aggregated across the service call chain, resulting in an array of service responses being returned to the edge service, Service A, and subsequently, the platform’s UI running in the end user’s web browser.

[
{
"id": "a9afab6a-3e2a-41a6-aec7-7257d2904076",
"service": "Service D",
"message": "Shalom (שָׁלוֹם), from Service D!",
"created": "2021-06-04T14:28:32.695151047Z",
"hostname": "service-d-565c775894-vdsjx"
},
{
"id": "6d4cc38a-b069-482c-ace5-65f0c2d82713",
"service": "Service G",
"message": "Ahlan (أهلا), from Service G!",
"created": "2021-06-04T14:28:32.814550521Z",
"hostname": "service-g-5b846ff479-znpcb"
},
{
"id": "988757e3-29d2-4f53-87bf-e4ff6fbbb105",
"service": "Service H",
"message": "Nǐ hǎo (你好), from Service H!",
"created": "2021-06-04T14:28:32.947406463Z",
"hostname": "service-h-76cb7c8d66-lkr26"
},
{
"id": "966b0bfa-0b63-4e21-96a1-22a76e78f9cd",
"service": "Service E",
"message": "Bonjour, from Service E!",
"created": "2021-06-04T14:28:33.007881464Z",
"hostname": "service-e-594d4754fc-pr7tc"
},
{
"id": "c612a228-704f-4562-90c5-33357b12ff8d",
"service": "Service B",
"message": "Namasté (नमस्ते), from Service B!",
"created": "2021-06-04T14:28:33.015985983Z",
"hostname": "service-b-697b78cf54-4lk8s"
},
{
"id": "b621bd8a-02ee-4f9b-ac1a-7d91ddad85f5",
"service": "Service C",
"message": "Konnichiwa (こんにちは), from Service C!",
"created": "2021-06-04T14:28:33.042001406Z",
"hostname": "service-c-7fd4dd5947-5wcgs"
},
{
"id": "52eac1fa-4d0c-42b4-984b-b65e70afd98a",
"service": "Service A",
"message": "Hello, from Service A!",
"created": "2021-06-04T14:28:33.093380628Z",
"hostname": "service-a-6f776d798f-5l5dz"
}
]
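As a rough sketch of how a client might consume this aggregated response, the Go snippet below decodes the array and sorts the greetings by their created timestamp, which approximates the order in which the services responded. The parseGreetings function and the samplePayload constant are my own names for illustration; the payload is a trimmed copy of two entries from the response above, placed deliberately out of order.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"time"
)

// Greeting mirrors the JSON payload returned by each service.
type Greeting struct {
	ID          string    `json:"id"`
	ServiceName string    `json:"service"`
	Message     string    `json:"message"`
	CreatedAt   time.Time `json:"created"`
	Hostname    string    `json:"hostname"`
}

// samplePayload holds two entries copied from the response shown above.
const samplePayload = `[
  {"id": "52eac1fa-4d0c-42b4-984b-b65e70afd98a", "service": "Service A",
   "message": "Hello, from Service A!",
   "created": "2021-06-04T14:28:33.093380628Z",
   "hostname": "service-a-6f776d798f-5l5dz"},
  {"id": "966b0bfa-0b63-4e21-96a1-22a76e78f9cd", "service": "Service E",
   "message": "Bonjour, from Service E!",
   "created": "2021-06-04T14:28:33.007881464Z",
   "hostname": "service-e-594d4754fc-pr7tc"}
]`

// parseGreetings decodes the aggregated response and sorts it oldest first.
func parseGreetings(payload []byte) ([]Greeting, error) {
	var greetings []Greeting
	if err := json.Unmarshal(payload, &greetings); err != nil {
		return nil, err
	}
	sort.Slice(greetings, func(i, j int) bool {
		return greetings[i].CreatedAt.Before(greetings[j].CreatedAt)
	})
	return greetings, nil
}

func main() {
	greetings, err := parseGreetings([]byte(samplePayload))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, g := range greetings {
		fmt.Println(g.CreatedAt.Format(time.RFC3339Nano), g.ServiceName)
	}
}
```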

CORS

The platform’s backend edge service, Service A, is configured for Cross-Origin Resource Sharing (CORS) using the access-control-allow-origin response header. The CORS configuration allows the Angular UI, running in the end user’s web browser, to call Service A’s /greeting endpoint, which potentially resides on a different host from the UI. Shown below is the Go source code for Service A. Note the use of the ALLOWED_ORIGINS environment variable, which is read into the allowedOrigins variable and passed to cors.Options in the run function. It lets you configure the allowed origins from the service’s Deployment resource.

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service A
// date: 2021-06-05
package main
import (
"encoding/json"
"fmt"
runtime "github.com/banzaicloud/logrus-runtime-formatter"
"io"
"io/ioutil"
"net/http"
"net/http/httputil"
"os"
"strconv"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
)
var (
logLevel = getEnv("LOG_LEVEL", "debug")
port = getEnv("PORT", ":8080")
serviceName = getEnv("SERVICE_NAME", "Service A")
message = getEnv("GREETING", "Hello, from Service A!")
allowedOrigins = getEnv("ALLOWED_ORIGINS", "*")
URLServiceB = getEnv("SERVICE_B_URL", "http://service-b")
URLServiceC = getEnv("SERVICE_C_URL", "http://service-c")
)
type Greeting struct {
ID string `json:"id,omitempty"`
ServiceName string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
CreatedAt time.Time `json:"created,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
var greetings []Greeting
// *** HANDLERS ***
func GreetingHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
log.Debug(r)
greetings = nil
callNextServiceWithTrace(URLServiceB+"/api/greeting", r)
callNextServiceWithTrace(URLServiceC+"/api/greeting", r)
tmpGreeting := Greeting{
ID: uuid.New().String(),
ServiceName: serviceName,
Message: message,
CreatedAt: time.Now().Local(),
Hostname: getHostname(),
}
greetings = append(greetings, tmpGreeting)
err := json.NewEncoder(w).Encode(greetings)
if err != nil {
log.Error(err)
}
}
func HealthCheckHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"alive\": true}"))
if err != nil {
log.Error(err)
}
}
func ResponseStatusHandler(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
statusCode, err := strconv.Atoi(params["code"])
if err != nil {
log.Error(err)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(statusCode)
}
func RequestEchoHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
requestDump, err := httputil.DumpRequest(r, true)
if err != nil {
log.Error(err)
}
_, err = fmt.Fprint(w, string(requestDump)) // Fprint, so a % in the dump is not treated as a format verb
if err != nil {
log.Error(err)
}
}
// *** UTILITY FUNCTIONS ***
func callNextServiceWithTrace(url string, r *http.Request) {
log.Debug(url)
var tmpGreetings []Greeting
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Error(err)
}
// Headers must be passed for Jaeger Distributed Tracing
incomingHeaders := []string{
"x-b3-flags",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-spanid",
"x-b3-traceid",
"x-ot-span-context",
"x-request-id",
}
for _, header := range incomingHeaders {
if r.Header.Get(header) != "" {
req.Header.Add(header, r.Header.Get(header))
}
}
log.Info(req)
client := &http.Client{
Timeout: time.Second * 10,
}
response, err := client.Do(req)
if err != nil {
log.Error(err)
return // response is nil on error, so reading response.Body below would panic
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
log.Error(err)
}
}(response.Body)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error(err)
}
err = json.Unmarshal(body, &tmpGreetings)
if err != nil {
log.Error(err)
}
for _, r := range tmpGreetings {
greetings = append(greetings, r)
}
}
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Error(err)
}
return hostname
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func run() error {
c := cors.New(cors.Options{
AllowedOrigins: []string{allowedOrigins},
AllowCredentials: true,
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"},
})
router := mux.NewRouter()
api := router.PathPrefix("/api").Subrouter()
api.HandleFunc("/greeting", GreetingHandler).Methods("GET", "OPTIONS")
api.HandleFunc("/health", HealthCheckHandler).Methods("GET", "OPTIONS")
api.HandleFunc("/request-echo", RequestEchoHandler).Methods(
"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD")
api.HandleFunc("/status/{code}", ResponseStatusHandler).Methods("GET", "OPTIONS")
api.Handle("/metrics", promhttp.Handler())
handler := c.Handler(router)
return http.ListenAndServe(port, handler)
}
func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}
func main() {
if err := run(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}
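The part of Service A that matters most for distributed tracing is the header forwarding in callNextServiceWithTrace. Isolated from the rest of the service, the idea can be sketched as below. The propagateTraceHeaders and demo function names, the trace ID value, and the cookie are hypothetical and exist only for illustration; the header list is the one used in the listing above.

```go
package main

import (
	"fmt"
	"net/http"
)

// traceHeaders is the set of headers Service A forwards in the listing above.
var traceHeaders = []string{
	"x-request-id",
	"x-b3-traceid",
	"x-b3-spanid",
	"x-b3-parentspanid",
	"x-b3-sampled",
	"x-b3-flags",
	"x-ot-span-context",
}

// propagateTraceHeaders copies any trace headers present on the inbound
// request onto the outbound request and ignores every other header.
func propagateTraceHeaders(in, out *http.Request) {
	for _, name := range traceHeaders {
		if value := in.Header.Get(name); value != "" {
			out.Header.Set(name, value)
		}
	}
}

// demo builds a made-up inbound request and returns the trace ID and cookie
// that end up on the outbound request. No network call is made.
func demo() (string, string, error) {
	in, err := http.NewRequest(http.MethodGet, "http://service-a/api/greeting", nil)
	if err != nil {
		return "", "", err
	}
	in.Header.Set("x-b3-traceid", "463ac35c9f6413ad48485a3953bb6124") // made-up trace ID
	in.Header.Set("Cookie", "session=abc123")                          // must not be forwarded

	out, err := http.NewRequest(http.MethodGet, "http://service-b/api/greeting", nil)
	if err != nil {
		return "", "", err
	}
	propagateTraceHeaders(in, out)
	return out.Header.Get("x-b3-traceid"), out.Header.Get("Cookie"), nil
}

func main() {
	traceID, cookie, err := demo()
	fmt.Printf("trace=%s cookie=%q err=%v\n", traceID, cookie, err)
}
```

The Envoy sidecars generate the spans, but they cannot correlate an inbound request with the outbound calls it triggers. Without this forwarding, each hop would likely show up as a separate, unrelated trace.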

MongoDB- and RabbitMQ-as-a-Service

Using external services will help us understand how Istio and its observability tools collect telemetry for communications between the reference application platform on Kubernetes and external systems.

Amazon DocumentDB

For this demonstration, the reference application platform’s MongoDB databases will be hosted, external to EKS, on Amazon DocumentDB (with MongoDB compatibility). According to AWS, Amazon DocumentDB is a purpose-built database service for JSON data management at scale, fully managed and integrated with AWS, and enterprise-ready with high durability.

Amazon MQ

Similarly, the reference application platform’s RabbitMQ queue will be hosted, external to EKS, on Amazon MQ. Amazon MQ is a managed message broker service for Apache ActiveMQ and RabbitMQ, making it easy to set up and operate message brokers on AWS. Amazon MQ reduces your operational responsibilities by managing the provisioning, setup, and maintenance of message brokers for you. For RabbitMQ, Amazon MQ provides access to the RabbitMQ web console. The console allows us to monitor and manage RabbitMQ.

RabbitMQ Web Console showing the reference platform’s greeting queue

Shown below is the Go source code for Service F. This service consumes messages from the RabbitMQ queue, placed there by Service D, and writes the messages to MongoDB. Services use Sean Treadway’s Go RabbitMQ Client Library and MongoDB’s MongoDB Go Driver for connectivity.

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service F
// date: 2021-06-05
package main
import (
"bytes"
"context"
"encoding/json"
runtime "github.com/banzaicloud/logrus-runtime-formatter"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
logLevel = getEnv("LOG_LEVEL", "debug")
port = getEnv("PORT", ":8080")
serviceName = getEnv("SERVICE_NAME", "Service F")
message = getEnv("GREETING", "Hola, from Service F!")
queueName = getEnv("QUEUE_NAME", "service-d.greeting")
mongoConn = getEnv("MONGO_CONN", "mongodb://mongodb:27017/admin")
rabbitMQConn = getEnv("RABBITMQ_CONN", "amqp://guest:guest@rabbitmq:5672")
)
type Greeting struct {
ID string `json:"id,omitempty"`
ServiceName string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
CreatedAt time.Time `json:"created,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
var greetings []Greeting
// *** HANDLERS ***
func GreetingHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
greetings = nil
tmpGreeting := Greeting{
ID: uuid.New().String(),
ServiceName: serviceName,
Message: message,
CreatedAt: time.Now().Local(),
Hostname: getHostname(),
}
greetings = append(greetings, tmpGreeting)
callMongoDB(tmpGreeting, mongoConn)
err := json.NewEncoder(w).Encode(greetings)
if err != nil {
log.Error(err)
}
}
func HealthCheckHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"alive\": true}"))
if err != nil {
log.Error(err)
}
}
// *** UTILITY FUNCTIONS ***
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Error(err)
}
return hostname
}
func callMongoDB(greeting Greeting, mongoConn string) {
log.Info(greeting)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoConn))
if err != nil {
log.Error(err)
}
defer func(client *mongo.Client, ctx context.Context) {
err := client.Disconnect(ctx)
if err != nil {
log.Error(err)
}
}(client, nil)
collection := client.Database("service-f").Collection("messages")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = collection.InsertOne(ctx, greeting)
if err != nil {
log.Error(err)
}
}
func getMessages(rabbitMQConn string) {
conn, err := amqp.Dial(rabbitMQConn)
if err != nil {
log.Error(err)
}
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
log.Error(err)
}
}(conn)
ch, err := conn.Channel()
if err != nil {
log.Error(err)
}
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
log.Error(err)
}
}(ch)
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
if err != nil {
log.Error(err)
}
msgs, err := ch.Consume(
q.Name,
"service-f",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Error(err)
}
forever := make(chan bool)
go func() {
for delivery := range msgs {
log.Debug(delivery)
callMongoDB(deserialize(delivery.Body), mongoConn)
}
}()
<-forever
}
func deserialize(b []byte) (t Greeting) {
log.Debug(b)
var tmpGreeting Greeting
buf := bytes.NewBuffer(b)
decoder := json.NewDecoder(buf)
err := decoder.Decode(&tmpGreeting)
if err != nil {
log.Error(err)
}
return tmpGreeting
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func run() error {
go getMessages(rabbitMQConn)
router := mux.NewRouter()
api := router.PathPrefix("/api").Subrouter()
api.HandleFunc("/greeting", GreetingHandler).Methods("GET")
api.HandleFunc("/health", HealthCheckHandler).Methods("GET")
api.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(port, router)
}
func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}
func main() {
if err := run(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}
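One design point worth considering in a consumer like Service F is what happens to a message that cannot be decoded. The sketch below is a variation on the loop in getMessages, not the code used in the project: it uses a plain Go channel in place of the AMQP delivery channel and a function value in place of the MongoDB insert, and it counts undecodable messages as skipped rather than storing an empty record. The consume function and its parameters are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Greeting is a reduced version of the struct used by the services.
type Greeting struct {
	ID          string `json:"id"`
	ServiceName string `json:"service"`
	Message     string `json:"message"`
}

// consume drains msgs until the channel is closed. Each body is decoded and
// passed to store; bodies that fail to decode or to store are counted as
// skipped instead of being written as empty records.
func consume(msgs <-chan []byte, store func(Greeting) error) (stored, skipped int) {
	for body := range msgs {
		var g Greeting
		if err := json.Unmarshal(body, &g); err != nil {
			skipped++
			continue
		}
		if err := store(g); err != nil {
			skipped++
			continue
		}
		stored++
	}
	return stored, skipped
}

func main() {
	msgs := make(chan []byte, 2)
	msgs <- []byte(`{"id":"1","service":"Service D","message":"Shalom, from Service D!"}`)
	msgs <- []byte(`not json`)
	close(msgs)

	var saved []Greeting
	stored, skipped := consume(msgs, func(g Greeting) error {
		saved = append(saved, g) // stand-in for the MongoDB insert
		return nil
	})
	fmt.Println(stored, skipped, len(saved))
}
```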

Source Code

All source code for this post is available on GitHub within two projects. Go-based microservices source code and Kubernetes resources are located in the k8s-istio-observe-backend project repository. The Angular UI TypeScript-based source code is located in the k8s-istio-observe-frontend project repository. You do not need to clone the Angular UI project for this demonstration. The demonstration uses the 2021-istio branch for both projects.

git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-backend.git
# optional - not needed for demonstration
git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-frontend.git

Docker images referenced in the Kubernetes Deployment resource files for the Go services and UI are all available on Docker Hub. The Go microservice Docker images were built using the official Golang Alpine image on Docker Hub, containing Go version 1.16.4. Using the Alpine image to compile the Go source code ensures the containers will be as small as possible and minimizes the container’s potential attack surface.

Prerequisites

This post will assume a basic level of knowledge of AWS EKS, Kubernetes, and Istio. Furthermore, the post assumes you have already installed recent versions of the AWS CLI v2, kubectl, Weaveworks’ eksctl, Docker, and Istio. This means that the aws, kubectl, eksctl, istioctl, and docker command-line tools are all available from the terminal.

CLI for Amazon EKS

Weaveworks’ eksctl is a simple CLI tool for creating and managing clusters on EKS — Amazon’s managed Kubernetes service for EC2. It is written in Go and uses CloudFormation.

CLI for Istio

The Istio configuration command-line utility, istioctl, is designed to help debug and diagnose the Istio mesh.

Set-up and Installation

To deploy the microservices platform to EKS, we will proceed in roughly the following order:

  1. Create a TLS certificate and Route53 hosted zone records for ALB;
  2. Create an Amazon DocumentDB database cluster;
  3. Create an Amazon MQ RabbitMQ message broker;
  4. Create an EKS cluster;
  5. Modify Kubernetes resources for your own environment;
  6. Deploy AWS Application Load Balancer (ALB) and associated resources;
  7. Deploy Istio to the EKS cluster;
  8. Deploy Fluent Bit to the EKS cluster;
  9. Deploy the reference platform to EKS;
  10. Test and troubleshoot the platform;
  11. Observe the results in part two.

Amazon DocumentDB

As previously mentioned, the MongoDB databases will be hosted, external to EKS, on Amazon DocumentDB, with MongoDB compatibility. Create a DocumentDB cluster. For the sake of simplicity and affordability of the demo, I recommend creating a single db.r5.large node cluster. We will connect from the microservices to Amazon DocumentDB using the supplied mongodb:// connection string.

Amazon DocumentDB clusters are deployed within an Amazon Virtual Private Cloud (Amazon VPC). If you are installing DocumentDB in a different VPC from EKS, you will need to ensure that the EKS VPC can access the DocumentDB VPC. Per the DocumentDB documentation, DocumentDB clusters can be accessed directly by Amazon EC2 instances or other AWS services that are deployed in the same Amazon VPC. Additionally, Amazon DocumentDB can be accessed via EC2 instances or other AWS services from different VPCs in the same AWS Region or other Regions via VPC peering.

Amazon MQ

Similarly, the RabbitMQ queues will be hosted, external to EKS, on Amazon MQ. Create an Amazon MQ RabbitMQ broker. To ensure the simplicity and affordability of the demo, I recommend a single mq.m5.large instance broker. The broker is running the RabbitMQ engine and has TLS disabled. We will connect from the microservices to Amazon MQ using AMQP (Advanced Message Queuing Protocol). Amazon MQ provides an amqps:// endpoint. The amqps URI scheme is used to instruct a client to make a secure connection to the server. You can manage and observe RabbitMQ from the RabbitMQ web console provided by Amazon MQ.
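Because the scheme and port of the broker URI determine what the Istio ServiceEntry must allow, it can help to check a connection string before putting it in a Secret. The following is a minimal sketch using Go's standard net/url package. The brokerEndpoint type, the parseBrokerURI function, and the example hostname are illustrative assumptions; the default ports (5672 for amqp, 5671 for amqps) are the standard AMQP ones.

```go
package main

import (
	"fmt"
	"net/url"
)

// brokerEndpoint holds the parts of an AMQP URI that matter for mesh egress.
type brokerEndpoint struct {
	Host   string
	Port   string
	Secure bool // true when the scheme is amqps (TLS)
}

// parseBrokerURI validates the scheme and fills in the default port if the
// URI does not specify one.
func parseBrokerURI(raw string) (brokerEndpoint, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return brokerEndpoint{}, err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return brokerEndpoint{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	port := u.Port()
	if port == "" {
		port = "5672"
		if u.Scheme == "amqps" {
			port = "5671"
		}
	}
	return brokerEndpoint{
		Host:   u.Hostname(),
		Port:   port,
		Secure: u.Scheme == "amqps",
	}, nil
}

func main() {
	// Placeholder credentials and hostname, not a real broker.
	ep, err := parseBrokerURI("amqps://username:password@broker.example.com")
	fmt.Printf("%+v %v\n", ep, err)
}
```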

Modify Kubernetes Resources

You will need to change several configuration settings in the GitHub project’s Kubernetes resource files to match your environment.

Istio ServiceEntry for DocumentDB

Modify the Istio ServiceEntry resource, external-mesh-document-db.yaml, adding your DocumentDB host address. This file allows egress traffic from the microservices on EKS to the DocumentDB cluster.

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: docdb-external-mesh
spec:
  hosts:
  - {{ your_document_db_hostname }}
  ports:
  - name: mongo
    number: 27017
    protocol: MONGO
  location: MESH_EXTERNAL
  resolution: NONE

Istio ServiceEntry for Amazon MQ

Modify the Istio ServiceEntry resource, external-mesh-amazon-mq.yaml, adding your Amazon MQ host address. This file allows egress traffic from the microservices on EKS to the Amazon MQ RabbitMQ broker.

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: amazon-mq-external-mesh
spec:
  hosts:
  - {{ your_amazon_mq_hostname }}
  ports:
  - name: rabbitmq
    number: 5671
    protocol: TCP
  location: MESH_EXTERNAL
  resolution: NONE

Istio Gateway

There are numerous strategies you can use to route traffic into the EKS cluster via Istio. For this demonstration, I am using an AWS Application Load Balancer (ALB). I have mapped one hostname, observe-ui.example-api.com, to the Angular UI application running on EKS. The backend microservice-based API, specifically the edge service, Service A, is mapped to a second hostname, observe-api.example-api.com.

According to Istio, the Gateway describes a load balancer operating at the edge of the mesh, receiving incoming or outgoing HTTP/TCP connections. Modify the Istio Ingress Gateway resource, gateway.yaml. Insert your own DNS entries into the hosts section. These are the only hosts that will be allowed into the mesh on port 80.

apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: istio-gateway
spec:
  selector:
    istio: ingressgateway # use istio default controller
  servers:
  - port:
      number: 80
      name: ui
      protocol: HTTP
    hosts:
    - {{ your_ui_hostname }}
    - {{ your_api_hostname }}

Istio VirtualService

According to Istio, a VirtualService defines a set of traffic routing rules to apply when a host is addressed. A VirtualService is bound to a Gateway to control the forwarding of traffic arriving at a particular host and port. Modify the project’s two Istio VirtualService resources, virtualservices.yaml. Insert the corresponding DNS entries from the Istio Gateway.

---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: angular-ui
spec:
  hosts:
  - {{ your_ui_hostname }}
  gateways:
  - istio-gateway
  http:
  - match:
    - uri:
        prefix: /
    route:
    - destination:
        host: angular-ui.dev.svc.cluster.local
        subset: v1
        port:
          number: 80
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: service-a
spec:
  hosts:
  - {{ your_api_hostname }}
  gateways:
  - istio-gateway
  http:
  - match:
    - uri:
        prefix: /api
    route:
    - destination:
        host: service-a.dev.svc.cluster.local
        subset: v1
        port:
          number: 8080
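To reason about what these two VirtualServices do, it can help to think of them as a small lookup table keyed by hostname and URI prefix. The Go sketch below is a deliberately simplified model of that idea and is not how Istio or Envoy implement routing. The route type and resolve function are hypothetical, and the hostnames are the example ones mentioned earlier in this post.

```go
package main

import (
	"fmt"
	"strings"
)

// route is a simplified stand-in for one VirtualService HTTP route.
type route struct {
	Host        string // hostname the VirtualService is bound to
	Prefix      string // URI prefix to match
	Destination string // in-mesh service host
	Port        int    // destination port
}

// routes mirrors the two VirtualServices above, using the example hostnames.
var routes = []route{
	{Host: "observe-ui.example-api.com", Prefix: "/", Destination: "angular-ui.dev.svc.cluster.local", Port: 80},
	{Host: "observe-api.example-api.com", Prefix: "/api", Destination: "service-a.dev.svc.cluster.local", Port: 8080},
}

// resolve returns the destination for a host and path, or false when no
// route matches, in which case the request would not reach a service.
func resolve(host, path string) (string, bool) {
	for _, r := range routes {
		if r.Host == host && strings.HasPrefix(path, r.Prefix) {
			return fmt.Sprintf("%s:%d", r.Destination, r.Port), true
		}
	}
	return "", false
}

func main() {
	fmt.Println(resolve("observe-api.example-api.com", "/api/greeting"))
	fmt.Println(resolve("observe-ui.example-api.com", "/index.html"))
	fmt.Println(resolve("observe-api.example-api.com", "/"))
}
```

In this model, a request to the API hostname for a path outside /api matches nothing, which is consistent with the VirtualService for Service A only declaring the /api prefix.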

Kubernetes Secret

According to the Kubernetes project, Kubernetes Secrets let you store and manage sensitive information, such as passwords, OAuth tokens, and SSH keys. Storing confidential information in a Secret is safer and more flexible than putting it verbatim in a Pod definition or in a container image.

The project contains a Kubernetes Opaque type Secret resource, go-srv-demo.yaml. The Secret contains several pieces of arbitrary user-defined data we want to secure. Data includes the full DocumentDB mongodb:// connection string and the Amazon MQ amqps:// connection string used by the microservices. We will use the Secret to secure the entire connection string, including the hostname, port, username, and password. The data also includes the DocumentDB host, username, and password, and an arbitrary username and password to log in to Mongo Express using Basic Authentication.

You must encode your secret’s values using base64. On Linux and Mac, you can use the base64 program to encode the connection strings.

echo -n '{{ your_secret_to_encode }}' | base64
# e.g., echo -n 'amqps://username:password@hostname.mq.us-east-1.amazonaws.com:5671/' | base64
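If you would rather script the encoding, or are on a platform without the base64 program, the same result can be produced with Go's standard library. This is a small sketch; the encodeSecretValue and decodeSecretValue function names are my own, and the connection string is the placeholder from the example above, not a real credential.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeSecretValue returns the base64 form expected in a Secret's data field.
// It encodes exactly the bytes given, like echo -n piped to base64.
func encodeSecretValue(plain string) string {
	return base64.StdEncoding.EncodeToString([]byte(plain))
}

// decodeSecretValue reverses encodeSecretValue, which is handy for checking a
// value before applying the Secret.
func decodeSecretValue(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	conn := "amqps://username:password@hostname.mq.us-east-1.amazonaws.com:5671/"
	encoded := encodeSecretValue(conn)
	decoded, err := decodeSecretValue(encoded)
	fmt.Println(encoded)
	fmt.Println(decoded == conn, err)
}
```

Note that omitting the -n flag from echo appends a trailing newline to the encoded value, which tends to surface later as a confusing connection error rather than an obvious typo.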

Add the base64 encoded values to the Secret resource.

apiVersion: v1
kind: Secret
metadata:
  name: go-srv-config
  namespace: dev
type: Opaque
data:
  mongodb.conn: {{ your_base64_encoded_secret }}
  rabbitmq.conn: {{ your_base64_encoded_secret }}
---
apiVersion: v1
kind: Secret
metadata:
  name: mongo-express-config
  namespace: mongo-express
type: Opaque
data:
  me.basicauth.username: {{ your_base64_encoded_secret }}
  me.basicauth.password: {{ your_base64_encoded_secret }}
  mongodb.host: {{ your_base64_encoded_secret }}
  mongodb.username: {{ your_base64_encoded_secret }}
  mongodb.password: {{ your_base64_encoded_secret }}

AWS Load Balancer Controller

The project contains a Custom Resource Definition (CRD) and associated resources, aws-load-balancer-controller-v220-all.yaml. These resources configure the AWS Application Load Balancer (ALB) using the AWS Load Balancer Controller v2.2.0, aws-load-balancer-controller. The AWS Load Balancer Controller manages AWS Elastic Load Balancers (ELB) for a Kubernetes cluster. The controller provisions an AWS ALB when you create a Kubernetes Ingress.

Modify line 797 to include the name of your own cluster. I am using the cluster name istio-observe-demo throughout the demo.

spec:
  containers:
    - args:
        - --cluster-name=istio-observe-demo
        - --ingress-class=alb
      image: amazon/aws-alb-ingress-controller:v2.2.0
      livenessProbe:
        failureThreshold: 2
        httpGet:
          path: /healthz
          port: 61779
          scheme: HTTP

EKS Cluster Config

The project contains an eksctl ClusterConfig resource, cluster.yaml. The ClusterConfig defines the configuration of the Amazon EKS cluster along with networking, security, and other associated resources. Instead of using a pre-existing Amazon Virtual Private Cloud (Amazon VPC) for this demo, eksctl will create a VPC and associated AWS resources as part of cluster creation. Modify the file to match your AWS Region, desired EKS cluster name, and Kubernetes release. For the demo, I am using the latest Kubernetes 1.20 release.

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: istio-observe-demo
  region: us-east-1
  version: "1.20"
iam:
  withOIDC: true

Set Environment Variables

Modify and set the following environment variables in your terminal. I will be using us-east-1 for all of the demonstration’s AWS resources. The values should match the eksctl ClusterConfig resource above.

export AWS_ACCOUNT=$(aws sts get-caller-identity --output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="istio-observe-demo"
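
Since most of the later commands depend on these variables, a quick check that none of them is empty can save some troubleshooting. The sketch below is only an illustration: require_vars is a hypothetical helper, and the account ID is a placeholder value rather than a real account.

```shell
# require_vars: hypothetical helper that reports any named variable that is
# unset or empty. Returns non-zero if at least one variable is missing.
require_vars() {
  missing=0
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example values matching the demo; skip these lines if yours are already set.
export AWS_ACCOUNT="111122223333"   # placeholder account ID
export EKS_REGION="us-east-1"
export CLUSTER_NAME="istio-observe-demo"

require_vars AWS_ACCOUNT EKS_REGION CLUSTER_NAME && echo "environment looks complete"
```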

Istio Home

Set your ISTIO_HOME directory. I have the latest Istio 1.10.0 installed and have the ISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also set Istio’s bin/ subdirectory in my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME
/Applications/Istio/istio-1.10.0
where istioctl
/Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Create EKS Cluster

With the cluster.yaml file modified previously, deploy the EKS cluster to a new VPC on AWS.

eksctl create cluster -f ./resources/other/cluster.yaml

This step deploys a large number of resources using CloudFormation. The complete EKS provisioning process can take 15–20 minutes.

For the complete demonstration, eksctl will deploy a total of four CloudFormation stacks to your AWS environment.

Once complete, configure kubectl so that you can connect to an Amazon EKS cluster.

aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}

Confirm that your cluster creation was successful with the following commands:

kubectl cluster-info
eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Use the EKS Management Console to review the new cluster’s details.

The EKS cluster in this demonstration was created with a single Amazon EKS managed node group, managed-ng-1. The managed node group contains three m5.large EC2 instances. The composition of the EKS cluster can be modified in the eksctl ClusterConfig resource, cluster.yaml.

Deploy AWS Load Balancer Controller

Using the aws-load-balancer-controller-v220-all.yaml file you previously modified, deploy the AWS Load Balancer Controller v2.2.0. Please carefully review the AWS Load Balancer Controller instructions to understand how this resource is configured and integrated with EKS.

curl -o resources/aws/iam-policy.json \
https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.2.0/docs/install/iam_policy.json
aws iam create-policy \
--policy-name AWSLoadBalancerControllerIAMPolicy220 \
--policy-document file://resources/aws/iam-policy.json

eksctl create iamserviceaccount \
--region ${EKS_REGION} \
--cluster ${CLUSTER_NAME} \
--namespace=kube-system \
--name=aws-load-balancer-controller \
--attach-policy-arn=arn:aws:iam::${AWS_ACCOUNT}:policy/AWSLoadBalancerControllerIAMPolicy220 \
--override-existing-serviceaccounts \
--approve
kubectl apply --validate=false \
-f https://github.com/jetstack/cert-manager/releases/download/v1.3.1/cert-manager.yaml

kubectl apply -f resources/other/aws-load-balancer-controller-v220-all.yaml

To confirm the aws-load-balancer-controller is deployed and ready, run the following command:

kubectl get deployment -n kube-system aws-load-balancer-controller
NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
aws-load-balancer-controller   1/1     1            1           55s

AWS Load Balancer Controller Policy

There is an OpenID Connect provider URL associated with the EKS cluster. To use IAM roles for service accounts, an IAM OIDC provider must exist for your cluster. Obtain the URL from the EKS Management Console’s Details tab.

You can also obtain the URL using the following AWS CLI commands:

aws eks describe-cluster --name ${CLUSTER_NAME}
aws iam list-open-id-connect-providers

The project contains a policy document, trust-eks-policy.json. Modify the policy document by adding the OpenID Connect information found above. Instructions are also included in the AWS Create an IAM OIDC provider for your cluster documentation.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "{{ your_openid_connect_arn }}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "oidc.eks.us-east-1.amazonaws.com/id/{{ your_open_id_connect_id }}:sub": "system:serviceaccount:kube-system:alb-ingress-controller"
        }
      }
    }
  ]
}
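
The two placeholders in the policy document above can both be derived from the cluster’s OIDC issuer URL. The following is a minimal sketch using a made-up issuer URL and account ID. With a live cluster, the issuer URL can be read with the describe-cluster query shown in the comment.

```shell
# Hypothetical values for illustration. With a live cluster, the issuer URL
# could be read with:
#   aws eks describe-cluster --name "${CLUSTER_NAME}" \
#     --query 'cluster.identity.oidc.issuer' --output text
issuer='https://oidc.eks.us-east-1.amazonaws.com/id/EXAMPLED539D4633E53DE1B71EXAMPLE'
account='111122223333'

# The OpenID Connect ID is the last path segment of the issuer URL.
oidc_id="${issuer##*/}"

# The provider ARN is the issuer URL without its scheme, appended to the
# account's oidc-provider resource prefix.
oidc_arn="arn:aws:iam::${account}:oidc-provider/${issuer#https://}"

echo "your_open_id_connect_id: ${oidc_id}"
echo "your_openid_connect_arn: ${oidc_arn}"
```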

Create and attach the AWS Load Balancer Controller IAM policies and roles.

aws iam create-role \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--assume-role-policy-document file://resources/aws/trust-eks-policy.json

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn="arn:aws:iam::${AWS_ACCOUNT}:policy/AWSLoadBalancerControllerIAMPolicy220"

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy

Create Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. The dev namespace will house the reference application platform for this demonstration — the Angular UI frontend and Go microservices backend. This namespace represents a development environment on EKS for our reference application platform. A second namespace, mongo-express, will be used to deploy Mongo Express later in the post.

kubectl apply -f ./minikube/resources/namespaces.yaml

Enable Automatic Sidecar Injection

To take advantage of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. When the istio-injection=enabled label is set on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have an Istio sidecar proxy added to them. Labeling the dev namespace for automatic sidecar injection ensures that our reference application platform (the UI and the microservices) will have the Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled

Deploy Secret Resources

Create the DocumentDB and Amazon MQ Secrets in the appropriate dev and mongo-express namespaces.

kubectl apply -f ./resources/secrets/secrets.yaml

Install Istio Configuration Profile

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and the sidecars for the Istio data plane.

istioctl profile list
Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, use the demo profile, which installs Istio core, istiod, istio-ingressgateway, and istio-egressgateway.

istioctl install --set profile=demo -y
✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Egress gateways installed
✔ Installation complete

Deploy Istio Gateway, VirtualService, and DestinationRule Resources

An Istio Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. An Istio VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, an Istio DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred. You need to deploy an Istio Gateway, a set of VirtualService resources, and a set of DestinationRule resources. Create the Istio Gateway, VirtualServices, and DestinationRules, which you modified earlier.

kubectl apply -f resources/istio/gateway.yaml -n dev
kubectl apply -f resources/istio/virtualservices.yaml -n dev
kubectl apply -f resources/istio/destination-rules.yaml -n dev

Deploy Istio Telemetry Add-ons

The Istio project includes sample deployments of various telemetry add-ons that integrate with Istio. The add-ons include Jaeger, Zipkin, Kiali, Prometheus, and Grafana. While these applications are not a part of Istio, they are essential to making the most of Istio’s observability features. According to the Istio project, the deployments are meant to quickly get up and running and are optimized for this case. As a result, they may not be suitable for production. See the GitHub project for more info on integrating a production-grade version of each add-on.

Install the add-ons using the default configurations and then replace Prometheus with a modified version included in the project. The modified Kubernetes ConfigMap in the prometheus.yaml file has added configuration to scrape our reference platform’s /api/metrics endpoint.

kubectl apply -f $ISTIO_HOME/samples/addons
kubectl apply -f resources/istio/prometheus.yaml -n istio-system

You should see seven workloads in the istio-system namespace from the EKS Management Console’s Workloads tab, each with one pod up and running. The workloads include Grafana, Jaeger, Kiali, and Prometheus. Also included are the Istio demo configuration profile’s istiod, istio-ingressgateway, and istio-egressgateway, installed previously.

Deploy Kubernetes Web UI (Dashboard)

Kubernetes Web UI (Dashboard) is a web-based Kubernetes user interface. You can use the Dashboard to deploy containerized applications to a Kubernetes cluster, troubleshoot your containerized application, and manage cluster resources. You can use the Dashboard to get an overview of applications running on your cluster, as well as for creating or modifying individual Kubernetes resources.

To deploy the dashboard, follow the steps outlined in the Tutorial: Deploy the Kubernetes Dashboard (web UI).

kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.0.5/aio/deploy/recommended.yaml
kubectl apply -f resources/aws/eks-admin-service-account.yaml

Each Service Account has a Secret with a valid Bearer Token that can be used to log in to the Dashboard. Use the following command to retrieve the token associated with the eks-admin Account.

kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep eks-admin | awk '{print $1}')

Start the kubectl proxy in a separate terminal window.

kubectl proxy

Use the eks-admin Account’s token to log in to the Kubernetes Dashboard at the following URL:

http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#!/login

Deploy Mongo Express

Mongo Express is a web-based MongoDB administrative interface written with Node.js, Express, and Bootstrap3. Install Mongo Express into the mongo-express namespace on the EKS cluster to manage the DocumentDB cluster.

kubectl apply -f ./resources/services/mongo-express.yaml -n mongo-express

Obtain the external IP address of any of the Kubernetes worker nodes and the NodePort of Mongo Express with the following two commands:

kubectl get nodes -o wide | awk '{print $1" "$2" "$7}' | column -t
kubectl get service/mongo-express -n mongo-express

To ensure secure access to Mongo Express, create an Inbound Rule in your VPC’s Security Group that allows only your IP address (the ‘My IP’ option) access to Mongo Express running on the NodePort obtained above.

Start the kubectl proxy in a separate terminal window.

kubectl proxy

Use the external IP address of any of the Kubernetes worker nodes and current NodePort to access Mongo Express. Mongo Express will require you to enter the username and password you encoded in the Kubernetes Secret created earlier using basic authentication. Once you have deployed the reference application platform, later in the post, you will observe four databases: service-c, service-f, service-g, and service-h. The typical operational databases you would normally see with your own MongoDB installation are unavailable in the UI since DocumentDB is a managed service.

Mongo Express UI showing the four reference platform’s databases

Modify and Deploy the ALB Ingress

The project contains an ALB Ingress resource, alb-ingress.yaml. The AWS Load Balancer Controller installed earlier is configured to limit which ingresses it manages. Setting the --ingress-class=alb argument constrains the controller’s scope to ingresses with a matching kubernetes.io/ingress.class: alb annotation. This is especially helpful when running multiple ingress controllers in the same cluster.

The ALB Ingress resource, alb-ingress.yaml, needs to be modified before deployment. First, update the alb.ingress.kubernetes.io/healthcheck-port annotation. The port value is derived from the status-port of the istio-ingressgateway, which was installed as part of the Istio demo configuration profile. To obtain the status-port from the istio-ingressgateway, run the following command:

kubectl -n istio-system get svc istio-ingressgateway \
-o jsonpath='{.spec.ports[?(@.name=="status-port")].nodePort}'

Next, insert the ARN of your SSL/TLS (Transport Layer Security) certificate that is associated with the domain listed in the external-dns.alpha.kubernetes.io/hostname annotation into the ALB Ingress resource, alb-ingress.yaml. Run the following command to insert the TLS certificate’s ARN into the alb.ingress.kubernetes.io/certificate-arn annotation. This command assumes that your SSL/TLS certificate is registered with AWS Certificate Manager (ACM).

export ALB_CERT=$(aws acm list-certificates --certificate-statuses ISSUED \
| jq -r '.CertificateSummaryList[] | select(.DomainName=="*.example-api.com") | .CertificateArn')
yq e '.metadata.annotations."alb.ingress.kubernetes.io/certificate-arn" = env(ALB_CERT)' -i resources/other/alb-ingress.yaml

The alb.ingress.kubernetes.io/actions.ssl-redirect annotation will redirect all HTTP traffic to HTTPS. The TLS certificate is used for HTTPS traffic. The ALB terminates the HTTPS traffic and forwards the unencrypted traffic to the EKS cluster on port 80.

Finally, update the external-dns.alpha.kubernetes.io/hostname annotation with a comma-delimited list of your platform’s UI and API hostnames. Below is the complete ALB Ingress resource, alb-ingress.yaml.

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: demo-ingress
  namespace: istio-system
  annotations:
    kubernetes.io/ingress.class: alb
    alb.ingress.kubernetes.io/scheme: internet-facing
    alb.ingress.kubernetes.io/tags: Environment=dev
    alb.ingress.kubernetes.io/healthcheck-port: '{{ your_status_port }}'
    alb.ingress.kubernetes.io/healthcheck-path: /healthz/ready
    alb.ingress.kubernetes.io/healthcheck-protocol: HTTP
    alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
    alb.ingress.kubernetes.io/actions.ssl-redirect: '{"Type": "redirect", "RedirectConfig": { "Protocol": "HTTPS", "Port": "443", "StatusCode": "HTTP_301"}}'
    external-dns.alpha.kubernetes.io/hostname: "{{ your_ui_hostname, your_api_hostname }}"
    alb.ingress.kubernetes.io/certificate-arn: "{{ your_ssl_tls_cert_arn }}"
    alb.ingress.kubernetes.io/load-balancer-attributes: routing.http2.enabled=true,idle_timeout.timeout_seconds=30
  labels:
    app: reference-app
spec:
  rules:
    - http:
        paths:
          - pathType: Prefix
            path: /
            backend:
              service:
                name: ssl-redirect
                port:
                  name: use-annotation
          - pathType: Prefix
            path: /
            backend:
              service:
                name: istio-ingressgateway
                port:
                  number: 80
          - pathType: Prefix
            path: /api
            backend:
              service:
                name: istio-ingressgateway
                port:
                  number: 80

To deploy the ALB Ingress resource, alb-ingress.yaml, run the following command:

kubectl apply -f resources/other/alb-ingress.yaml

To confirm the configuration of the AWS Load Balancer Controller and the ingresses it manages, run the following command:

kubectl describe ingress.networking.k8s.io --all-namespaces

Any misconfigurations should show up as errors in the Events section.

Running the following command should display the public DNS address of the ALB associated with port 80.

kubectl -n istio-system get ingress
NAME           CLASS    HOSTS   ADDRESS                                                PORTS   AGE
demo-ingress   <none>   *       k8s-istiosys-demoingr-...us-east-1.elb.amazonaws.com   80      23m

Use the EC2 Load Balancer Management Console to review the new ALB’s details.

Deploy Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

We will use Fluent Bit to send the reference platform’s logs to Amazon CloudWatch Container Insights. To install Fluent Bit, I have used the procedure outlined in the AWS documentation: Quick Start Setup for Container Insights on Amazon EKS and Kubernetes. I recommend reviewing this documentation for detailed installation instructions.

kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml

ClusterName=${CLUSTER_NAME}
RegionName=${EKS_REGION}
FluentBitHttpPort='2020'
FluentBitReadFromHead='Off'
[[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off'|| FluentBitReadFromTail='On'
[[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'
kubectl create configmap fluent-bit-cluster-info \
--from-literal=cluster.name=${ClusterName} \
--from-literal=http.server=${FluentBitHttpServer} \
--from-literal=http.port=${FluentBitHttpPort} \
--from-literal=read.head=${FluentBitReadFromHead} \
--from-literal=read.tail=${FluentBitReadFromTail} \
--from-literal=logs.region=${RegionName} -n amazon-cloudwatch

kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluent-bit/fluent-bit.yaml

kubectl get pods -n amazon-cloudwatch

DASHBOARD_NAME=istio_observe_demo
REGION_NAME=${EKS_REGION}
CLUSTER_NAME=${CLUSTER_NAME}

curl https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/service/cwagent-prometheus/sample_cloudwatch_dashboards/fluent-bit/cw_dashboard_fluent_bit.json \
| sed "s/{{YOUR_AWS_REGION}}/${REGION_NAME}/g" \
| sed "s/{{YOUR_CLUSTER_NAME}}/${CLUSTER_NAME}/g" \
| xargs -0 aws cloudwatch put-dashboard --dashboard-name ${DASHBOARD_NAME} --dashboard-body

curl https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluentd/fluentd.yaml | kubectl delete -f -
kubectl delete configmap cluster-info -n amazon-cloudwatch
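
The two conditional lines in the Fluent Bit commands above derive the read.tail and http.server settings from the other two variables. The sketch below restates that logic as a small function so the outcomes are easier to see. The function name derive_fluent_bit_flags is hypothetical and is not part of the AWS quick start.

```shell
# derive_fluent_bit_flags: hypothetical wrapper around the two conditional
# lines in the setup commands.
# Arguments: $1 = HTTP port (may be empty), $2 = read-from-head ('On'/'Off').
# Prints "<http.server> <read.tail>".
derive_fluent_bit_flags() {
  http_port="$1"
  read_from_head="$2"

  # read.tail is the inverse of read.head.
  if [ "$read_from_head" = 'On' ]; then
    read_from_tail='Off'
  else
    read_from_tail='On'
  fi

  # The HTTP server is enabled only when a port is supplied.
  if [ -z "$http_port" ]; then
    http_server='Off'
  else
    http_server='On'
  fi

  echo "$http_server $read_from_tail"
}

derive_fluent_bit_flags '2020' 'Off'   # values used in this post, prints "On On"
derive_fluent_bit_flags '' 'On'        # no port, read from head, prints "Off Off"
```

With the values used in this post, the HTTP server is enabled on port 2020, and Fluent Bit reads each log file from the tail instead of from the beginning.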

From the EKS Management Console’s Workloads tab, you should see three fluent-bit pods up and running in the amazon-cloudwatch namespace. There is one fluent-bit pod per EKS worker node.

Once the reference application platform is deployed and running, you should be able to visualize the application in the Amazon CloudWatch Container Insights console’s Map view.

Amazon CloudWatch Container Insights UI

The reference platform’s cluster logs will also be available in Amazon CloudWatch. You should have access to individual Log groups for each application’s components.

Lastly, individual pod logs can also be viewed through the Kubernetes Dashboard. The microservice’s log verbosity level is set to info by default. This level can be changed using the LOG_LEVEL environment variable in the service’s Kubernetes Deployment resource.

Deploy ServiceEntry Resources

Using Istio ServiceEntry configurations, you can reach any publicly accessible service from within your Istio cluster. The Istio proxy can be configured to block any host without an HTTP service or service entry defined within the mesh. We will not go to this extreme in the demonstration. However, we will configure ServiceEntry configurations to monitor egress traffic to the reference platform’s two external services, DocumentDB and Amazon MQ.

Confirm the istio-egressgateway is running, then deploy the two ServiceEntry resources you modified earlier.

kubectl get pod -l istio=egressgateway -n istio-system
NAME                                   READY   STATUS    RESTARTS   AGE
istio-egressgateway-585f7668fc-74qtf   1/1     Running   0          14h
kubectl apply -f resources/istio/external-mesh-document-db-internal.yaml
kubectl apply -f resources/istio/external-mesh-amazon-mq-internal.yaml

Deploy the Reference Application Platform

Each of the platform’s components has a file in the project containing both the Kubernetes Service and corresponding Deployment resources.

apiVersion: v1
kind: Service
metadata:
  name: service-h
  labels:
    app: service-h
    component: service
spec:
  ports:
    - name: http
      port: 8080
  selector:
    app: service-h
    component: service
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: service-h
  labels:
    app: service-h
    component: service
    version: v1
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 1
  selector:
    matchLabels:
      app: service-h
      component: service
      version: v1
  template:
    metadata:
      labels:
        app: service-h
        component: service
        version: v1
    spec:
      containers:
        - name: service-h
          image: registry.hub.docker.com/garystafford/go-srv-h:1.6.8
          livenessProbe:
            httpGet:
              path: /api/health
              port: 8080
            initialDelaySeconds: 3
            periodSeconds: 3
          env:
            - name: LOG_LEVEL
              value: info
            - name: MONGO_CONN
              valueFrom:
                secretKeyRef:
                  name: go-srv-config
                  key: mongodb.conn
            - name: GREETING
              value: "Nǐ hǎo (你好), from Service H!"
          ports:
            - containerPort: 8080
          imagePullPolicy: Always

Deploy the reference application platform’s frontend UI and eight backend microservices to the EKS cluster using the following commands:

kubectl apply -f ./resources/services/angular-ui.yaml -n dev

for service in a b c d e f g h; do
kubectl apply -f "./resources/services/service-$service.yaml" -n dev
done

From the EKS Management Console’s Workloads tab, you should observe that the three pods for each reference application platform component are up and running in the dev namespace.

You can also use the Kubernetes Dashboard to confirm that the deployments were successful to the dev namespace.

Test the Platform

Verify that the platform’s web-based UI is reachable through the AWS Application Load Balancer and Istio, which route requests to the UI’s FQDN (fully qualified domain name), angular-ui.dev.svc.cluster.local. Also verify that the platform’s eight microservices are communicating with each other and with the external DocumentDB cluster and Amazon MQ RabbitMQ broker. The easiest way to test the cluster is by viewing the Angular UI in a web browser. For example, in my case, https://observe-ui.example-api.com.

Reference Application Platform’s Angular-based UI

The UI requires you to input the hostname of the backend, which is the edge service, Service A. For example, in my case, https://observe-api.example-api.com. Since you want to use your own hostname and the UI’s JavaScript code is running locally in your web browser, this option allows you to provide your own hostname. This is the same hostname you inserted into the Istio VirtualService for Service A. This hostname routes the API calls to the FQDN of Service A running in the dev namespace, service-a.dev.svc.cluster.local. You should observe seven greeting responses displayed in the UI, all but Service F.

You can also use tools like Postman to test the backend directly, using the same hostname of the backend, as above.

Using Postman to make requests against the /greeting endpoint of Service A
Using Postman to make requests against the /request-echo endpoint of Service A

Load Testing with Hey

You can also use performance testing tools to load-test the platform. Many issues will not show up until the platform is placed under elevated load. I recently tried hey, a modern Go-based load generator tool, as a replacement for Apache Bench (ab). Unlike ab, hey supports HTTP/2 endpoints, which is required to test the platform on EKS with Istio. You can install hey with Homebrew.

brew install hey

Using hey, you can test the reference application platform by hitting the API hostname and /api/greeting endpoint. The command below generates 1,000 requests, simulates 25 concurrent users, and uses HTTP/2. Traffic will be generated across all the services, the RabbitMQ broker, and the DocumentDB databases.

hey -n 1000 -c 25 -h2 {{ your_api_hostname }}/api/greeting

The results show 1,000 successful HTTP 200 responses from the reference platform’s API in about 43 seconds with an average response time of 1.0430 seconds.

To generate a consistent level of traffic over a longer period of time, try this variation of the command:

hey -n 25000 -c 25 -q 1 -h2 {{ your_api_hostname }}/api/greeting

This command generates a steady stream of traffic for about 18 minutes, making it more convenient when exploring and troubleshooting your observability tools.
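
The durations quoted for the two hey commands can be roughly estimated from the request count, the number of workers, and the rate limit. The sketch below is back-of-the-envelope arithmetic only. It assumes the average latency reported for the single run in this post, which will differ in other environments.

```shell
# Rough duration estimates for the two hey commands.
workers=25
avg_latency=1.0430   # seconds per request, from the run reported in this post

# Unthrottled run: each worker sends its share of the requests back to back.
short_run="$(awk -v n=1000 -v c="$workers" -v t="$avg_latency" \
  'BEGIN { printf "%.1f", n / c * t }')"

# Throttled run (-q 1): each worker is capped at 1 request per second, so
# the lower bound on duration is n / (c * q) seconds.
long_run_min="$(awk -v n=25000 -v c="$workers" -v q=1 \
  'BEGIN { printf "%.1f", n / (c * q) / 60 }')"

echo "1,000 requests:  about ${short_run} seconds"
echo "25,000 requests: at least ${long_run_min} minutes"
```

The estimates, 41.7 seconds and 16.7 minutes, are slightly below the observed figures. That is plausible because the throttled estimate is a lower bound: a worker whose responses take longer than one second cannot reach its 1 request-per-second cap.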

Part Two

In part two of this post, we will explore each observability tool and see how they can help us manage the reference application platform running on the EKS cluster.

Jaeger UI showing a distributed trace of Service A

To tear down the EKS cluster, DocumentDB cluster, and Amazon MQ broker, use the commands below.

# EKS cluster
eksctl delete cluster --name $CLUSTER_NAME

# Amazon MQ
aws mq list-brokers | jq -r '.BrokerSummaries[] | .BrokerId'
aws mq delete-broker --broker-id {{ your_broker_id }}
# DocumentDB
aws docdb describe-db-clusters \
| jq -r '.DBClusters[] | .DBClusterIdentifier'
aws docdb delete-db-cluster \
--db-cluster-identifier {{ your_cluster_id }}

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Cross-Account Amazon Elastic Container Registry (ECR) Access for ECS

Deploying containerized applications on Amazon ECS using cross-account elastic container registries

This is an updated version of a post, originally published in October 2019. This post uses AWS CLI version 2 and contains updated versions of all Docker images.

Introduction

There are two scenarios I frequently encounter that require sharing Amazon Elastic Container Registry (ECR)-based Docker images across multiple AWS Accounts. In the first scenario, a vendor wants to share a Docker image with their customer, stored in the vendor’s private container registry. Many popular container security and observability solutions function in this manner.

Below, we see an example of an application consisting of three containers. Two of the container images originated from the customer’s own ECR repositories (right side). The third image originated from their vendor’s ECR repository (left side).

In the second scenario, an enterprise operates multiple AWS accounts to create logical security boundaries between environments and responsibilities. The first AWS account contains the enterprise’s deployable assets, including their ECR image repositories. The enterprise has additional accounts, such as Development, Test, Staging, and Production, for each Software Development Life Cycle (SDLC) phase. The ECR images in the repository account need to be accessed from multiple AWS accounts and often across different AWS Regions for deployment.

Below, we see an example of a deployed application also consisting of three containers. All the container images originated from the ECR repositories account (left side). The images were pulled into the Production account during deployment to ECS (right side).

This post will explore the first scenario — a vendor who wants to share a private Docker image with their customer securely. The post will demonstrate how to share images across AWS accounts for use with Docker Swarm and Amazon Elastic Container Service (ECS) with AWS Fargate, both using ECR Repository Policies.

For the demonstration, we will use an existing application I have created, a RESTful, HTTP-based NLP (Natural Language Processing) API, consisting of four Golang microservices. The edge service, nlp-client, communicates with the rake-app, lang-app, and prose-app services. There is a fifth service, dynamo-app, which is not discussed in this post but could easily be added to the API.

A customer has developed the nlp-client, lang-app, and prose-app container-based microservices as part of their NLP application in the post’s scenario. Instead of developing their own implementation of the RAKE (Rapid Automatic Keyword Extraction) algorithm, they have licensed a version from a vendor. The vendor’s rake-app service is delivered in the form of a licensed Docker image. The acronym ISV (Independent Software Vendor) is used to represent the vendor throughout the code.

The NLP API exposes several endpoints accessible through the nlp-client service. The endpoints perform common NLP operations on text input, such as extracting keywords, tokens, entities, and sentences, and determining the language. All the endpoints can be listed using the /routes endpoint.

[
{
"method": "GET",
"path": "/error",
"name": "main.getError"
},
{
"method": "POST",
"path": "/keywords",
"name": "main.getKeywords"
},
{
"method": "POST",
"path": "/language",
"name": "main.getLanguage"
},
{
"method": "GET",
"path": "/health",
"name": "main.getHealth"
},
{
"method": "GET",
"path": "/health/:app",
"name": "main.getHealthUpstream"
},
{
"method": "GET",
"path": "/routes",
"name": "main.getRoutes"
},
{
"method": "POST",
"path": "/tokens",
"name": "main.getTokens"
},
{
"method": "POST",
"path": "/entities",
"name": "main.getEntities"
},
{
"method": "POST",
"path": "/sentences",
"name": "main.getSentences"
}
]
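Because the /routes response is plain JSON, it is easy to consume from a script. As a minimal sketch, assuming the response has already been captured as a string, the following Python groups the paths by HTTP method. The routes_json sample is abbreviated from the listing above, and group_paths_by_method is a hypothetical helper, not part of the NLP API.

```python
import json
from collections import defaultdict

# Abbreviated sample of the /routes response shown above.
routes_json = """
[
  {"method": "GET", "path": "/health", "name": "main.getHealth"},
  {"method": "GET", "path": "/routes", "name": "main.getRoutes"},
  {"method": "POST", "path": "/keywords", "name": "main.getKeywords"},
  {"method": "POST", "path": "/language", "name": "main.getLanguage"}
]
"""


def group_paths_by_method(raw: str) -> dict[str, list[str]]:
    """Return a mapping of HTTP method to a sorted list of paths."""
    grouped = defaultdict(list)
    for route in json.loads(raw):
        grouped[route["method"]].append(route["path"])
    return {method: sorted(paths) for method, paths in grouped.items()}


routes_by_method = group_paths_by_method(routes_json)
print(routes_by_method)
```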

Requirements

To follow along with the post’s demonstration, you will need two AWS accounts, one representing the vendor and one representing one of their customers. It is relatively simple to create additional AWS accounts — all you need is a unique email address (easy with Gmail) and a credit card. Using AWS Organizations can make the task of creating and managing multiple accounts even easier.

I have intentionally used different AWS Regions to demonstrate how you can share ECR images across both AWS accounts and regions. You will need a current version of the AWS CLI version 2 and of Docker. Lastly, you will need adequate access to each AWS account to create resources.

Source Code

The demonstration’s source code is contained in five public GitHub repositories.

git clone --branch v2.0.0 \
    --single-branch --depth 1 \
    https://github.com/garystafford/ecr-cross-account-demo.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/nlp-client.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/prose-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/rake-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/lang-app.git

The v2.0.0 branch of the ecr-cross-account-demo GitHub repository contains all the CloudFormation templates and the Docker Compose Stack file.

.
├── LICENSE
├── README.md
├── cfn-templates
│   ├── development-user-group-customer.yml
│   ├── development-user-group-isv.yml
│   ├── ecr-repo-not-shared.yml
│   ├── ecr-repo-shared.yml
│   ├── public-subnet-public-loadbalancer.yml
│   └── public-vpc.yml
└── docker
    └── stack.yml

Each of the other four GitHub repositories, such as the nlp-client repository, contains a Golang-based microservice; together, these microservices comprise the NLP API. Each repository also contains a Dockerfile.

.
├── Dockerfile
├── LICENSE
├── README.md
├── buildspec.yml
├── go.mod
├── go.sum
└── main.go

We will use AWS CloudFormation to create the necessary resources within both AWS accounts. We will also use CloudFormation to create an ECS Cluster and an Amazon ECS Task Definition for the customer account. The Task Definition defines how ECS will deploy the application, consisting of four Docker containers, using AWS Fargate. In addition to ECS, we will create an Amazon Virtual Private Cloud (VPC) to house the ECS cluster and a public-facing, Layer 7 Application Load Balancer (ALB) to load-balance our ECS-based application.

Creating ECR Repositories

In the first AWS account, representing the vendor, we will execute two CloudFormation templates. The first template, development-user-group-isv.yml, creates the Development group and VendorDev user. The VendorDev user will be given explicit access to the vendor’s rake-app ECR repository. Change the DevUserPassword parameter’s value to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-isv \
--template-body file://cfn-templates/development-user-group-isv.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new IAM Group and User.

Next, we will execute the second CloudFormation template, ecr-repo-shared.yml, which creates the vendor’s rake-app ECR image repository. The rake-app repository will house a copy of the vendor’s rake-app Docker Image. But first, let’s look at the CloudFormation template used to create the repository, specifically the RepositoryPolicyText section. Here we define two repository policies:

  • The AllowPushPull policy explicitly allows the VendorDev user to push and pull versions of the image to the ECR repository. We import the exported Amazon Resource Name (ARN) of the VendorDev user from the previous CloudFormation Stack Outputs. We have also allowed the AWS CodeBuild service access to the ECR repository by naming it as a Service Principal. We will not use CodeBuild in this brief post.
  • The AllowPull policy allows anyone in the customer’s AWS account (root) to pull any version of the image. They cannot push, only pull. Cross-account access can be restricted to a finer-grained set of the specific customer’s IAM Entities and source IP addresses.

Note the "ecr:GetAuthorizationToken" policy Action. This action will allow the customer’s user to log into the vendor’s ECR repository and receive an Authorization Token. The customer retrieves a token that is valid for a specified container registry for 12 hours.

RepositoryPolicyText:
  Version: '2012-10-17'
  Statement:
    - Sid: AllowPushPull
      Effect: Allow
      Principal:
        Service: codebuild.amazonaws.com
        AWS:
          Fn::ImportValue:
            !Join [':', [!Ref 'StackName', 'DevUserArn']]
      Action:
        - 'ecr:BatchCheckLayerAvailability'
        - 'ecr:BatchGetImage'
        - 'ecr:CompleteLayerUpload'
        - 'ecr:DescribeImages'
        - 'ecr:DescribeRepositories'
        - 'ecr:GetDownloadUrlForLayer'
        - 'ecr:GetRepositoryPolicy'
        - 'ecr:InitiateLayerUpload'
        - 'ecr:ListImages'
        - 'ecr:PutImage'
        - 'ecr:UploadLayerPart'
    - Sid: AllowPull
      Effect: Allow
      Principal:
        AWS: !Join [':', ['arn:aws:iam:', !Ref 'CustomerAccount', 'root']]
      Action:
        - 'ecr:GetAuthorizationToken'
        - 'ecr:BatchCheckLayerAvailability'
        - 'ecr:GetDownloadUrlForLayer'
        - 'ecr:BatchGetImage'
        - 'ecr:DescribeRepositories' # optional permission
        - 'ecr:DescribeImages' # optional permission
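To make the AllowPull statement more concrete, here is a minimal sketch of the JSON statement the template should render to for a given customer account. The helper name build_allow_pull_statement is hypothetical, and the account ID is the placeholder value used throughout this post, so treat this as an illustration rather than the template's actual output.

```python
import json


def build_allow_pull_statement(customer_account: str) -> dict:
    """Build the pull-only, cross-account statement for a customer account ID."""
    return {
        "Sid": "AllowPull",
        "Effect": "Allow",
        # Same result as the template's !Join: arn:aws:iam::<account>:root
        "Principal": {"AWS": f"arn:aws:iam::{customer_account}:root"},
        "Action": [
            "ecr:GetAuthorizationToken",
            "ecr:BatchCheckLayerAvailability",
            "ecr:GetDownloadUrlForLayer",
            "ecr:BatchGetImage",
            "ecr:DescribeRepositories",  # optional permission
            "ecr:DescribeImages",  # optional permission
        ],
    }


statement = build_allow_pull_statement("999888777666")
print(json.dumps(statement, indent=2))
```

Note that none of the upload-related actions, such as ecr:PutImage, appear in the list, which is what keeps the customer's access pull-only.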

Before executing the following command to deploy the CloudFormation Stack, ecr-repo-shared.yml, replace the CUSTOMER_ACCOUNT value with your pseudo customer’s AWS account ID.

# change me
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
# NLP Rake Microservice
REPO_NAME=rake-app
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-shared.yml \
--parameters \
ParameterKey=CustomerAccount,ParameterValue=${CUSTOMER_ACCOUNT} \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new ECR repository.

Below, we see the ECR repository policies applied correctly in the Permissions tab of the rake-app repository. The first policy covers both the VendorDev user, referred to as an IAM Entity, as well as AWS CodeBuild, referred to as a Service Principal.

The second policy covers the customer’s AWS account ID.

Repeat this process in the customer’s AWS account. First, execute the CloudFormation template, development-user-group-customer.yml, which creates the Development group and CustomerDev user.

# change me
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-customer \
--template-body file://cfn-templates/development-user-group-customer.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Next, we will execute the second CloudFormation template, ecr-repo-not-shared.yml, three times, once for each of the customer’s three ECR repositories, nlp-client, lang-app, and prose-app. Note that in the RepositoryPolicyText section of the template we only define a single policy. Identical to the vendor’s policy, the AllowPushPull policy explicitly allows the previously-created CustomerDev user to push and pull versions of the image to the ECR repository. There is no cross-account access required to the customer’s three ECR repositories.

RepositoryPolicyText:
  Version: '2012-10-17'
  Statement:
    - Sid: AllowPushPull
      Effect: Allow
      Principal:
        Service: codebuild.amazonaws.com
        AWS:
          Fn::ImportValue:
            !Join [':', [!Ref 'StackName', 'DevUserArn']]
      Action:
        - 'ecr:BatchCheckLayerAvailability'
        - 'ecr:BatchGetImage'
        - 'ecr:CompleteLayerUpload'
        - 'ecr:DescribeImages'
        - 'ecr:DescribeRepositories'
        - 'ecr:GetDownloadUrlForLayer'
        - 'ecr:GetRepositoryPolicy'
        - 'ecr:InitiateLayerUpload'
        - 'ecr:ListImages'
        - 'ecr:PutImage'
        - 'ecr:UploadLayerPart'

Execute the following commands to create the three CloudFormation Stacks. The Stacks use the same template, ecr-repo-not-shared.yml, with a different Stack name and RepoName parameter values.

# NLP Client microservice
REPO_NAME=nlp-client
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

# NLP Prose microservice
REPO_NAME=prose-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM
# NLP Language microservice
REPO_NAME=lang-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting three ECR repositories.

At this point, we have our four ECR repositories across the two AWS accounts, with the proper ECR Repository Policies applied to each.

Building and Pushing Images to ECR

Next, we will build and push the four NLP application images to their corresponding ECR repositories, starting with the vendor’s rake-app image. To confirm that the ECR policies are working correctly, log in as the VendorDev user and run the command below.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com
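For context, the ECR GetAuthorizationToken API returns a base64-encoded string of the form AWS:<password>, and get-login-password prints just the password portion for docker login to read from stdin. A minimal sketch of that decoding step follows, using a made-up token rather than a real one; split_authorization_token is a hypothetical helper name.

```python
import base64


def split_authorization_token(token: str) -> tuple[str, str]:
    """Decode a base64 ECR authorization token into (username, password)."""
    decoded = base64.b64decode(token).decode("utf-8")
    # Split on the first colon only, in case the password contains colons.
    username, password = decoded.split(":", 1)
    return username, password


# Fake token for illustration only; a real one comes from the ECR API.
fake_token = base64.b64encode(b"AWS:not-a-real-password").decode("ascii")
username, password = split_authorization_token(fake_token)
print(username)
```

This is also why the docker login command above always uses AWS as the username.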

Logged in as the vendor’s VendorDev user, build and push the Docker image to the rake-app repository. The Dockerfile and Golang source code are located in each GitHub repository. With Golang and Docker multi-stage builds, we will create very small Docker images, based on scratch, containing just the compiled Go executable binary. At a mere 7–15 MB in size, pushing and pulling these Docker images across accounts is very fast.

docker build -t ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0 . --no-cache
docker push ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Below, we see the output from the vendor’s VendorDev user logging into the rake-app repository.

Next, we see the results of the vendor’s VendorDev user building and pushing the Docker image to the rake-app repository.

Next, after logging in as the customer’s CustomerDev user, build and push the Docker images to the ECR nlp-client, lang-app, and prose-app repositories. Again, make sure you substitute the variable values below with your pseudo customer’s AWS account and preferred AWS region.

aws ecr get-login-password --region ${CUSTOMER_ECR_REGION} \
| docker login --username AWS --password-stdin ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com
# nlp-client
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
# prose-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
# lang-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0
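All of the image references above follow the same pattern: account ID, region, repository, and tag. As a small sketch of that naming convention, the hypothetical helper ecr_image_uri below assembles the references from the placeholder account IDs and regions used in this post.

```python
def ecr_image_uri(account: str, region: str, repository: str, tag: str) -> str:
    """Return the fully qualified ECR image reference."""
    return f"{account}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Placeholder account IDs and regions from this post.
customer_uris = [
    ecr_image_uri("999888777666", "us-west-2", repo, "1.1.0")
    for repo in ("nlp-client", "prose-app", "lang-app")
]
vendor_uri = ecr_image_uri("111222333444", "us-east-2", "rake-app", "1.1.0")
print(vendor_uri)
```

Because the account ID and region are part of the registry hostname, pulling the vendor's image from the customer's account only requires a different hostname and a login against that registry.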

At this point, each of the four ECR repositories, one in the vendor’s account and three in the customer’s account, has a Docker image pushed to it.

Deploying Locally to Docker Swarm

As a simple demonstration of cross-account ECR access, we will start with Docker Swarm. Logged in as the customer’s CustomerDev user and using the Docker Swarm Stack file included in the project, we can create and run a local copy of our NLP application in our customer’s account. First, we need to log into the vendor’s ECR registry in order to pull the vendor’s image.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Once logged in to the vendor’s ECR registry, we will pull the image. If you are unsure which cross-account repositories and images your IAM user has access to, you can list them using the aws ecr describe-repositories and aws ecr describe-images commands.

aws ecr describe-repositories \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-names rake-app
aws ecr describe-images \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
docker pull ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Using the following command, you should see each of our four application Docker images.

docker image ls --filter=reference='*amazonaws.com/*'

Below, we see an example of the expected terminal output from pulling the image and listing the images.

Build Docker Stack Locally

Next, build the Docker Swarm Stack. The Docker Compose file, stack.yml, is shown below. Note the location of the Docker images.

version: '3.9'
services:
  nlp-client:
    image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
    networks:
      - nlp-demo
    ports:
      - target: 8080
        published: 8080
        protocol: tcp
        mode: host
    environment:
      - NLP_CLIENT_PORT
      - RAKE_ENDPOINT
      - PROSE_ENDPOINT
      - LANG_ENDPOINT
      - API_KEY
  rake-app:
    image: ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0
    networks:
      - nlp-demo
    environment:
      - RAKE_PORT
      - API_KEY
  prose-app:
    image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
    networks:
      - nlp-demo
    environment:
      - PROSE_PORT
      - API_KEY
  lang-app:
    image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0
    networks:
      - nlp-demo
    environment:
      - LANG_PORT
      - API_KEY
networks:
  nlp-demo:
volumes:
  data: {}

Execute the following commands to deploy the Docker Stack to Docker Swarm. Again, make sure you substitute the variable values below with your pseudo vendor and customer AWS accounts and regions. Additionally, the NLP API uses an API Key to protect all exposed endpoints, except the /health endpoint, across all four services. Change the default CloudFormation template’s API Key parameter to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
export API_KEY=SuP3r5eCRetAutHK3y

# don't change me
export NLP_CLIENT_PORT=8080
export RAKE_PORT=8080
export PROSE_PORT=8080
export LANG_PORT=8080
export RAKE_ENDPOINT=http://rake-app:${RAKE_PORT}
export PROSE_ENDPOINT=http://prose-app:${PROSE_PORT}
export LANG_ENDPOINT=http://lang-app:${LANG_PORT}
export TEXT="The Nobel Prize is regarded as the most prestigious award in the World. Notable winners have included Marie Curie, Theodore Roosevelt, Albert Einstein, George Bernard Shaw, and Winston Churchill."
 
docker swarm init 
docker stack deploy --compose-file docker/stack.yml nlp

You can check the success of the deployment with either of the following commands:

docker stack ps nlp --no-trunc
docker container ls

Below, we see an example of the expected terminal output.

With the Docker Stack, you can hit the nlp-client service directly on localhost:8080. Unlike Fargate, which requires unique static ports for each container in the task, with Docker, we can choose to run all the containers on the same port without conflict since only the nlp-client service is exposing port :8080. Unlike with ECS, there is no load balancer in front of the Stack, since we only have a single node in our Swarm and thus a single container instance of each microservice for testing.
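The port constraint described above can be checked with a few lines of Python. This is only a sketch: find_port_conflicts is a hypothetical helper, and the per-container Fargate ports below are made-up values, since the actual port parameters live in the CloudFormation template.

```python
from collections import Counter


def find_port_conflicts(container_ports: dict[str, int]) -> dict[int, list[str]]:
    """Return ports claimed by more than one container, with the container names."""
    counts = Counter(container_ports.values())
    return {
        port: sorted(name for name, p in container_ports.items() if p == port)
        for port, count in counts.items()
        if count > 1
    }


# Swarm-style layout from this post: every service listens on 8080.
swarm_ports = {"nlp-client": 8080, "rake-app": 8080, "prose-app": 8080, "lang-app": 8080}
# Hypothetical per-container ports for a single Fargate task.
fargate_ports = {"nlp-client": 8080, "rake-app": 8081, "prose-app": 8082, "lang-app": 8083}

print(find_port_conflicts(swarm_ports))
print(find_port_conflicts(fargate_ports))
```

The Swarm layout is fine on Swarm because each container has its own network namespace; the same layout would collide inside a single task where the containers share one network interface.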

To test that the images were pulled successfully and the Docker Stack is running, we can execute a curl command against any of the API endpoints, such as /keywords. Below, I am using jq to pretty-print the JSON response payload.

curl -s -X POST \
"http://localhost:${NLP_CLIENT_PORT}/keywords" \
-H 'Content-Type: application/json' \
-H "X-API-Key: ${API_KEY}" \
-d '{"text": "The Internet is the global system of interconnected computer networks that use the Internet protocol suite to link devices worldwide."}' | jq
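If you prefer a script over curl, the same request can be built with the Python standard library. The sketch below only constructs the request and leaves sending it commented out, so it can be read without a running stack; build_keywords_request is a hypothetical helper, and the base URL and API key are the example values from this post.

```python
import json
import urllib.request


def build_keywords_request(base_url: str, api_key: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for the /keywords endpoint."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/keywords",
        data=body,
        headers={"Content-Type": "application/json", "X-API-Key": api_key},
        method="POST",
    )


request = build_keywords_request(
    "http://localhost:8080", "SuP3r5eCRetAutHK3y", "The Internet is the global system."
)
# To actually send it against a running stack:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```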

The resulting JSON response payload indicates that the nlp-client service was reached successfully and that it was then subsequently able to communicate with the rake-app service, whose container image originated from the vendor’s ECR repository.

[
{
"candidate": "interconnected computer networks",
"score": 9
},
{
"candidate": "link devices worldwide",
"score": 9
},
{
"candidate": "internet protocol suite",
"score": 8
},
{
"candidate": "global system",
"score": 4
},
{
"candidate": "internet",
"score": 2
}
]
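To give a feel for where scores like these can come from, here is a minimal sketch of the commonly described RAKE scoring scheme, in which each word is scored as its degree divided by its frequency and a phrase's score is the sum of its word scores. This is not the vendor's rake-app code, which is not shown in this post. The stopword list is a hypothetical, deliberately tiny one chosen for this single sentence, and rake_scores is an illustrative name.

```python
import re
from collections import defaultdict

# Hypothetical, deliberately tiny stopword list chosen for this sentence;
# real RAKE implementations ship a much larger one.
STOPWORDS = {"the", "is", "of", "that", "use", "to"}


def rake_scores(text: str) -> dict[str, float]:
    """Score candidate phrases using degree/frequency word scores."""
    phrases = []
    # Punctuation ends a phrase, so split into fragments first.
    for fragment in re.split(r"[^a-z\s]+", text.lower()):
        current = []
        for word in fragment.split():
            if word in STOPWORDS:
                if current:
                    phrases.append(current)
                current = []
            else:
                current.append(word)
        if current:
            phrases.append(current)

    frequency = defaultdict(int)
    degree = defaultdict(int)
    for phrase in phrases:
        for word in phrase:
            frequency[word] += 1
            degree[word] += len(phrase)  # co-occurrences, including the word itself

    return {
        " ".join(phrase): sum(degree[w] / frequency[w] for w in phrase)
        for phrase in phrases
    }


text = (
    "The Internet is the global system of interconnected computer networks "
    "that use the Internet protocol suite to link devices worldwide."
)
scores = rake_scores(text)
for candidate, score in sorted(scores.items(), key=lambda item: -item[1]):
    print(candidate, score)
```

With this stopword list, the sketch yields the same five candidates and scores as the response above. For example, "internet" occurs twice with a total degree of 4, giving it a word score of 2, so "internet protocol suite" scores 2 + 3 + 3 = 8.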

Creating Amazon ECS Environment

Although using Docker Swarm locally is a great way to understand how cross-account ECR access works, it is not a typical use case for deploying containerized applications on the AWS Platform. More often, you would use Amazon ECS, Amazon Elastic Kubernetes Service (EKS), or enterprise versions of third-party orchestrators such as Red Hat OpenShift or Rancher.

Using CloudFormation and some very convenient CloudFormation templates supplied by Amazon as a starting point, we will create a complete ECS environment for our application. First, we will create a VPC to house the ECS cluster and a public-facing ALB to front our ECS-based application, using the public-vpc.yml template.

aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name public-vpc \
--template-body file://cfn-templates/public-vpc.yml \
--capabilities CAPABILITY_NAMED_IAM

Next, we will create the ECS cluster and an Amazon ECS Task Definition using the public-subnet-public-loadbalancer.yml template. Again, the Task Definition defines how ECS will deploy our application using AWS Fargate. AWS Fargate allows you to run containers without having to manage servers or clusters. No EC2 instances to manage! Woot! Below, in the CloudFormation template, we see the ContainerDefinitions section of the TaskDefinition resource that contains four container definitions. Note the four images and their ECR locations.

TaskDefinition:
  Type: AWS::ECS::TaskDefinition
  DependsOn: CloudWatchLogsGroup
  Properties:
    Family: !Ref 'ServiceNameClient'
    Cpu: !Ref 'ContainerCpu'
    Memory: !Ref 'ContainerMemory'
    NetworkMode: awsvpc
    RequiresCompatibilities:
      - FARGATE
      - EC2
    ExecutionRoleArn:
      Fn::ImportValue:
        !Join [':', [!Ref 'StackName', 'ECSTaskExecutionRole']]
    TaskRoleArn:
      Fn::If:
        - 'HasCustomRole'
        - !Ref 'Role'
        - !Ref 'AWS::NoValue'
    ContainerDefinitions:
      - Name: nlp-client
        Cpu: 256
        Memory: 1024
        Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/nlp-client:1.1.0']]
        PortMappings:
          - ContainerPort: !Ref ContainerPortClient
        Essential: true
        LogConfiguration:
          LogDriver: awslogs
          Options:
            awslogs-region: !Ref AWS::Region
            awslogs-group: !Ref CloudWatchLogsGroup
            awslogs-stream-prefix: ecs
        Environment:
          - Name: NLP_CLIENT_PORT
            Value: !Ref ContainerPortClient
          - Name: RAKE_ENDPOINT
            Value: !Join [':', ['http://localhost', !Ref ContainerPortRake]]
          - Name: PROSE_ENDPOINT
            Value: !Join [':', ['http://localhost', !Ref ContainerPortProse]]
          - Name: LANG_ENDPOINT
            Value: !Join [':', ['http://localhost', !Ref ContainerPortLang]]
          - Name: API_KEY
            Value: !Ref ApiKey
      - Name: rake-app
        Cpu: 256
        Memory: 1024
        Image: !Join ['.', [!Ref VendorAccountId, 'dkr.ecr', !Ref VendorEcrRegion, 'amazonaws.com/rake-app:1.1.0']]
        Essential: true
        LogConfiguration:
          LogDriver: awslogs
          Options:
            awslogs-region: !Ref AWS::Region
            awslogs-group: !Ref CloudWatchLogsGroup
            awslogs-stream-prefix: ecs
        Environment:
          - Name: RAKE_PORT
            Value: !Ref ContainerPortRake
          - Name: API_KEY
            Value: !Ref ApiKey
      - Name: prose-app
        Cpu: 256
        Memory: 1024
        Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/prose-app:1.1.0']]
        Essential: true
        LogConfiguration:
          LogDriver: awslogs
          Options:
            awslogs-region: !Ref AWS::Region
            awslogs-group: !Ref CloudWatchLogsGroup
            awslogs-stream-prefix: ecs
        Environment:
          - Name: PROSE_PORT
            Value: !Ref ContainerPortProse
          - Name: API_KEY
            Value: !Ref ApiKey
      - Name: lang-app
        Cpu: 256
        Memory: 1024
        Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/lang-app:1.1.0']]
        Essential: true
        LogConfiguration:
          LogDriver: awslogs
          Options:
            awslogs-region: !Ref AWS::Region
            awslogs-group: !Ref CloudWatchLogsGroup
            awslogs-stream-prefix: ecs
        Environment:
          - Name: LANG_PORT
            Value: !Ref ContainerPortLang
          - Name: API_KEY
            Value: !Ref ApiKey

Execute the following command to create the ECS cluster and an ECS Task Definition using the CloudFormation template.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export API_KEY=SuP3r5eCRetAutHK3y
aws cloudformation create-stack \
--stack-name public-subnet-public-loadbalancer \
--template-body file://cfn-templates/public-subnet-public-loadbalancer.yml \
--parameters \
ParameterKey=VendorAccountId,ParameterValue=${ISV_ACCOUNT} \
ParameterKey=VendorEcrRegion,ParameterValue=${ISV_ECR_REGION} \
ParameterKey=ApiKey,ParameterValue=${API_KEY} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the expected output from the CloudFormation Management Console.

If you want to update the ECS Task Definition, simply run the aws cloudformation update-stack command.

CloudWatch Container Insights

The CloudFormation template does not enable CloudWatch Container Insights by default. Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications. To enable Insights, execute the following command:

aws ecs put-account-setting \
--name "containerInsights" --value "enabled"

Confirming the Cross-account Policy

If everything went right in the previous steps, we should now have an ECS cluster running our containerized application, including the container built from the vendor’s Docker image. Below, we see an example of the ECS cluster displayed in the management console.

Within the ECS cluster, we should observe a single running ECS Service. According to AWS, Amazon ECS allows you to run and maintain a specified number of instances of a task definition simultaneously in an Amazon ECS cluster; this is referred to as a Service.

We are running two instances of each container on ECS, thus two copies of the task within a single service. Each task runs its containers in a different Availability Zone for high availability.

Drilling into the service, note the new ALB associated with the new VPC, two public subnets, and the corresponding security group.

Switching to the Task Definitions tab, note that the ECS Task contains four container definitions that comprise the NLP API. Three images originated from the customer’s ECR repositories, and one from the vendor’s ECR repository.

Drilling in further, we will see the details of each container definition, including the environment variables passed from ECS to the container and on to the Golang binary running in the container.

Accessing the NLP API on ECS

With our earlier Docker Swarm example, the curl command was issued against localhost. We now have a public-facing Application Load Balancer (ALB) in front of our ECS-based application. We will use the DNS name of the ALB to hit our application on ECS. The DNS address (A Record) can be obtained from the Load Balancer Management Console, as shown below, or from the Output tab of the public-vpc CloudFormation Stack.

Another difference between the earlier Docker Swarm example and ECS is the port. Although the edge service, nlp-client, runs on port :8080, the ALB acts as a reverse proxy, passing requests from port :80 on the ALB to port :8080 of the nlp-client container instances (actually, the shared ENI of the running task).

Although I did set up a custom domain name for the ALB using Route53, https://nlp-ecs.example-api.com, and enabled HTTPS (port 443 on the ALB), I will not go into the details in this post for the sake of brevity.

To test our application deployed to ECS, we can use a tool like curl or Postman to call the API’s endpoints. Don’t forget that you will need to add the API Key for authentication using the X-API-Key header key/value pair. Below we see a successful GET against the /routes endpoint, using Postman.

Here we see a successful POST against the /keywords endpoint, using Postman.

Cleaning Up

To clean up the demonstration’s AWS resources and Docker Stack, run the following commands in the appropriate AWS accounts. Importantly, similar to S3 buckets, you must delete all the Docker images in an ECR repository before deleting the repository, or else you will receive a CloudFormation error. This includes untagged images.

# local docker stack
docker stack rm nlp
# customer account
aws ecr batch-delete-image \
--repository-name nlp-client \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name prose-app \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name lang-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-nlp-client
aws cloudformation delete-stack \
--stack-name ecr-repo-prose-app
aws cloudformation delete-stack \
--stack-name ecr-repo-lang-app
aws cloudformation delete-stack \
--stack-name public-subnet-public-loadbalancer
aws cloudformation delete-stack \
--stack-name public-vpc
aws cloudformation delete-stack \
--stack-name development-user-group-customer
# vendor account
aws ecr batch-delete-image \
--repository-name rake-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-rake-app
aws cloudformation delete-stack \
--stack-name development-user-group-isv
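The commands above delete only the 1.1.0 tag from each repository. Since untagged images also block repository deletion, a small Boto3-based helper can clear out everything first. The following is a sketch under stated assumptions: delete_all_images is a hypothetical helper that expects a Boto3 ECR client, and it deletes by image digest so that every tag pointing at an image goes with it.

```python
def delete_all_images(ecr_client, repository_name: str) -> int:
    """Delete every image (tagged or untagged) in a repository; return the count."""
    digests = set()
    paginator = ecr_client.get_paginator("list_images")
    for page in paginator.paginate(repositoryName=repository_name):
        for image_id in page["imageIds"]:
            # An image with several tags is listed once per tag; keep one digest.
            digests.add(image_id["imageDigest"])

    image_ids = [{"imageDigest": digest} for digest in sorted(digests)]
    # batch_delete_image accepts at most 100 image IDs per call.
    for start in range(0, len(image_ids), 100):
        ecr_client.batch_delete_image(
            repositoryName=repository_name,
            imageIds=image_ids[start:start + 100],
        )
    return len(image_ids)


# Example usage (requires boto3 and credentials for the account that owns the repository):
# import boto3
# ecr = boto3.client("ecr", region_name="us-east-2")
# delete_all_images(ecr, "rake-app")
```

Run it once per repository, in the account that owns that repository, before deleting the corresponding CloudFormation stack.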

Conclusion

In the preceding post, we saw how multiple AWS accounts could share private ECR-based Docker images. There are variations and restrictions to the configuration of the ECR Repository Policies, depending on the deployment tools you are using, such as AWS CodeBuild, AWS CodeDeploy, or AWS Elastic Beanstalk. AWS does a good job of providing some examples in their documentation, including Amazon ECR Repository Policy Examples and Amazon Elastic Container Registry Identity-Based Policy Examples.

In late 2020, AWS released Amazon Elastic Container Registry Public (ECR Public). Although this post was about private images, for public images, ECR Public allows you to store, manage, share, and deploy container images for anyone to discover and download globally.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Employing Amazon Macie to Discover and Protect Sensitive Data in your Amazon S3-based Data Lake

Introduction

Working with Analytics customers, it’s not uncommon to see data lakes with a dozen or more discrete data sources. Data typically originates from sources both internal and external to the customer. Internal data may come from multiple teams, departments, divisions, and enterprise systems. External data comes from vendors, partners, public sources, and subscriptions to licensed data sources. The volume, velocity, variety, veracity, and method of delivery vary across the data sources. All this data is being fed into data lakes for purposes such as analytics, business intelligence, and machine learning.

Given the growing volumes of incoming data and variations amongst data sources, it is increasingly complex, expensive, and time-consuming for organizations to ensure compliance with relevant laws, policies, and regulations. Regulations that impact how data is handled in a data lake include the Health Insurance Portability and Accountability Act (HIPAA), General Data Protection Regulation (GDPR), Payment Card Industry Data Security Standard (PCI DSS), California Consumer Privacy Act (CCPA), and the Federal Information Security Management Act (FISMA).

Data Lake

AWS defines a data lake as a centralized repository that allows you to store all your structured and unstructured data at any scale. Once in the data lake, you run different types of analytics — from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data in a data lake is regularly organized or separated by its stage in the analytics process. Incoming data is often referred to as raw data. Data is then processed — cleansed, filtered, enriched, and tokenized if necessary. Lastly, the data is analyzed and aggregated, and the results are written back to the data lake. The analyzed and aggregated data is used to build business intelligence dashboards and reports, machine learning models, and is delivered to downstream or external systems. The different categories of data — raw, processed, and aggregated, are frequently referred to as bronze, silver, and gold, a reference to their overall data quality or value.

Protecting the Data Lake

Imagine you’ve received a large volume of data from an external data source. The incoming data is cleansed, filtered, and enriched. The data is re-formatted, partitioned, compressed for analytical efficiency, and written back to the data lake. Your analytics pipelines run complex and time-consuming queries against the data. Unfortunately, while building reports for a set of stakeholders, you realize that the original data accidentally included credit card information and other sensitive information about your customers. In addition to being out of compliance, you have the wasted time and expense of the initial data processing, as well as the extra time and expense to replace and re-process the data. The solution — Amazon Macie.

Amazon Macie

According to AWS, Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). Macie’s alerts, or findings, can be searched, filtered, and sent to Amazon EventBridge, formerly called Amazon CloudWatch Events, for easy integration with existing workflow or event management systems, or to be used in combination with AWS services, such as AWS Step Functions or Amazon Managed Workflows for Apache Airflow (MWAA) to take automated remediation actions.

Amazon Macie’s Summary view

Data Discovery and Protection

In this post, we will deploy an automated data inspection workflow to examine sample data in an S3-based data lake. Amazon Macie will examine data files uploaded to an encrypted S3 bucket. If sensitive data is discovered within the files, the files will be moved to an encrypted isolation bucket for further investigation. Email and SMS text alerts will be sent. This workflow will leverage Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), AWS Lambda, and AWS Systems Manager Parameter Store.

Macie data inspection workflow architecture
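To sketch the isolation step of this workflow, the Lambda function needs to work out which S3 object a finding refers to and where to move it. The example below assumes the EventBridge event carries the Macie finding under detail, with the bucket and key under resourcesAffected; verify that shape against a real finding before relying on it. The function names and bucket names are hypothetical, and this is not the code from the post's repository.

```python
def affected_object(event: dict) -> tuple[str, str]:
    """Return (bucket name, object key) from a Macie finding event."""
    resources = event["detail"]["resourcesAffected"]
    return resources["s3Bucket"]["name"], resources["s3Object"]["key"]


def isolation_copy_args(event: dict, isolation_bucket: str) -> dict:
    """Build keyword arguments for an S3 copy into the isolation bucket."""
    bucket, key = affected_object(event)
    return {
        "Bucket": isolation_bucket,
        "Key": key,
        "CopySource": {"Bucket": bucket, "Key": key},
    }


# Trimmed, made-up event for illustration only.
sample_event = {
    "detail": {
        "resourcesAffected": {
            "s3Bucket": {"name": "example-data-bucket"},
            "s3Object": {"key": "incoming/patients.csv"},
        }
    }
}
copy_args = isolation_copy_args(sample_event, "example-isolation-bucket")
print(copy_args)
# In a Lambda handler, these arguments could be passed to a Boto3 S3 client's
# copy_object call, followed by delete_object on the source bucket and key.
```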

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/macie-demo.git

AWS resources for this post can be deployed using AWS CloudFormation. To follow along, you will need recent versions of Python 3, Boto3, and the AWS CLI version 2 installed.

Sample Data

We will use synthetic patient data, freely available from the MITRE Corporation. The data was generated by Synthea, MITRE’s open-source, synthetic patient generator that models the medical history of synthetic patients. Synthea data is exported in a variety of data standards, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Download and unzip the CSV files from the Synthea website.

REMOTE_FILE="synthea_sample_data_csv_apr2020.zip"
wget "https://storage.googleapis.com/synthea-public/${REMOTE_FILE}"
unzip -j "${REMOTE_FILE}" -d synthea_data/

The sixteen CSV data files contain a total of 471,852 rows of data, including column headers.

> wc -l *.csv

      598 allergies.csv
    3,484 careplans.csv
    8,377 conditions.csv
       79 devices.csv
   53,347 encounters.csv
      856 imaging_studies.csv
   15,479 immunizations.csv
   42,990 medications.csv
  299,698 observations.csv
    1,120 organizations.csv
    1,172 patients.csv
    3,802 payer_transitions.csv
       11 payers.csv
   34,982 procedures.csv
    5,856 providers.csv
        1 supplies.csv
  ------------------------------
  471,852 total

Amazon Macie Custom Data Identifier

To demonstrate some of the advanced features of Amazon Macie, we will use three Custom Data Identifiers. According to Macie’s documentation, a custom data identifier is a set of criteria that you define that reflects your organization’s particular proprietary data, for example, employee IDs, customer account numbers, or internal data classifications. We will create three custom data identifiers to detect the specific Synthea-format Patient ID, US driver’s license number, and US passport number columns.

Post’s three custom data identifiers

The custom data identifiers in this post use a combination of regular expressions (regex) and keywords. The identifiers are designed to work with structured data, such as CSV files. Macie reports text that matches the regex pattern if any of these keywords are in the name of the column or field that stores the text, or if the text is within the maximum match distance of one of these words in a field value. Macie supports a subset of the regex pattern syntax provided by the Perl Compatible Regular Expressions (PCRE) library.

Patient ID custom data identifier console
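As a rough illustration of the regex-plus-keywords approach, the sketch below defines a Patient ID identifier and checks the regex locally before anything is sent to Macie. The regex (a lowercase UUID), the keywords, and the helper names are assumptions made for this example; the post’s actual values are defined in its CloudFormation template and may differ.

```python
import re

# Assumed values for illustration only; the post's real regex and keywords
# are defined in its CloudFormation template and may differ.
PATIENT_ID_SPEC = {
    "name": "Patient ID (sketch)",
    "description": "Synthea-style patient ID, assumed here to be a UUID",
    "regex": r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
    "keywords": ["patient", "patient_id"],  # hypothetical column-name keywords
    "maximumMatchDistance": 50,
}


def validate_spec(spec, sample):
    """Check locally that the regex compiles and fully matches a sample value."""
    return re.fullmatch(spec["regex"], sample) is not None


def create_identifier(spec):
    """Create the identifier in Macie; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the local check needs no AWS setup

    client = boto3.client("macie2")
    return client.create_custom_data_identifier(**spec)["customDataIdentifierId"]


print(validate_spec(PATIENT_ID_SPEC, "1d604da9-9a81-4ba9-80c2-de3375d59b40"))
```

Python’s re module is only an approximation of the PCRE subset that Macie supports, so a pattern that passes this local check should still be tested in the Macie console.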

Enable Macie

Before creating a CloudFormation stack with this demonstration’s resources, you will need to enable Amazon Macie from the AWS Management Console, or use the macie2 API and the AWS CLI with the enable-macie command.

aws macie2 enable-macie

Macie can also be enabled for your multi-account AWS Organization. The enable-organization-admin-account command designates an account as the delegated Amazon Macie administrator account for an AWS organization. For more information, see Managing multiple accounts in Amazon Macie.

AWS_ACCOUNT=111222333444
aws macie2 enable-organization-admin-account \
--admin-account-id ${AWS_ACCOUNT}

CloudFormation Stack

To create the CloudFormation stack with the supplied template, cloudformation/macie_demo.yml, run the following AWS CLI command. You will need to include an email address and phone number as input parameters. These parameter values will be used to send email and text alerts when Macie produces a sensitive data finding.

Please make sure you understand all the potential cost and security implications of creating the CloudFormation stack before continuing.

SNS_PHONE="+12223334444"
SNS_EMAIL="your-email-address@email.com"

aws cloudformation create-stack \
--stack-name macie-demo \
--template-body file://cloudformation/macie_demo.yml \
--parameters ParameterKey=SNSTopicEndpointSms,ParameterValue=${SNS_PHONE} \
ParameterKey=SNSTopicEndpointEmail,ParameterValue=${SNS_EMAIL} \
--capabilities CAPABILITY_NAMED_IAM

As shown in the AWS CloudFormation console, the new macie-demo stack will contain twenty-one AWS resources.

CloudFormation stack successfully created

Upload Data

Next, with the stack deployed, upload the CSV format data files to the encrypted S3 bucket, representing your data lake. The target S3 bucket has the following naming convention, synthea-data-<aws_account_id>-<region>. You can retrieve the two new bucket names from AWS Systems Manager Parameter Store, which were written there by CloudFormation, using the ssm API.

aws ssm get-parameters-by-path \
--path /macie_demo/ \
--query 'Parameters[*].Value'

Use the following ssm and s3 API commands to upload the data files.

DATA_BUCKET=$(aws ssm get-parameter \
--name /macie_demo/patient_data_bucket \
--query 'Parameter.Value' --output text)
aws s3 cp synthea_data/ \
    "s3://${DATA_BUCKET}/patient_data/" --recursive

You should end up with sixteen CSV files in the S3 bucket, totaling approximately 82.3 MB.

Synthea patient data files uploaded to S3

Sensitive Data Discovery Jobs

With the CloudFormation stack created and the patient data files uploaded, we will create two sensitive data discovery jobs. These jobs will scan the contents of the encrypted S3 bucket for sensitive data and report the findings. According to the documentation, you can configure a sensitive data discovery job to run only once for on-demand analysis and assessment, or on a recurring basis for periodic analysis, assessment, and monitoring. For this demonstration, we will create a one-time sensitive data discovery job using the AWS CLI. We will also create a recurring sensitive data discovery job using the AWS SDK for Python (Boto3). Both jobs can also be created from within Macie’s Jobs console.

Creating a new job in Macie’s Jobs console

For both sensitive data discovery jobs, we will include the three custom data identifiers. Each of the custom data identifiers has a unique ID. We will need all three IDs to create the two sensitive data discovery jobs. You can use the AWS CLI and the macie2 API to retrieve the values.

aws macie2 list-custom-data-identifiers --query 'items[*].id'

Next, modify the job_specs/macie_job_specs_1x.json file, adding the three custom data identifier IDs. Also, update your AWS account ID and S3 bucket name (lines 3–5, 12, and 14). Note that since all the patient data files are in CSV format, we will limit our inspection to only files with a csv file extension (lines 18–33).

{
"customDataIdentifierIds": [
"custom-data-identifier-id-1",
"custom-data-identifier-id-2",
"custom-data-identifier-id-3"
],
"description": "Review Synthea patient data (1x)",
"jobType": "ONE_TIME",
"s3JobDefinition": {
"bucketDefinitions": [
{
"accountId": "111222333444",
"buckets": [
"synthea-data-111222333444-us-east-1"
]
}
],
"scoping": {
"includes": {
"and": [
{
"simpleScopeTerm": {
"comparator": "EQ",
"key": "OBJECT_EXTENSION",
"values": [
"csv"
]
}
}
]
}
}
},
"tags": {
"Project": "Amazon Macie Demo"
}
}

The above JSON template was generated using the standard AWS CLI generate-cli-skeleton command.

aws macie2 create-classification-job --generate-cli-skeleton
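Editing the job spec by hand is easy to get wrong, so the placeholders could instead be filled in programmatically. The following is a minimal sketch; the fill_job_spec helper is hypothetical and not part of the post’s repository, and the template is a trimmed-down version of the job spec file.

```python
import copy
import json


def fill_job_spec(template, identifier_ids, account_id, bucket_name):
    """Return a copy of the job spec with the placeholders filled in."""
    spec = copy.deepcopy(template)
    spec["customDataIdentifierIds"] = list(identifier_ids)
    bucket_definition = spec["s3JobDefinition"]["bucketDefinitions"][0]
    bucket_definition["accountId"] = account_id
    bucket_definition["buckets"] = [bucket_name]
    return spec


# Trimmed-down template using the same keys as the job spec file
template = {
    "customDataIdentifierIds": [],
    "jobType": "ONE_TIME",
    "s3JobDefinition": {"bucketDefinitions": [{"accountId": "", "buckets": []}]},
}

filled = fill_job_spec(
    template,
    ["custom-data-identifier-id-1"],  # placeholder ID
    "111222333444",
    "synthea-data-111222333444-us-east-1",
)
print(json.dumps(filled, indent=2))
```

The helper works on a deep copy, so the template can be reused for other buckets or accounts.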

To create a one-time sensitive data discovery job using the above JSON template, run the following AWS CLI command. The unique job name will be dynamically generated based on the current time.

aws macie2 create-classification-job \
--name $(echo "SyntheaPatientData_${EPOCHSECONDS}") \
--cli-input-json file://job_specs/macie_job_specs_1x.json

In the Amazon Macie Jobs console, we can see a one-time sensitive data discovery job running. With a sampling percentage of 100, the job will take several minutes to run. The samplingPercentage job property can be adjusted to scan any percentage of the data. If this value is less than 100, Macie selects the objects to analyze at random, up to the specified percentage, and analyzes all the data in those objects.

One-time sensitive data discovery job running

Once the job is completed, the findings will be available in Macie’s Findings console. Using the three custom data identifiers in addition to Macie’s managed data identifiers, there should be a total of fifteen findings from the Synthea patient data files in S3. There should be six High severity findings and nine Medium severity findings. Of those, three are of a Personal finding type, seven of a Custom Identifier finding type, and five of a Multiple finding type, having both Personal and Custom Identifier finding types.

Macie’s Findings console displaying the results of the one-time job
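The same counts can likely be retrieved without the console by using the macie2 API’s get_finding_statistics operation. The sketch below groups findings by severity. The helper name is hypothetical, and the stand-in client only mimics the response shape using the counts reported above, so no AWS call is made.

```python
def count_findings_by_severity(macie_client):
    """Return a {severity: count} mapping from Macie's finding statistics."""
    response = macie_client.get_finding_statistics(groupBy="severity.description")
    return {group["groupKey"]: group["count"] for group in response["countsByGroup"]}


class FakeMacieClient:
    """Stand-in that shows the response shape without calling AWS."""

    def get_finding_statistics(self, groupBy):
        return {
            "countsByGroup": [
                {"groupKey": "High", "count": 6},
                {"groupKey": "Medium", "count": 9},
            ]
        }


print(count_findings_by_severity(FakeMacieClient()))
```

Against a real account, pass boto3.client('macie2') in place of the stand-in client.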

Isolating High Severity Findings

The data inspection workflow we have deployed uses an AWS Lambda function, macie-object-mover, to isolate all data files with High severity findings to a second S3 bucket. The offending files are copied to the isolation bucket and deleted from the source bucket.

#!/usr/bin/env python3

# Purpose: Lambda function that moves S3 objects flagged by Macie
# Author: Gary A. Stafford (March 2021)

import json
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')


def lambda_handler(event, context):
    logger.info(f'event: {json.dumps(event)}')

    destination_bucket_name = 'macie-isolation-111222333444-us-east-1'
    source_bucket_name = event['detail']['resourcesAffected']['s3Bucket']['name']
    file_key_name = event['detail']['resourcesAffected']['s3Object']['key']
    copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}

    logger.debug(f'destination_bucket_name: {destination_bucket_name}')
    logger.debug(f'source_bucket_name: {source_bucket_name}')
    logger.debug(f'file_key_name: {file_key_name}')

    # Copy the flagged object to the isolation bucket
    try:
        response = s3_client.copy_object(
            CopySource=copy_source_object,
            Bucket=destination_bucket_name,
            Key=file_key_name
        )
        logger.info(response)
    except ClientError as ex:
        logger.error(ex)
        raise  # fail the invocation so the source object is never deleted

    # Delete the object from the source bucket only after a successful copy
    try:
        response = s3_client.delete_object(
            Bucket=source_bucket_name,
            Key=file_key_name
        )
        logger.info(response)
    except ClientError as ex:
        logger.error(ex)
        raise

    return {
        'statusCode': 200,
        'body': json.dumps(copy_source_object)
    }
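For local testing, it may help to note that the handler reads only two fields from the finding event. The sketch below isolates that lookup in a hypothetical helper and runs it against a trimmed-down event. Real Macie finding events carry many more fields, and the object key shown is only an example.

```python
def extract_object_location(event):
    """Return (bucket, key) for the S3 object named in a Macie finding event."""
    resources = event["detail"]["resourcesAffected"]
    return resources["s3Bucket"]["name"], resources["s3Object"]["key"]


# Trimmed-down event: real finding events carry many more fields
sample_event = {
    "source": "aws.macie",
    "detail-type": "Macie Finding",
    "detail": {
        "resourcesAffected": {
            "s3Bucket": {"name": "synthea-data-111222333444-us-east-1"},
            "s3Object": {"key": "patient_data/patients.csv"},
        }
    },
}

print(extract_object_location(sample_event))
```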

Amazon EventBridge

According to Macie’s documentation, to support integration with other applications, services, and systems, such as monitoring or event management systems, Amazon Macie automatically publishes findings to Amazon EventBridge as finding events. Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale using events generated from your applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

Each EventBridge rule contains an event pattern. The event pattern is used to filter the incoming stream of events for particular patterns. The EventBridge rule that is triggered when a Macie finding is based on any of the custom data identifiers, macie-rule-custom, uses the event pattern shown below. This pattern examines the finding event for the name of one of the three custom data identifier names that triggered it.

Post’s event rules, shown in the Amazon EventBridge console

The second rule, macie-rule-high, is triggered by findings with a High severity and invokes the Lambda function described above. The event pattern for macie-rule-custom follows.

{
"source": [
"aws.macie"
],
"detail-type": [
"Macie Finding"
],
"detail": {
"classificationDetails": {
"result": {
"customDataIdentifiers": {
"detections": {
"name": [
"Patient ID",
"US Passport",
"US Driver License"
]
}
}
}
}
}
}
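The pattern above filters on custom data identifier names. A rule that filters on severity instead could plausibly match on the finding’s severity.description field. The sketch below shows such a pattern alongside a deliberately simplified matcher for trying patterns against sample events. The pattern is an assumption based on the Macie finding schema rather than a copy of the post’s CloudFormation template, and the matcher covers only exact-value matching, not the full EventBridge pattern language.

```python
# Hypothetical pattern; not copied from the post's CloudFormation template
HIGH_SEVERITY_PATTERN = {
    "source": ["aws.macie"],
    "detail-type": ["Macie Finding"],
    "detail": {"severity": {"description": ["High"]}},
}


def matches(pattern, event):
    """Simplified exact-value matcher; not the full EventBridge rule language."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        candidates = actual if isinstance(actual, list) else [actual]
        if isinstance(expected, dict):
            # Nested pattern: an array in the event matches if any element matches
            if not any(isinstance(c, dict) and matches(expected, c) for c in candidates):
                return False
        elif not any(value in expected for value in candidates):
            # Leaf: the pattern lists the allowed values
            return False
    return True


sample_event = {
    "source": "aws.macie",
    "detail-type": "Macie Finding",
    "detail": {"severity": {"description": "High"}},
}

print(matches(HIGH_SEVERITY_PATTERN, sample_event))
```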

Six data files, containing High severity findings, will be moved to the isolation bucket by the Lambda, triggered by EventBridge.

Isolation bucket containing data files with High severity findings

Scheduled Sensitive Data Discovery Jobs

Data sources commonly deliver data on a repeated basis, such as nightly data feeds. For these types of data sources, we can schedule sensitive data discovery jobs to run on a recurring basis. For this demonstration, we will create a scheduled job using the AWS SDK for Python (Boto3). Unlike the AWS CLI-based one-time job, you don’t need to modify the project’s script, scripts/create_macie_job_daily.py. The Python script will retrieve your AWS account ID and three custom data identifier IDs. The Python script then calls the create_classification_job method.

#!/usr/bin/env python3

# Purpose: Create Daily Macie classification job - Synthea patient data
# Author: Gary A. Stafford (March 2021)

import logging
import sys

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')
sts_client = boto3.client('sts')
macie_client = boto3.client('macie2')


def main():
    params = get_parameters()
    account_id = sts_client.get_caller_identity()['Account']
    custom_data_identifiers = list_custom_data_identifiers()
    create_classification_job(params['patient_data_bucket'], account_id, custom_data_identifiers)


def list_custom_data_identifiers():
    """Returns a list of all custom data identifier ids"""

    custom_data_identifiers = []

    try:
        response = macie_client.list_custom_data_identifiers()
        for item in response['items']:
            custom_data_identifiers.append(item['id'])
        return custom_data_identifiers
    except ClientError as e:
        logging.error(e)
        sys.exit(e)


def create_classification_job(patient_data_bucket, account_id, custom_data_identifiers):
    """Create Daily Macie classification job"""

    try:
        response = macie_client.create_classification_job(
            customDataIdentifierIds=custom_data_identifiers,
            description='Review Synthea patient data (Daily)',
            jobType='SCHEDULED',
            initialRun=True,
            name='SyntheaPatientData_Daily',
            s3JobDefinition={
                'bucketDefinitions': [
                    {
                        'accountId': account_id,
                        'buckets': [
                            patient_data_bucket
                        ]
                    }
                ],
                'scoping': {
                    'includes': {
                        'and': [
                            {
                                'simpleScopeTerm': {
                                    'comparator': 'EQ',
                                    'key': 'OBJECT_EXTENSION',
                                    'values': [
                                        'csv',
                                    ]
                                }
                            },
                        ]
                    }
                }
            },
            samplingPercentage=100,
            scheduleFrequency={
                'dailySchedule': {}
            },
            tags={
                'Project': 'Amazon Macie Demo'
            }
        )
        logging.debug(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        sys.exit(e)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        'patient_data_bucket': ssm_client.get_parameter(Name='/macie_demo/patient_data_bucket')['Parameter']['Value']
    }

    return params


if __name__ == '__main__':
    main()

To create the scheduled sensitive data discovery job, run the following command.

python3 ./scripts/create_macie_job_daily.py

The scheduleFrequency parameter is set to { 'dailySchedule': {} }. This value specifies a daily recurrence pattern for running the job. The initialRun parameter of the create_classification_job command is set to True. This will cause the new job to analyze all eligible objects immediately after the job is created, in addition to on a daily basis.

Scheduled sensitive data discovery job in an active/idle state
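Besides a daily schedule, the macie2 API’s scheduleFrequency argument also accepts weekly and monthly recurrence patterns. The sketch below builds each variant. The schedule_frequency helper and its default day values are assumptions made for illustration and are not part of the post’s script.

```python
def schedule_frequency(kind, day=None):
    """Build the scheduleFrequency argument for create_classification_job."""
    if kind == "daily":
        return {"dailySchedule": {}}
    if kind == "weekly":
        # day is a weekday name such as 'MONDAY'
        return {"weeklySchedule": {"dayOfWeek": day or "MONDAY"}}
    if kind == "monthly":
        # day is a day of the month such as 1
        return {"monthlySchedule": {"dayOfMonth": day or 1}}
    raise ValueError(f"unsupported schedule kind: {kind}")


print(schedule_frequency("daily"))
print(schedule_frequency("weekly", "FRIDAY"))
print(schedule_frequency("monthly", 15))
```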

Conclusion

In this post, we learned how we can use Amazon Macie to discover and protect sensitive data in Amazon S3. We learned how to use automation to trigger alerts based on Macie’s findings and to isolate data files based on the types of findings. The post’s data inspection workflow can easily be incorporated into existing data lake ingestion pipelines to ensure the integrity of incoming data.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options

Introduction

For anyone new to Amazon Managed Workflows for Apache Airflow (Amazon MWAA), especially those used to managing their own Apache Airflow platform, Amazon MWAA’s configuration might appear to be a bit of a black box at first. This brief post will explore Amazon MWAA’s configuration — how to inspect it and how to modify it. We will use Airflow DAGs to review an MWAA environment’s airflow.cfg file, environment variables, and Python packages.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project in 2019.

With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. For more information on Amazon MWAA, read my last post, Running Spark Jobs on Amazon EMR with Apache Airflow.

Apache Airflow UI

Source Code

The DAGs referenced in this post are available on GitHub. Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/aws-airflow-demo.git

Accessing Configuration

Environment Variables

Environment variables are an essential part of an MWAA environment’s configuration. There are various ways to examine the environment variables. You could use Airflow’s BashOperator to simply call the command, env, or the PythonOperator to call a Python iterator function, as shown below. A sample DAG, dags/get_env_vars.py, is included in the project.

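import os

# Airflow 1.10.x import path
from airflow.operators.python_operator import PythonOperator


def print_env_vars():
    # Print each environment variable as a sorted 'KEY': 'value' pair
    for key, value in sorted(os.environ.items()):
        print(f"'{key}': '{value}'")


# Excerpt: the operator belongs to the DAG defined in dags/get_env_vars.py
get_env_vars_operator = PythonOperator(
    task_id='get_env_vars_task',
    python_callable=print_env_vars
)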

The DAG’s PythonOperator will iterate over the MWAA environment’s environment variables and output them to the task’s log. Below is a snippet of an example task’s log.

[2020-12-25 23:59:07,170] {{standard_task_runner.py:78}} INFO – Job 272: Subtask get_env_vars_task
[2020-12-25 23:59:08,423] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-25 23:59:08,516] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOGS_ENABLED': 'false'
[2020-12-25 23:59:08,689] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONSOLE_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:08,777] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_EMAIL': 'airflow@example.com'
[2020-12-25 23:59:08,877] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_ID': 'get_env_vars'
[2020-12-25 23:59:08,970] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_DAG_OWNER': 'airflow'
[2020-12-25 23:59:09,269] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CTX_TASK_ID': 'get_env_vars_task'
[2020-12-25 23:59:09,357] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOGS_ENABLED': 'false'
[2020-12-25 23:59:09,552] {{logging_mixin.py:112}} INFO – 'AIRFLOW_DAG_PROCESSING_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:09,647] {{logging_mixin.py:112}} INFO – 'AIRFLOW_ENV_NAME': 'MyAirflowEnvironment'
[2020-12-25 23:59:09,729] {{logging_mixin.py:112}} INFO – 'AIRFLOW_HOME': '/usr/local/airflow'
[2020-12-25 23:59:09,827] {{logging_mixin.py:112}} INFO – 'AIRFLOW_SCHEDULER_LOGS_ENABLED': 'false'
[2020-12-25 23:59:12,915] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-25 23:59:12,986] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
[2020-12-25 23:59:13,136] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__LOAD_EXAMPLES': 'False'
[2020-12-25 23:59:13,217] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__PARALLELISM': '10000'
[2020-12-25 23:59:14,531] {{logging_mixin.py:112}} INFO – 'AWS_DEFAULT_REGION': 'us-east-1'
[2020-12-25 23:59:14,565] {{logging_mixin.py:112}} INFO – 'AWS_EXECUTION_ENV': 'AWS_ECS_FARGATE'
[2020-12-25 23:59:14,616] {{logging_mixin.py:112}} INFO – 'AWS_REGION': 'us-east-1'
[2020-12-25 23:59:14,647] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_FILE': ''
[2020-12-25 23:59:14,679] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_LEVEL': '20'
[2020-12-25 23:59:14,711] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT': '1'
[2020-12-25 23:59:14,747] {{logging_mixin.py:112}} INFO – 'CELERY_LOG_REDIRECT_LEVEL': 'WARNING'

Airflow Configuration File

According to Airflow, the airflow.cfg file contains Airflow’s configuration. You can edit it to change any of the settings. The first time you run Apache Airflow, it creates an airflow.cfg configuration file in your AIRFLOW_HOME directory and attaches the configurations to your environment as environment variables.

Amazon MWAA doesn’t expose the airflow.cfg in the Apache Airflow UI of an environment. Although you can’t access it directly, you can view the airflow.cfg file. The configuration file is located in your AIRFLOW_HOME directory, /usr/local/airflow (~/airflow by default).

There are multiple ways to examine your MWAA environment’s airflow.cfg file. You could use Airflow’s PythonOperator to call a Python function that reads the contents of the file, as shown below. The function uses the AIRFLOW_HOME environment variable to locate and read the airflow.cfg. A sample DAG, dags/get_airflow_cfg.py, is included in the project.

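import os

# Airflow 1.10.x import path
from airflow.operators.python_operator import PythonOperator


def print_airflow_cfg():
    # Locate airflow.cfg via the AIRFLOW_HOME environment variable
    with open(f"{os.getenv('AIRFLOW_HOME')}/airflow.cfg", 'r') as airflow_cfg:
        file_contents = airflow_cfg.read()
        print(f'\n{file_contents}')


get_airflow_cfg_operator = PythonOperator(
    task_id='get_airflow_cfg_task',
    python_callable=print_airflow_cfg
)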

The DAG’s task will read the MWAA environment’s airflow.cfg file and output it to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 00:02:57,163] {{standard_task_runner.py:78}} INFO – Job 274: Subtask get_airflow_cfg_task
[2020-12-26 00:02:57,583] {{logging_mixin.py:112}} INFO –
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Set this to True if you want to enable remote logging.
remote_logging = True
# Users must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id = aws_default
remote_base_log_folder = cloudwatch://arn:aws:logs:::log-group:airflow-logs:*
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging level for Flask-appbuilder UI
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
[aws_mwaa]
redirect_url = https://console.aws.amazon.com/
session_duration_minutes = 720
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
# Default timezone to display all dates in the RBAC UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = UTC
# The ip specified when starting the web server
web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080

Customizing Airflow Configurations

While AWS doesn’t expose the airflow.cfg in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.

To customize the Apache Airflow configuration, change the default options directly on the Amazon MWAA console. Select Edit, add or modify configuration options and values in the Airflow configuration options menu, then select Save. For example, we can change Airflow’s default timezone (core.default_ui_timezone) to America/New_York.

Amazon MWAA’s Airflow configuration options

Once the MWAA environment is updated, which may take several minutes, view your changes by re-running the DAG, dags/get_env_vars.py. Note the new configuration item on lines 2 and 5 of the log snippet shown below. The configuration item appears on its own (AIRFLOW__CORE__DEFAULT_UI_TIMEZONE), as well as part of the AIRFLOW_CONFIG_SECRETS dictionary environment variable.

[2020-12-26 05:00:57,756] {{standard_task_runner.py:78}} INFO – Job 293: Subtask get_env_vars_task
[2020-12-26 05:00:58,158] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONFIG_SECRETS': '{"AIRFLOW__CORE__DEFAULT_UI_TIMEZONE":"America/New_York"}'
[2020-12-26 05:00:58,190] {{logging_mixin.py:112}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-26 05:01:00,537] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-26 05:01:00,578] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__DEFAULT_UI_TIMEZONE': 'America/New_York'
[2020-12-26 05:01:00,630] {{logging_mixin.py:112}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
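The variable names in the log follow Airflow’s AIRFLOW__{SECTION}__{KEY} naming convention for configuration overrides, with double underscores separating the parts. A minimal sketch of the mapping from a section.key option to its environment variable name follows; the to_env_var helper is hypothetical.

```python
def to_env_var(option):
    """Map 'section.key' to Airflow's AIRFLOW__SECTION__KEY variable name."""
    section, key = option.split(".", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


print(to_env_var("core.default_ui_timezone"))
```

Knowing the convention makes it easier to find a changed option in the task log.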

Using the MWAA API

We can also make configuration changes using the MWAA API. For example, to change the default Airflow UI timezone, call the MWAA API’s update-environment command using the AWS CLI. Include the --airflow-configuration-options parameter, passing the core.default_ui_timezone key/value pair as a JSON blob.

aws mwaa update-environment \
--name <your_environment_name> \
--airflow-configuration-options """{
\"core.default_ui_timezone\": \"America/Los_Angeles\"
}"""

To review an environment’s configuration, use the get-environment command in combination with jq.

aws mwaa get-environment \
--name <your_environment_name> | \
jq -r '.Environment.AirflowConfigurationOptions'

Below, we see an example of the output.

{
"core.default_ui_timezone": "America/Los_Angeles"
}
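The same two calls are available through Boto3’s mwaa client. The sketch below wraps them in two hypothetical helpers and exercises them with a stand-in client, so it runs without an AWS account. The environment name and ARN are placeholders.

```python
def set_airflow_options(mwaa_client, environment_name, options):
    """Apply Airflow configuration options; returns the environment ARN."""
    response = mwaa_client.update_environment(
        Name=environment_name,
        AirflowConfigurationOptions=options,
    )
    return response["Arn"]


def get_airflow_options(mwaa_client, environment_name):
    """Read back the options currently set on the environment."""
    response = mwaa_client.get_environment(Name=environment_name)
    return response["Environment"].get("AirflowConfigurationOptions", {})


class FakeMwaaClient:
    """Stand-in that mimics the two response shapes without calling AWS."""

    def __init__(self):
        self.options = {}

    def update_environment(self, Name, AirflowConfigurationOptions):
        self.options = dict(AirflowConfigurationOptions)
        return {"Arn": f"arn:aws:airflow:us-east-1:111222333444:environment/{Name}"}

    def get_environment(self, Name):
        return {"Environment": {"Name": Name, "AirflowConfigurationOptions": self.options}}


client = FakeMwaaClient()
set_airflow_options(client, "MyAirflowEnvironment", {"core.default_ui_timezone": "America/Los_Angeles"})
print(get_airflow_options(client, "MyAirflowEnvironment"))
```

Against a real environment, pass boto3.client('mwaa') in place of the stand-in client.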

Python Packages

Airflow is written in Python, and workflows are created via Python scripts. Python packages are a crucial part of an MWAA environment’s configuration. According to the documentation, an ‘extra package’ is a Python subpackage that is not included in the Apache Airflow base installation on your MWAA environment. As part of setting up an MWAA environment, you can specify the location of the requirements.txt file in the Airflow S3 bucket. Extra packages are installed using the requirements.txt file.

Amazon MWAA environment’s configuration

There are several ways to check your MWAA environment’s installed Python packages and versions. You could use Airflow’s BashOperator to call the command, python3 -m pip list. A sample DAG, dags/get_py_pkgs.py, is included in the project.

list_python_packages_operator = BashOperator(
task_id='list_python_packages',
bash_command='python3 -m pip list'
)

The DAG’s task will output a list of all Python packages and package versions to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 21:53:06,310] {{bash_operator.py:136}} INFO – Temporary script location: /tmp/airflowtmp2whgp_p8/list_python_packagesxo8slhc6
[2020-12-26 21:53:06,350] {{bash_operator.py:146}} INFO – Running command: python3 -m pip list
[2020-12-26 21:53:06,395] {{bash_operator.py:153}} INFO – Output:
[2020-12-26 21:53:06,750] {{bash_operator.py:157}} INFO – Package Version
[2020-12-26 21:53:06,786] {{bash_operator.py:157}} INFO – ———————- ———
[2020-12-26 21:53:06,815] {{bash_operator.py:157}} INFO – alembic 1.4.2
[2020-12-26 21:53:06,856] {{bash_operator.py:157}} INFO – amqp 2.6.1
[2020-12-26 21:53:06,898] {{bash_operator.py:157}} INFO – apache-airflow 1.10.12
[2020-12-26 21:53:06,929] {{bash_operator.py:157}} INFO – apispec 1.3.3
[2020-12-26 21:53:06,960] {{bash_operator.py:157}} INFO – argcomplete 1.12.0
[2020-12-26 21:53:07,002] {{bash_operator.py:157}} INFO – attrs 19.3.0
[2020-12-26 21:53:07,036] {{bash_operator.py:157}} INFO – Babel 2.8.0
[2020-12-26 21:53:07,071] {{bash_operator.py:157}} INFO – billiard 3.6.3.0
[2020-12-26 21:53:07,960] {{bash_operator.py:157}} INFO – boto3 1.16.10
[2020-12-26 21:53:07,993] {{bash_operator.py:157}} INFO – botocore 1.19.10
[2020-12-26 21:53:08,028] {{bash_operator.py:157}} INFO – cached-property 1.5.1
[2020-12-26 21:53:08,061] {{bash_operator.py:157}} INFO – cattrs 1.0.0
[2020-12-26 21:53:08,096] {{bash_operator.py:157}} INFO – celery 4.4.7
[2020-12-26 21:53:08,130] {{bash_operator.py:157}} INFO – certifi 2020.6.20
[2020-12-26 21:53:12,260] {{bash_operator.py:157}} INFO – pandas 1.1.0
[2020-12-26 21:53:12,289] {{bash_operator.py:157}} INFO – pendulum 1.4.4
[2020-12-26 21:53:12,490] {{bash_operator.py:157}} INFO – pip 9.0.3
[2020-12-26 21:53:12,522] {{bash_operator.py:157}} INFO – prison 0.1.3
[2020-12-26 21:53:12,550] {{bash_operator.py:157}} INFO – prometheus-client 0.8.0
[2020-12-26 21:53:12,580] {{bash_operator.py:157}} INFO – psutil 5.7.2
[2020-12-26 21:53:12,613] {{bash_operator.py:157}} INFO – pycparser 2.20
[2020-12-26 21:53:12,641] {{bash_operator.py:157}} INFO – pycurl 7.43.0.5
[2020-12-26 21:53:12,676] {{bash_operator.py:157}} INFO – Pygments 2.6.1
[2020-12-26 21:53:12,710] {{bash_operator.py:157}} INFO – PyGreSQL 5.2.1
[2020-12-26 21:53:12,746] {{bash_operator.py:157}} INFO – PyJWT 1.7.1
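As an alternative to shelling out to pip, a PythonOperator callable could list the installed packages directly. The sketch below uses the standard library’s importlib.metadata module, which assumes Python 3.8 or later and so may not work on older MWAA environments. The list_packages helper is hypothetical and not part of the post’s repository.

```python
from importlib import metadata  # standard library in Python 3.8+


def list_packages():
    """Return sorted (name, version) pairs for the installed distributions."""
    packages = {
        (dist.metadata["Name"], dist.version)
        for dist in metadata.distributions()
        if dist.metadata["Name"]  # skip distributions with broken metadata
    }
    return sorted(packages, key=lambda item: item[0].lower())


for name, version in list_packages():
    print(f"{name} {version}")
```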

Conclusion

Understanding your Amazon MWAA environment’s airflow.cfg file, environment variables, and Python packages is important for proper Airflow platform management. In this brief post, we learned more about Amazon MWAA’s configuration: how to inspect it using DAGs and how to modify it through the Amazon MWAA console.


Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS

Introduction

According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grained control, you could deploy and manage Presto yourself on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.’

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources, without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources, a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
SELECT
(year(now()) - c_birth_year) AS age,
cd_credit_rating AS credit_rating,
cd_gender AS gender,
count(cd_gender) AS gender_count
FROM
postgresql.public.customer
LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
WHERE
c_birth_year IS NOT NULL
AND cd_credit_rating IS NOT NULL
AND lower(cd_credit_rating) != 'unknown'
AND cd_gender IS NOT NULL
GROUP BY
cd_credit_rating,
c_birth_year,
cd_gender
)
SELECT
age,
credit_rating,
gender,
gender_count
FROM
credit_demographics
WHERE
age BETWEEN 21 AND 65
ORDER BY
age,
credit_rating,
gender;
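To make the logic of the query above easier to follow, here is a minimal Python sketch that mimics it with in-memory data. The rows are made up for illustration, the function name credit_demographics is hypothetical, and the current year is passed in explicitly instead of calling now(). It is a sketch of the join, filter, and group-by steps only, not of how Presto executes the query.

```python
from collections import Counter

# Made-up rows standing in for postgresql.public.customer
customers = [
    {"c_current_cdemo_sk": 1, "c_birth_year": 1980},
    {"c_current_cdemo_sk": 2, "c_birth_year": 1990},
    {"c_current_cdemo_sk": 1, "c_birth_year": 1980},
    {"c_current_cdemo_sk": 3, "c_birth_year": None},  # dropped: no birth year
    {"c_current_cdemo_sk": 4, "c_birth_year": 2015},  # dropped: outside age range
]

# Made-up rows standing in for hive.default.customer_demographics,
# keyed by cd_demo_sk
demographics = {
    1: {"cd_credit_rating": "Good", "cd_gender": "F"},
    2: {"cd_credit_rating": "Unknown", "cd_gender": "M"},  # dropped: unknown rating
    3: {"cd_credit_rating": "Low Risk", "cd_gender": "M"},
    4: {"cd_credit_rating": "Good", "cd_gender": "M"},
}


def credit_demographics(customers, demographics, current_year):
    """Join, filter, and count rows the way the SQL statement does."""
    counts = Counter()
    for customer in customers:
        demo = demographics.get(customer["c_current_cdemo_sk"])
        # The WHERE clause rejects NULLs, so unmatched rows never survive
        if demo is None or customer["c_birth_year"] is None:
            continue
        if demo["cd_credit_rating"] is None or demo["cd_gender"] is None:
            continue
        if demo["cd_credit_rating"].lower() == "unknown":
            continue
        age = current_year - customer["c_birth_year"]
        if 21 <= age <= 65:
            counts[(age, demo["cd_credit_rating"], demo["cd_gender"])] += 1
    # ORDER BY age, credit_rating, gender
    return sorted(key + (count,) for key, count in counts.items())


print(credit_demographics(customers, demographics, current_year=2020))
```

Because the WHERE clause requires the demographic columns to be non-null, the LEFT JOIN behaves like an inner join here, which is why the sketch simply skips customers without a matching demographics row.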

Ahana

The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2, AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes with an Apache Hive Metastore backed by PostgreSQL bundled in. In addition, the following catalogs are bundled in to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.

Getting Started

To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Architecture of the demonstration’s AWS environment and resources

Subscribe to Ahana’s PrestoDB Sandbox

To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The current AWS pricing for the default, Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. For the demonstration, since performance is not an issue, you could try a smaller EC2 instance, such as an r5.large, which costs USD 0.126 per hour.

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you choose to do so, I strongly recommend limiting access to the required ports, 22 and 8080, in the instance’s Security Group to just your IP address (a /32 CIDR block).

Limiting access to ports 22 and 8080 from only my current IP address
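As a small illustration of the /32 recommendation, the following Python sketch turns a single IP address into a one-host CIDR block using only the standard library. The function names and the shape of the rule dictionaries are hypothetical and are not an AWS API structure; the address 203.0.113.10 is a documentation-range placeholder.

```python
import ipaddress


def single_host_cidr(ip):
    """Return a CIDR block that contains exactly one address."""
    addr = ipaddress.ip_address(ip)  # raises ValueError for malformed input
    prefix = 32 if addr.version == 4 else 128
    return f"{addr}/{prefix}"


def ingress_rules(ip, ports=(22, 8080)):
    """Describe the inbound rules for SSH (22) and the Presto UI/JDBC (8080)."""
    cidr = single_host_cidr(ip)
    return [{"protocol": "tcp", "port": port, "cidr": cidr} for port in ports]


for rule in ingress_rules("203.0.113.10"):
    print(rule)
```

Validating the address first helps avoid accidentally entering a wider range, such as a /24, when editing the Security Group by hand.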

Lastly, we need to assign an IAM Role to the EC2 instance, which has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Attaching the AmazonS3FullAccess AWS managed policy to the Role

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto and my key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.

You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.

AWS CloudFormation

We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.

AWSTemplateFormatVersion: "2010-09-09"
Description: "This template deploys a RDS PostgreSQL database and an Amazon S3 bucket"
Parameters:
  DBInstanceIdentifier:
    Type: String
    Default: "ahana-prestodb-demo"
  DBEngine:
    Type: String
    Default: "postgres"
  DBEngineVersion:
    Type: String
    Default: "12.3"
  DBAvailabilityZone:
    Type: String
    Default: "us-east-1f"
  DBInstanceClass:
    Type: String
    Default: "db.t3.medium"
  DBStorageType:
    Type: String
    Default: "gp2"
  DBAllocatedStorage:
    Type: Number
    Default: 20
  DBName:
    Type: String
    Default: "shipping"
  DBUser:
    Type: String
    Default: "presto"
  DBPassword:
    Type: String
    Default: "5up3r53cr3tPa55w0rd"
    # NoEcho: True
Resources:
  MasterDatabase:
    Type: AWS::RDS::DBInstance
    Properties:
      DBInstanceIdentifier:
        Ref: DBInstanceIdentifier
      DBName:
        Ref: DBName
      AllocatedStorage:
        Ref: DBAllocatedStorage
      DBInstanceClass:
        Ref: DBInstanceClass
      StorageType:
        Ref: DBStorageType
      Engine:
        Ref: DBEngine
      EngineVersion:
        Ref: DBEngineVersion
      MasterUsername:
        Ref: DBUser
      MasterUserPassword:
        Ref: DBPassword
      AvailabilityZone: !Ref DBAvailabilityZone
      PubliclyAccessible: true
      Tags:
        - Key: Project
          Value: "Demo of RDS PostgreSQL"
  DataBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
Outputs:
  Endpoint:
    Description: "Endpoint of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Address
  Port:
    Description: "Port of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Port
  JdbcConnString:
    Description: "JDBC connection string of RDS PostgreSQL database"
    Value: !Join
      - ""
      - - "jdbc:postgresql://"
        - !GetAtt MasterDatabase.Endpoint.Address
        - ":"
        - !GetAtt MasterDatabase.Endpoint.Port
        - "/"
        - !Ref DBName
        - "?user="
        - !Ref DBUser
        - "&password="
        - !Ref DBPassword
  Bucket:
    Description: "Name of Amazon S3 data bucket"
    Value: !Ref DataBucket

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
--branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/presto-aws-federated-queries.git

To create the AWS CloudFormation stack from the template, cloudformation/presto_ahana_demo.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value (shown in bold) to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created; in my case, that is us-east-1f.

aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f

To ensure the RDS for PostgreSQL database instance can be accessed by Presto running on the Ahana PrestoDB Sandbox EC2, manually add the PrestoDB Sandbox EC2’s Security Group to port 5432 within the database instance’s VPC Security Group’s Inbound rules. I have also added my own IP to port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString, and the Amazon S3 bucket’s name, Bucket. All these values will be required during the demonstration.
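For readers who want to see how the JdbcConnString output is assembled, here is a minimal Python sketch that concatenates the same pieces, in the same order, as the template’s Outputs section. The function names are hypothetical, and the endpoint value is the placeholder host name used in this post, not a real instance. The second helper builds the credential-free form of the URL, which is the shape the Presto catalog properties file uses for connection-url.

```python
def jdbc_conn_string(address, port, db_name, user, password):
    """Concatenate the parts in the order used by the JdbcConnString output."""
    parts = [
        "jdbc:postgresql://", address,
        ":", str(port),
        "/", db_name,
        "?user=", user,
        "&password=", password,
    ]
    return "".join(parts)


def catalog_connection_url(address, port, db_name):
    """URL without credentials; user and password are separate properties."""
    return f"jdbc:postgresql://{address}:{port}/{db_name}"


# Placeholder endpoint for illustration only
endpoint = "presto-demo.abcdefg12345.us-east-1.rds.amazonaws.com"
print(jdbc_conn_string(endpoint, 5432, "shipping", "presto", "5up3r53cr3tPa55w0rd"))
print(catalog_connection_url(endpoint, 5432, "shipping"))
```

Note that this plain concatenation does not URL-encode the password, so a password containing characters such as & or ? would need extra handling.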

Preparing the PrestoDB Sandbox

There are a few steps we need to take to properly prepare the PrestoDB Sandbox EC2 for our demonstration. First, set the EC2_ENDPOINT value (shown in bold) to your EC2’s public IPv4 address or public IPv4 DNS value. You can hardcode the value or use the aws ec2 describe-instances command shown below to retrieve it programmatically. Then, use your PrestoDB Sandbox EC2 SSH key to scp the properties and sql directories to the Presto EC2 instance, and SSH into the instance.

# on local workstation
EC2_ENDPOINT=$(aws ec2 describe-instances \
--filters "Name=product-code,Values=ejee5zzmv4tc5o3tr1uul6kg2" \
"Name=product-code.type,Values=marketplace" \
--query "Reservations[*].Instances[*].{Instance:PublicDnsName}" \
--output text)
scp -i "~/.ssh/ahana-presto.pem" \
-r properties/ sql/ \
ec2-user@${EC2_ENDPOINT}:~/
ssh -i "~/.ssh/ahana-presto.pem" ec2-user@${EC2_ENDPOINT}

Environment Variables

Next, we need to set several environment variables. First, replace the DATA_BUCKET and POSTGRES_HOST values below (shown in bold) to match your environment. The PGPASSWORD value should be correct unless you changed it in the CloudFormation template. Then, execute the command to add the variables to your .bash_profile file.

# the quoted 'EOF' keeps $HADOOP_HOME, $HIVE_HOME, and $PATH unexpanded
# until .bash_profile is sourced at login
cat <<'EOF' >>~/.bash_profile

export DATA_BUCKET=prestodb-demo-databucket-CHANGE_ME
export POSTGRES_HOST=presto-demo.CHANGE_ME.us-east-1.rds.amazonaws.com
export PGPASSWORD=5up3r53cr3tPa55w0rd
export JAVA_HOME=/usr
export HADOOP_HOME=/home/ec2-user/hadoop
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/tools/lib/*
export HIVE_HOME=/home/ec2-user/hive
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$PATH
EOF

Optionally, I suggest updating the EC2 instance with available updates and installing your favorite tools, like htop, to monitor the EC2 performance.

yes | sudo yum update
yes | sudo yum install htop
View of htop running on an r5.xlarge EC2 instance

Before further configuration for the demonstration, let’s review a few aspects of the Ahana PrestoDB EC2 instance. There are several applications pre-installed on the instance, including Java, Presto, Hadoop, PostgreSQL, and Hive. Versions shown are current as of early September 2020.

java -version
# openjdk version "1.8.0_252"
# OpenJDK Runtime Environment (build 1.8.0_252-b09)
# OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
hadoop version
# Hadoop 2.9.2
postgres --version
# postgres (PostgreSQL) 9.2.24
psql --version
# psql (PostgreSQL) 9.2.24
hive --version
# Hive 2.3.7
presto-cli --version
# Presto CLI 0.235-cb21100

The Presto configuration files are in the /etc/presto/ directory. The Hive configuration files are in the ~/hive/conf/ directory. Here are a few commands you can use to gain a better understanding of their configurations.

ls /etc/presto/
cat /etc/presto/jvm.config
cat /etc/presto/config.properties
cat /etc/presto/node.properties
# installed and configured catalogs
ls /etc/presto/catalog/
cat ~/hive/conf/hive-site.xml

Configure Presto

To configure Presto, we need to create and copy a new Presto postgresql catalog properties file for the newly created RDS for PostgreSQL instance. Modify the properties/rds_postgresql.properties file, replacing the value, connection-url (shown in bold), with your own JDBC connection string, shown in the CloudFormation Outputs tab.

connector.name=postgresql
connection-url=jdbc:postgresql://presto-demo.abcdefg12345.us-east-1.rds.amazonaws.com:5432/shipping
connection-user=presto
connection-password=5up3r53cr3tPa55w0rd

Move the rds_postgresql.properties file to its correct location using sudo.

sudo mv properties/rds_postgresql.properties /etc/presto/catalog/

We also need to modify the existing Hive catalog properties file, which will allow us to write to non-managed Hive tables from Presto.

connector.name=hive-hadoop2
hive.metastore.uri=thrift://localhost:9083
hive.non-managed-table-writes-enabled=true

The following command will overwrite the existing hive.properties file with the modified version containing the new property.

sudo mv properties/hive.properties \
/etc/presto/catalog/hive.properties

To finalize the configuration of the catalog properties files, we need to restart Presto. The easiest way is to reboot the EC2 instance, then SSH back into the instance. Since our environment variables are in the .bash_profile file, they will survive a restart and logging back into the EC2 instance.

sudo reboot

Add Tables to Apache Hive Metastore

We will use RDS for PostgreSQL and Apache Hive Metastore/Amazon S3 as additional data sources for our federated queries. The Ahana PrestoDB Sandbox instance comes pre-configured with Apache Hive and an Apache Hive Metastore, backed by PostgreSQL (a separate PostgreSQL 9.x instance pre-installed on the EC2).

The Sandbox’s instance of Presto comes pre-configured with schemas for the TPC Benchmark DS (TPC-DS). We will create three external tables in our Apache Hive Metastore that mirror three tables in the TPC-DS data source’s sf1 schema: tpcds.sf1.customer, tpcds.sf1.customer_address, and tpcds.sf1.customer_demographics. A Hive external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. As an example, here is the SQL statement that creates the external customer table in the Hive Metastore, whose data will be stored in the S3 bucket.

CREATE EXTERNAL TABLE IF NOT EXISTS `customer`(
`c_customer_sk` bigint,
`c_customer_id` char(16),
`c_current_cdemo_sk` bigint,
`c_current_hdemo_sk` bigint,
`c_current_addr_sk` bigint,
`c_first_shipto_date_sk` bigint,
`c_first_sales_date_sk` bigint,
`c_salutation` char(10),
`c_first_name` char(20),
`c_last_name` char(30),
`c_preferred_cust_flag` char(1),
`c_birth_day` integer,
`c_birth_month` integer,
`c_birth_year` integer,
`c_birth_country` char(20),
`c_login` char(13),
`c_email_address` char(50),
`c_last_review_date_sk` bigint)
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

The three CREATE EXTERNAL TABLE SQL statements are included in the sql/ directory: sql/hive_customer.sql, sql/hive_customer_address.sql, and sql/hive_customer_demographics.sql. The bucket name (shown in bold above) needs to be manually updated to your own bucket name in all three files before continuing.

Next, run the following hive commands to create the external tables in the Hive Metastore within the existing default schema/database.

hive --database default -f sql/hive_customer.sql
hive --database default -f sql/hive_customer_address.sql
hive --database default -f sql/hive_customer_demographics.sql

To confirm the tables were created successfully, we could use a variety of hive commands.

hive --database default -e "SHOW TABLES;"
hive --database default -e "DESCRIBE FORMATTED customer;"
hive --database default -e "SELECT * FROM customer LIMIT 5;"
Using the ‘DESCRIBE FORMATTED customer_address;’ Hive command

Alternatively, you can also create the external table interactively from within Hive, using the hive command to access the CLI. Copy and paste the contents of the SQL files to the hive CLI. To exit hive use quit;.

Interactively querying within Apache Hive

Amazon S3 Data Source Setup

With the external tables created, we will now select all the data from each of the three tables in the TPC-DS data source and insert that data into the equivalent Hive tables. The physical data will be written to Amazon S3 as SNAPPY-compressed Apache Parquet files, a highly efficient columnar storage format. Execute the following commands. The customer_address statements are a bit different because that table is partitioned, as explained in the next section.

# inserts 100,000 rows
presto-cli --execute """
INSERT INTO hive.default.customer
SELECT * FROM tpcds.sf1.customer;
"""
# inserts 50,000 rows across 52 partitions
presto-cli --execute """
INSERT INTO hive.default.customer_address
SELECT ca_address_sk, ca_address_id, ca_street_number,
ca_street_name, ca_street_type, ca_suite_number,
ca_city, ca_county, ca_zip, ca_country, ca_gmt_offset,
ca_location_type, ca_state
FROM tpcds.sf1.customer_address
ORDER BY ca_address_sk;
"""
# add new partitions in metastore
hive -e "MSCK REPAIR TABLE default.customer_address;"
# inserts 1,920,800 rows
presto-cli --execute """
INSERT INTO hive.default.customer_demographics
SELECT * FROM tpcds.sf1.customer_demographics;
"""

Confirm the data has been loaded into the correct S3 bucket locations and is in Parquet-format using the AWS Management Console or AWS CLI. Rest assured, the Parquet-format data is SNAPPY-compressed even though the S3 console incorrectly displays Compression as None. You can easily confirm the compression codec with a utility like parquet-tools.

Data organized by key prefixes in Amazon S3
Using S3’s ‘Select from’ feature to preview the SNAPPY-compressed Parquet format data

Partitioned Tables

The customer_address table is unique in that it has been partitioned by the ca_state column. Partitioned tables are created using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE `customer_address`(
`ca_address_sk` bigint,
`ca_address_id` char(16),
`ca_street_number` char(10),
`ca_street_name` char(60),
`ca_street_type` char(15),
`ca_suite_number` char(10),
`ca_city` varchar(60),
`ca_county` varchar(30),
`ca_zip` char(10),
`ca_country` char(20),
`ca_gmt_offset` double precision,
`ca_location_type` char(20)
)
PARTITIONED BY (`ca_state` char(2))
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer_address'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

According to Apache Hive, a table can have one or more partition columns and a separate data directory is created for each distinct value combination in the partition columns. Since the data for the Hive tables are stored in Amazon S3, that means that when the data is written to the customer_address table, it is automatically separated into different S3 key prefixes based on the state. The data is physically “partitioned”.

customer_address data, partitioned by the state, in Amazon S3
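The directory-per-value layout described above can be sketched in a few lines of Python. This is only an illustration of Hive’s column=value naming convention for partition directories; the function names, the example bucket, and the sample rows are hypothetical, and nothing here talks to S3 or Hive.

```python
from collections import defaultdict


def partition_prefix(table_location, column, value):
    """Build the key prefix for one partition, e.g. .../ca_state=TN/."""
    return f"{table_location.rstrip('/')}/{column}={value}/"


def group_by_partition(rows, table_location, column="ca_state"):
    """Assign each row to the prefix its partition value maps to."""
    layout = defaultdict(list)
    for row in rows:
        prefix = partition_prefix(table_location, column, row[column])
        # The partition value lives in the prefix, not in the data file
        data = {key: val for key, val in row.items() if key != column}
        layout[prefix].append(data)
    return dict(layout)


# Hypothetical location and rows for illustration only
location = "s3a://example-bucket/customer_address"
rows = [
    {"ca_address_sk": 1, "ca_state": "TN"},
    {"ca_address_sk": 2, "ca_state": "OR"},
    {"ca_address_sk": 3, "ca_state": "TN"},
]
for prefix, data in sorted(group_by_partition(rows, location).items()):
    print(prefix, data)
```

This also shows why ca_state is declared in the PARTITIONED BY clause and not in the column list: its value is encoded in the prefix.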

Whenever we add new partitions in S3, we need to run the MSCK REPAIR TABLE command to add that table’s new partitions to the Hive Metastore.

hive -e "MSCK REPAIR TABLE default.customer_address;"

In SQL, a predicate is a condition expression that evaluates to a Boolean value, either true or false. Aligning the partitions with the attributes that are frequently used in the conditions/filters (predicates) of the queries can significantly increase query efficiency. When we execute a query that uses an equality comparison condition, such as ca_state = 'TN', partitioning means the query will only work with a slice of the data in the corresponding ca_state=TN key prefix. There are 50,000 rows of data in the customer_address table, but only 1,418 rows (2.8% of the total data) in the ca_state=TN partition. With the additional advantage of Parquet format with SNAPPY compression, partitioning can significantly reduce query execution time.
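The arithmetic behind the 2.8% figure, and the effect of an equality predicate on the partition column, can be sketched as follows. The function name rows_scanned is hypothetical, and the per-partition counts are simplified: only the TN count and the 50,000 total come from this post, and all other states are lumped into a single placeholder entry.

```python
# Only the TN count and the overall total are taken from the post;
# the remaining partitions are lumped together as a placeholder.
partition_row_counts = {
    "TN": 1418,
    "ALL_OTHER_STATES": 50000 - 1418,
}


def rows_scanned(partition_counts, state=None):
    """Rows read with and without an equality predicate on the partition column."""
    if state is None:
        return sum(partition_counts.values())  # full scan of every partition
    return partition_counts.get(state, 0)      # only the matching partition


total = rows_scanned(partition_row_counts)
pruned = rows_scanned(partition_row_counts, state="TN")
print(f"{pruned} of {total} rows = {100 * pruned / total:.1f}%")
```

The same reasoning applies to the amount of data read from S3, although the exact byte savings depend on row sizes and compression.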

Adding Data to RDS for PostgreSQL Instance

For the demonstration, we will also replicate the schema and data of the tpcds.sf1.customer_address table to the new RDS for PostgreSQL instance’s shipping database.

CREATE TABLE customer_address (
ca_address_sk bigint,
ca_address_id char(16),
ca_street_number char(10),
ca_street_name char(60),
ca_street_type char(15),
ca_suite_number char(10),
ca_city varchar(60),
ca_county varchar(30),
ca_state char(2),
ca_zip char(10),
ca_country char(20),
ca_gmt_offset double precision,
ca_location_type char(20)
);

As with Hive and Presto, we can create the table programmatically from the command line or interactively; I prefer the programmatic approach. Using the following psql command, we can create the customer_address table in the public schema of the shipping database.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-f sql/postgres_customer_address.sql

Now, to insert the data into the new PostgreSQL table, run the following presto-cli command.

# inserts 50,000 rows
presto-cli --execute """
INSERT INTO rds_postgresql.public.customer_address
SELECT * FROM tpcds.sf1.customer_address;
"""

To confirm that the data was imported properly, we can use a variety of commands.

# should be 50,000 rows in table
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT COUNT(*) FROM customer_address;"
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT * FROM customer_address LIMIT 5;"

Alternatively, you could use the PostgreSQL client interactively by copying and pasting the contents of the sql/postgres_customer_address.sql file to the psql command prompt. To interact with PostgreSQL from the psql command prompt, use the following command.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto

Use the \dt command to list the PostgreSQL tables and the \q command to exit the PostgreSQL client. We now have all the new data sources created and configured for Presto!

Interacting with Presto

Presto provides a web interface for monitoring and managing queries. The interface provides dashboard-like insights into the Presto Cluster and queries running on the cluster. The Presto UI is available on port 8080 using the public IPv4 address or the public IPv4 DNS.

There are several ways to interact with Presto, via the PrestoDB Sandbox. The post will demonstrate how to execute ad-hoc queries against Presto from an IDE using a JDBC connection and the Presto CLI. Other options include running queries against Presto from Java and Python applications, Tableau, or Apache Spark/PySpark.

Below, we see a query being run against Presto from JetBrains PyCharm, using a Java Database Connectivity (JDBC) connection. The advantage of using a JetBrains IDE is having a single visual interface, including all the project files, multiple JDBC configurations, output results, and the ability to run multiple ad hoc queries.

Below, we see an example of configuring the Presto Data Source using the JDBC connection string, supplied in the CloudFormation stack Outputs tab.

Make sure to download and use the latest Presto JDBC driver JAR.

With JetBrains’ IDEs, we can even limit the databases/schemas displayed by the Data Source. This is helpful when we have multiple Presto catalogs configured, but we are only interested in certain data sources.

We can also run queries using the Presto CLI, three different ways. We can pass a SQL statement to the Presto CLI, pass a file containing a SQL statement to the Presto CLI, or work interactively from the Presto CLI. Below, we see a query being run, interactively from the Presto CLI.

As the query is running, we can observe the live Presto query statistics (not very user friendly in my terminal).

And finally, we view the query results.

Federated Queries

The example queries used in the demonstration and included in the project were mainly extracted from the scholarly article, Why You Should Run TPC-DS: A Workload Analysis, available as a PDF on the tpc.org website. I have modified the SQL queries to work with Presto.

In the first example, we will run the three versions of the same basic query statement. Version 1 of the query is not a federated query; it only queries a single data source. Version 2 of the query queries two different data sources. Finally, version 3 of the query queries three different data sources. Each of the three versions of the SQL statement should return the same results — 93 rows of data.

Version 1: Single Data Source

The first version of the query statement, sql/presto_query2.sql, is not a federated query. Each of the query’s four tables (catalog_returns, date_dim, customer, and customer_address) references the TPC-DS data source, which came pre-installed with the PrestoDB Sandbox. Note that the table references on lines 11–13 and 41–42 are all associated with the tpcds.sf1 schema.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
catalog_returns,
date_dim,
customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
customer_address,
customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

We will run each query non-interactively using the presto-cli. We will choose the sf1 (scale factor of 1) tpcds schema. According to Presto, every unit in the scale factor (sf1, sf10, sf100) corresponds to a gigabyte of data.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2.sql \
--output-format ALIGNED \
--client-tags "presto_query2"

Below, we see the query results in the presto-cli.

Below, we see the first query running in Presto’s web interface.

Below, we see the first query’s results detailed in Presto’s web interface.

Version 2: Two Data Sources

In the second version of the query statement, sql/presto_query2_federated_v1.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. The other two tables (customer and customer_address) now reference the Apache Hive Metastore for their schema and underlying data in Amazon S3. Note table references on lines 11 and 12, as opposed to lines 13, 41, and 42.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
hive.default.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
hive.default.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v1.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v1"

Below, we see the second query’s results detailed in Presto’s web interface.

Even though the data is in two separate and physically different data sources, we can easily query it as though it were all in the same place.

Version 3: Three Data Sources

In the third version of the query statement, sql/presto_query2_federated_v2.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. One of the tables (hive.default.customer) references the Apache Hive Metastore, with its underlying data in Amazon S3. The fourth table (rds_postgresql.public.customer_address) references the new RDS for PostgreSQL database instance. Note the table references on lines 11 and 12, and on lines 13 and 41, as opposed to line 42.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
rds_postgresql.public.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
rds_postgresql.public.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v2.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v2"

Below, we see the third query’s results detailed in Presto’s web interface.

Again, even though the data is in three separate and physically different data sources, we can easily query it as though it were all in the same place.

Additional Query Examples

The project contains several additional query statements, which I have extracted from Why You Should Run TPC-DS: A Workload Analysis and modified to work with Presto and federate across multiple data sources.

# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1.sql \
--output-format ALIGNED \
--client-tags "presto_query1"
# federated - two sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query1_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4.sql \
--output-format ALIGNED \
--client-tags "presto_query4"
# federated - three sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query4_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query5.sql \
--output-format ALIGNED \
--client-tags "presto_query5"
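
The commands above differ only in the query file and the client tag, so they can be generated rather than typed by hand. The following is a minimal sketch in Python, assuming the client tag always matches the SQL file name without its extension, as it does in the commands shown. The function name build_presto_command and the QUERY_FILES list are hypothetical helpers, not part of the project.

```python
from pathlib import Path

# Query files taken from the presto-cli commands shown above.
QUERY_FILES = [
    "sql/presto_query1.sql",
    "sql/presto_query1_federated.sql",
    "sql/presto_query4.sql",
    "sql/presto_query4_federated.sql",
    "sql/presto_query5.sql",
]


def build_presto_command(sql_file, catalog="tpcds", schema="sf1"):
    """Return the presto-cli argument list for one query file.

    Assumption: the client tag is the file name without the .sql extension.
    """
    tag = Path(sql_file).stem  # e.g. "presto_query1_federated"
    return [
        "presto-cli",
        "--catalog", catalog,
        "--schema", schema,
        "--file", sql_file,
        "--output-format", "ALIGNED",
        "--client-tags", tag,
    ]


# Print each command; pass the list to subprocess.run() to execute it instead.
for query_file in QUERY_FILES:
    print(" ".join(build_presto_command(query_file)))
```

Printing the commands first makes it easy to confirm the arguments before running them against the Presto cluster.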

Conclusion

In this post, we gained a better understanding of Presto using Ahana’s PrestoDB Sandbox product from AWS Marketplace. We learned how Presto queries data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. We also learned about Apache Hive and the Apache Hive Metastore, the Apache Parquet file format, and how and why to partition Hive data in Amazon S3. Most importantly, we learned how to write federated queries that join multiple disparate data sources without having to move the data into a single monolithic data store.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Collecting and Analyzing IoT Data in Near Real-Time with AWS IoT, LoRa, and LoRaWAN

Introduction

In a recent post published on ITNEXT, LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, we explored the use of the LoRa (Long Range) and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing several embedded sensors, and an IoT gateway. In this post, we will extend that architecture to the Cloud, using AWS IoT, a broad and deep set of IoT services, from the edge to the Cloud. We will securely collect, transmit, and analyze IoT data using the AWS cloud platform.

LoRa and LoRaWAN

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.

AWS IoT

AWS describes AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics.

In this post, we will focus on three AWS IoT services, one from each category: AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics. According to AWS, the AWS IoT Device SDKs include open-source libraries and developer and porting guides with samples to help you build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Core is a managed cloud service that lets connected devices easily and securely interact with cloud applications and other devices. AWS IoT Core can process and route messages to AWS endpoints and other devices reliably and securely. Finally, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

To learn more about AWS IoT, specifically the AWS IoT services we will be exploring within this post, I recommend reading my recent post published on Towards Data Science, Getting Started with IoT Analytics on AWS.

Hardware Selection

In this post, we will use the following hardware.

IoT Device with Embedded Sensors

An Arduino single-board microcontroller will serve as our IoT device. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductor, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.

The Sense contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, 3D digital angular rate sensor, and 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa, goes through the gateway, which can be either a dedicated hardware appliance or software program.

LoRa Gateways, to paraphrase The Things Network, form the bridge between devices and the Cloud. Devices use low power networks like LoRaWAN to connect to the Gateway, while the Gateway uses high bandwidth networks like WiFi, Ethernet, or Cellular to connect to the Cloud.

A third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) will serve as our LoRa IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). The transceiver module communicates with its host over a universal asynchronous receiver-transmitter (UART) interface. A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.

According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. Each RYLR896 module contains a small, PCB integrated, helical antenna. This transceiver operates at both the 868 and 915 MHz frequency ranges. In this demonstration, we will be transmitting at 915 MHz for North America.

The Arduino Sense (IoT device) transmits data, using one of the RYLR896 modules (shown below front). The Raspberry Pi (IoT Gateway), connected to the other RYLR896 module (shown below rear), receives the data.

LoRaWAN Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits / 4 bits/hex digit).
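
The arithmetic above (128 bits at 4 bits per hex digit gives 32 hex digits) can be checked with a short sketch. The Python below generates a random password of the right shape and validates one; the function names are hypothetical, and the sketch only assumes what the text states, namely that the module expects 32 hexadecimal digits.

```python
import secrets
import string


def generate_lora_password():
    """Return a random 128-bit value as 32 uppercase hexadecimal digits."""
    # 16 random bytes = 128 bits = 32 hex digits
    return secrets.token_hex(16).upper()


def is_valid_lora_password(password):
    """Check that a password is exactly 32 hexadecimal digits."""
    return len(password) == 32 and all(ch in string.hexdigits for ch in password)


password = generate_lora_password()
print("AT+CPIN=" + password)  # the AT command form used to set the password
```

The same password must be set on both the transmitting and receiving modules, so generate it once and copy it to both the sketch and the gateway script.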

Provisioning AWS Resources

To start, we will create the necessary AWS IoT and associated resources on the AWS cloud platform. Once these resources are in place, we can then proceed to configure the IoT device and IoT gateway to securely transmit the sensor data to the Cloud.

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone --branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/iot-aws-lora-demo.git

AWS CloudFormation

The CloudFormation template, iot-analytics.yaml, will create an AWS IoT CloudFormation stack containing the following resources.

  • AWS IoT Thing
  • AWS IoT Thing Policy
  • AWS IoT Core Topic Rule
  • AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • AWS Lambda and Lambda Permission
  • Amazon S3 Bucket
  • Amazon SageMaker Notebook Instance
  • AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To create the AWS CloudFormation stack from the included CloudFormation template, execute the following AWS CLI command.

aws cloudformation create-stack \
--stack-name lora-iot-demo \
--template-body file://cloudformation/iot-analytics.yaml \
--parameters ParameterKey=ProjectName,ParameterValue=lora-iot-demo \
ParameterKey=IoTTopicName,ParameterValue=lora-iot-demo \
--capabilities CAPABILITY_NAMED_IAM

The resulting CloudFormation stack should contain 16 AWS resources.

Additional Resources

Unfortunately, AWS CloudFormation cannot create all the AWS IoT resources we require for this demonstration. To complete the AWS provisioning process, execute the following series of AWS CLI commands, aws_cli_commands.md. These commands will create the remaining resources, including an AWS IoT Thing Type, Thing Group, Thing Billing Group, and an X.509 Certificate.

# LoRaWAN / AWS IoT Demo
# Author: Gary Stafford
# Run AWS CLI commands after CloudFormation stack completes successfully
# variables
thingName=lora-iot-gateway-01
thingPolicy=LoRaDevicePolicy
thingType=LoRaIoTGateway
thingGroup=LoRaIoTGateways
thingBillingGroup=LoRaIoTGateways
mkdir ${thingName}
aws iot create-keys-and-certificate \
--certificate-pem-outfile "${thingName}/${thingName}.cert.pem" \
--public-key-outfile "${thingName}/${thingName}.public.key" \
--private-key-outfile "${thingName}/${thingName}.private.key" \
--set-as-active
# assuming you only have one certificate registered
certificate=$(aws iot list-certificates | jq -r '.[][] | .certificateArn') # -r strips the JSON quotes from the ARN
## alternately, for a specific certificate if you have more than one
# aws iot list-certificates
## then change the value below
# certificate=arn:aws:iot:us-east-1:123456789012:cert/<certificate>
aws iot attach-policy \
--policy-name $thingPolicy \
--target $certificate
aws iot attach-thing-principal \
--thing-name $thingName \
--principal $certificate
aws iot create-thing-type \
--thing-type-name $thingType \
--thing-type-properties "thingTypeDescription=LoRaWAN IoT Gateway"
aws iot create-thing-group \
--thing-group-name $thingGroup \
--thing-group-properties "thingGroupDescription=\"LoRaWAN IoT Gateway Thing Group\", attributePayload={attributes={Manufacturer=RaspberryPiFoundation}}"
aws iot add-thing-to-thing-group \
--thing-name $thingName \
--thing-group-name $thingGroup
aws iot create-billing-group \
--billing-group-name $thingBillingGroup \
--billing-group-properties "billingGroupDescription=\"Gateway Billing Group\""
aws iot add-thing-to-billing-group \
--thing-name $thingName \
--billing-group-name $thingBillingGroup
aws iot update-thing \
--thing-name $thingName \
--thing-type-name $thingType \
--attribute-payload "{\"attributes\": {\"GatewayMfr\":\"RaspberryPiFoundation\", \"LoRaMfr\":\"REYAX\", \"LoRaModel\":\"RYLR896\"}}"
aws iot describe-thing \
--thing-name $thingName

IoT Device Configuration

With the AWS resources deployed, we can configure the IoT device and IoT Gateway.

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo_aws.ino, contains the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, Red, Green, and Blue (RGB) color, and ambient light intensity, using the LoRaWAN protocol.

/*
Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
including temperature, humidity, barometric pressure, and color,
using REYAX RYLR896 transceiver modules
http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
Author: Gary Stafford
*/
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 5000; // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";
String uid = "";
void setup()
{
Serial.begin(9600);
Serial1.begin(115200); // default baud rate of module is 115200
delay(1000); // wait for LoRa module to be ready
// get unique transceiver id to identify iot device on network
Serial1.print((String)"AT+UID?\r\n");
uid = Serial1.readString();
uid.replace("+UID=", ""); // trim off '+UID=' at start of line
uid.replace("\r\n", ""); // trim off CR/LF at end of line
// these all need to be the same for receiver and transmitter
Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
delay(200);
Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
delay(200);
Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
delay(200);
Serial1.print("AT+CPIN?\r\n"); // confirm password is set
if (!HTS.begin())
{ // initialize HTS221 sensor
Serial.println("Failed to initialize humidity temperature sensor!");
while (1);
}
if (!BARO.begin())
{ // initialize LPS22HB sensor
Serial.println("Failed to initialize pressure sensor!");
while (1);
}
// avoid bad readings to start bug
// https://forum.arduino.cc/index.php?topic=660360.0
BARO.readPressure();
delay(1000);
if (!APDS.begin())
{ // initialize APDS9960 sensor
Serial.println("Failed to initialize color sensor!");
while (1);
}
}
void loop()
{
updateReadings();
delay(UPDATE_FREQUENCY);
}
void updateReadings()
{
float temperature = getTemperature(CALIBRATION_FACTOR);
float humidity = getHumidity();
float pressure = getPressure();
int colors[4];
getColor(colors);
String payload = buildPayload(temperature, humidity, pressure, colors);
Serial.println("Payload: " + payload); // display the payload for debugging
Serial1.print(payload); // send the payload over LoRaWAN
displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}
float getTemperature(float calibration)
{
return HTS.readTemperature() + calibration;
}
float getHumidity()
{
return HTS.readHumidity();
}
float getPressure()
{
return BARO.readPressure();
}
void getColor(int c[])
{
// check if a color reading is available
while (!APDS.colorAvailable())
{
delay(5);
}
int r, g, b, a;
APDS.readColor(r, g, b, a);
c[0] = r;
c[1] = g;
c[2] = b;
c[3] = a;
}
// display for debugging purposes
void displayResults(float t, float h, float p, int c[])
{
Serial.println((String)"UID: " + uid);
Serial.print("Temperature: ");
Serial.println(t);
Serial.print("Humidity: ");
Serial.println(h);
Serial.print("Pressure: ");
Serial.println(p);
Serial.print("Color (r, g, b, a): ");
Serial.print(c[0]);
Serial.print(", ");
Serial.print(c[1]);
Serial.print(", ");
Serial.print(c[2]);
Serial.print(", ");
Serial.println(c[3]);
Serial.println("----------");
}
String buildPayload(float t, float h, float p, int c[])
{
String readings = "";
readings += uid;
readings += DELIMITER;
readings += t;
readings += DELIMITER;
readings += h;
readings += DELIMITER;
readings += p;
readings += DELIMITER;
readings += c[0];
readings += DELIMITER;
readings += c[1];
readings += DELIMITER;
readings += c[2];
readings += DELIMITER;
readings += c[3];
String payload = "";
payload += "AT+SEND=";
payload += ADDRESS;
payload += ",";
payload += readings.length();
payload += ",";
payload += readings;
payload += "\r\n";
return payload;
}

AT Commands

Communication with the RYLR896’s long-range modem is done using AT commands. According to Developer’s Home, AT commands are instructions used to control a modem. AT is the abbreviation of ATtention, and every command line starts with AT, which is why modem commands are called AT commands. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor ID and the sensor values together in a single string. The string will be incorporated into an AT command, sent to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, we will delimit the sensor values with a pipe (|), as opposed to a comma. According to REYAX, the maximum length of the LoRa payload is approximately 330 bytes.

Below, we see an example of an AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both contain the LoRa transmitter Address ID, payload length (62 bytes in the example), and the payload. The data received by the IoT gateway also has the Received signal strength indicator (RSSI), and Signal-to-noise ratio (SNR).
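
The two message formats described above can be sketched in a few lines of Python. The sample values are taken from the gateway log output shown later in this post; the function names build_send_command and parse_received are hypothetical and are not part of the project's code.

```python
def build_send_command(address, readings):
    """Wrap a pipe-delimited readings string in an AT+SEND command.

    The length field counts characters, which equals bytes for ASCII data.
    """
    return "AT+SEND={},{},{}\r\n".format(address, len(readings), readings)


def parse_received(line):
    """Split a '+RCV=' line into address, length, values, RSSI, and SNR."""
    body = line.strip()
    if not body.startswith("+RCV="):
        raise ValueError("not a +RCV line: {!r}".format(line))
    body = body[len("+RCV="):]
    # The first two commas delimit address and length; the last two delimit
    # RSSI and SNR. The payload in between uses pipes, not commas.
    address, length, rest = body.split(",", 2)
    data, rssi, snr = rest.rsplit(",", 2)
    return {
        "address": int(address),
        "length": int(length),
        "values": data.split("|"),
        "rssi": int(rssi),
        "snr": int(snr),
    }


readings = "0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116"
print(build_send_command(116, readings), end="")
print(parse_received("+RCV=116,59," + readings + ",-48,39\r\n"))
```

Because the sensor values are pipe-delimited, the commas in the received line can be split unambiguously from both ends, which is the reason the sketch avoids a comma inside the payload.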

Receiving Data on IoT Gateway

The Raspberry Pi will act as a LoRa IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino, and sending the data to AWS. The Raspberry Pi runs a Python script, rasppi_lora_receiver_aws.py, which receives the data from the Arduino Sense, decrypts it, parses the sensor values, serializes the data to a JSON payload, and finally transmits the payload in an MQTT-protocol message to AWS. The script uses pyserial, the Python Serial Port Extension, which encapsulates access to the serial port for communication with the RYLR896 module. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS.

import json
import logging
import sys
import threading
import time
from argparse import ArgumentParser
import serial
from awscrt import io, mqtt, auth, http, exceptions
from awsiot import mqtt_connection_builder
# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# b1d0fncxn1hf3m-ats.iot.us-east-1.amazonaws.com
# constants
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"
# global variables
count = 0 # from args
received_count = 0
received_all_event = threading.Event()
def main():
    # configure logging
    logging.basicConfig(filename='output.log',
                        filemode='w', level=logging.DEBUG)
    args = get_args()  # get args
    payload = ""
    lora_payload = {}
    # set log level
    io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')
    # spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    # set MQTT connection
    mqtt_connection = set_mqtt_connection(args, client_bootstrap)
    logging.debug("Connecting to {} with client ID '{}'...".format(
        args.endpoint, args.client_id))
    connect_future = mqtt_connection.connect()
    # future.result() waits until a result is available
    connect_future.result()
    logging.debug("Connecting to REYAX RYLR896 transceiver module...")
    serial_conn = serial.Serial(
        port=args.tty,
        baudrate=int(args.baud_rate),
        timeout=5,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS
    )
    if serial_conn.isOpen():
        logging.debug("Connected!")
        set_lora_config(serial_conn)
        check_lora_config(serial_conn)
        while True:
            # read data from serial port
            serial_payload = serial_conn.readline()
            logging.debug(serial_payload)
            if len(serial_payload) >= 1:
                payload = serial_payload.decode(encoding="utf-8")
                payload = payload[:-2]  # trim off CR/LF at end of line
                try:
                    data = parse_payload(payload)
                    lora_payload = {
                        "ts": time.time(),
                        "data": {
                            "device_id": str(data[0]),
                            "gateway_id": str(args.gateway_id),
                            "temperature": float(data[1]),
                            "humidity": float(data[2]),
                            "pressure": float(data[3]),
                            "color": {
                                "red": float(data[4]),
                                "green": float(data[5]),
                                "blue": float(data[6]),
                                "ambient": float(data[7])
                            }
                        }
                    }
                    logging.debug(lora_payload)
                except IndexError:
                    logging.error("IndexError: {}".format(payload))
                except ValueError:
                    logging.error("ValueError: {}".format(payload))
                # publish mqtt message
                message_json = json.dumps(
                    lora_payload,
                    sort_keys=True,
                    indent=None,
                    separators=(',', ':'))
                try:
                    mqtt_connection.publish(
                        topic=args.topic,
                        payload=message_json,
                        qos=mqtt.QoS.AT_LEAST_ONCE)
                except mqtt.SubscribeError as err:
                    logging.error(".SubscribeError: {}".format(err))
                except exceptions.AwsCrtError as err:
                    logging.error("AwsCrtError: {}".format(err))
def set_mqtt_connection(args, client_bootstrap):
    if args.use_websocket:
        proxy_options = None
        if args.proxy_host:
            proxy_options = http.HttpProxyOptions(
                host_name=args.proxy_host, port=args.proxy_port)
        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
            client_bootstrap)
        mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
            endpoint=args.endpoint,
            client_bootstrap=client_bootstrap,
            region=args.signing_region,
            credentials_provider=credentials_provider,
            websocket_proxy_options=proxy_options,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    else:
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=args.endpoint,
            cert_filepath=args.cert,
            pri_key_filepath=args.key,
            client_bootstrap=client_bootstrap,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    return mqtt_connection
def get_args():
    parser = ArgumentParser(
        description="Send and receive messages through an MQTT connection.")
    parser.add_argument("--tty", required=True,
                        help="serial tty", default="/dev/ttyAMA0")
    parser.add_argument("--baud-rate", required=True,
                        help="serial baud rate", default=115200)
    parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
                        "Ex: \"abcd123456wxyz-ats.iot.us-east-1.amazonaws.com\"")
    parser.add_argument('--cert', help="File path to your client certificate, in PEM format.")
    parser.add_argument('--key', help="File path to your private key, in PEM format.")
    parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
                        "Necessary if MQTT server uses a certificate that's not already in " +
                        "your trust store.")
    parser.add_argument('--client-id', default='samples-client-id',
                        help="Client ID for MQTT connection.")
    parser.add_argument('--topic', default="samples/test",
                        help="Topic to subscribe to, and publish messages to.")
    parser.add_argument('--message', default="Hello World!", help="Message to publish. " +
                        "Specify empty string to publish nothing.")
    parser.add_argument('--count', default=0, type=int, help="Number of messages to publish/receive before exiting. " +
                        "Specify 0 to run forever.")
    parser.add_argument('--use-websocket', default=False, action='store_true',
                        help="To use a websocket instead of raw mqtt. If you specify this option you must "
                        "specify a region for signing, you can also enable proxy mode.")
    parser.add_argument('--signing-region', default='us-east-1',
                        help="If you specify --use-websocket, this is the region that will be used for computing "
                        "the Sigv4 signature")
    parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
                        "you will likely need to set --root-ca to the ca for your proxy.")
    parser.add_argument('--proxy-port', type=int, default=8080,
                        help="Port for proxy to connect to.")
    parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
                        help='Logging level')
    parser.add_argument("--gateway-id", help="IoT Gateway serial number")
    args = parser.parse_args()
    return args
def parse_payload(payload):
    # input: +RCV=116,29,0447383033363932003C0034|23.94|37.71|99.89|16|38|53|80,-61,56
    # output (list of strings; the caller converts the readings to float):
    # ['0447383033363932003C0034', '23.94', '37.71', '99.89', '16', '38', '53', '80']
    payload = payload.split(",")
    payload = payload[2].split("|")
    payload = [i for i in payload]
    return payload
def set_lora_config(serial_conn):
    # configures the REYAX RYLR896 transceiver module
    # each response is trimmed with [:-2] to drop the trailing CR/LF
    serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network Id set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
    time.sleep(1)
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES-128 password set? {}".format(serial_payload.decode(encoding="utf-8")))
def check_lora_config(serial_conn):
    # queries the module and logs its current configuration
    serial_conn.write(str.encode("AT?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Module responding? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+ADDRESS?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network id: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+IPR?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("UART baud rate: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+BAND?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF frequency: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CRFOP?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF output power: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+MODE?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Work mode: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+PARAMETER?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF parameters: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES128 password of the network: {}".format(serial_payload.decode(encoding="utf-8")))
# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    logging.error("Connection interrupted. error: {}".format(error))
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    logging.warning("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))
    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        logging.warning("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()
        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    logging.warning("Resubscribe results: {}".format(resubscribe_results))
    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    logging.debug("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1
    if received_count == count:
        received_all_event.set()
if __name__ == "__main__":
    sys.exit(main())

Running the IoT Gateway Python Script

To run the Python script on the Raspberry Pi, we will use a helper shell script, rasppi_lora_receiver_aws.sh. The shell script helps construct the arguments required to execute the Python script.

#!/bin/bash
# LoRaWAN / AWS IoT Demo
# Author: Gary A. Stafford
# Revised: 2021-03-24
# Start IoT data collector script and tails output
# Usage:
# sh ./rasppi_lora_receiver_aws.sh a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
if [[ $# -ne 1 ]]; then
echo "Script requires 1 parameter!"
exit 1
fi
# input parameters
ENDPOINT=$1 # e.g. a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
DEVICE="lora-iot-gateway-01" # matches CloudFormation thing name
CERTIFICATE="${DEVICE}-cert.pem" # e.g. lora-iot-gateway-01-cert.pem
KEY="${DEVICE}-private.key" # e.g. lora-iot-gateway-01-private.key
GATEWAY_ID=$(< /proc/cpuinfo grep Serial | grep -oh "[a-z0-9]*$") # e.g. 00000000f62051ce
# output for debugging
echo "DEVICE: ${DEVICE}"
echo "ENDPOINT: ${ENDPOINT}"
echo "CERTIFICATE: ${CERTIFICATE}"
echo "KEY: ${KEY}"
echo "GATEWAY_ID: ${GATEWAY_ID}"
# call the python script
nohup python3 rasppi_lora_receiver_aws.py \
--endpoint "${ENDPOINT}" \
--cert "${DEVICE}-creds/${CERTIFICATE}" \
--key "${DEVICE}-creds/${KEY}" \
--root-ca "${DEVICE}-creds/AmazonRootCA1.pem" \
--client-id "${DEVICE}" \
--topic "lora-iot-demo" \
--gateway-id "${GATEWAY_ID}" \
--verbosity "Info" \
--tty "/dev/ttyAMA0" \
--baud-rate 115200 \
>collector.log 2>&1 </dev/null &
sleep 2
# tail the log (Control-C to exit)
tail -f collector.log
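
The helper script derives the gateway ID from the Serial line of /proc/cpuinfo using grep. For readers who would rather do this inside Python, here is a minimal sketch of the same extraction. The function name read_gateway_id is hypothetical, and the sample text is illustrative: only the serial number comes from the script's comment, while the other lines are made-up placeholders.

```python
import re


def read_gateway_id(cpuinfo_text):
    """Return the value of the 'Serial' line in /proc/cpuinfo text, or None."""
    match = re.search(r"^Serial\s*:\s*([0-9a-f]+)\s*$", cpuinfo_text, re.MULTILINE)
    return match.group(1) if match else None


# Illustrative sample; on a Raspberry Pi, read the real file instead:
#   with open("/proc/cpuinfo") as f: text = f.read()
sample = (
    "Hardware\t: placeholder\n"
    "Revision\t: placeholder\n"
    "Serial\t\t: 00000000f62051ce\n"
)
print(read_gateway_id(sample))
```

Returning None when no Serial line is present lets the caller decide on a fallback identifier on hardware other than a Raspberry Pi.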

To run the helper script, we execute the following command, substituting the input parameter, the AWS IoT endpoint, with your endpoint.

sh ./rasppi_lora_receiver_aws.sh \
  a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com

You should see console output similar to the following.

The script starts by configuring the RYLR896 module and outputting that configuration to a log file, output.log. If successful, we should see the following debug information logged.

DEBUG:root:Connecting to a1b2c3d4e5f6-ats.iot.us-east-1.amazonaws.com with client ID 'lora-iot-gateway-01'
DEBUG:root:Connecting to REYAX RYLR896 transceiver module
DEBUG:root:Connected!
DEBUG:root:Address set? +OK
DEBUG:root:Network Id set? +OK
DEBUG:root:AES-128 password set? +OK
DEBUG:root:Module responding? +OK
DEBUG:root:Address: +ADDRESS=116
DEBUG:root:Network id: +NETWORKID=6
DEBUG:root:UART baud rate: +IPR=115200
DEBUG:root:RF frequency: +BAND=915000000
DEBUG:root:RF output power: +CRFOP=15
DEBUG:root:Work mode: +MODE=0
DEBUG:root:RF parameters: +PARAMETER=12,7,1,4
DEBUG:root:AES128 password of the network: +CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF

The sensor data is also written to the log file for debugging purposes. The first line in the log (shown below) is the raw decrypted data received from the IoT device via LoRaWAN. The second line is the JSON-serialized payload, sent securely to AWS, using the MQTT protocol.

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n'

DEBUG:root:{'ts': 1598305503.7041512, 'data': {'humidity': 41.89, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 230.0, 'blue': 833.0, 'ambient': 1116.0, 'green': 692.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.63|99.38|236|696|837|1127,-49,35\r\n'

DEBUG:root:{'ts': 1598305513.7918658, 'data': {'humidity': 41.63, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 236.0, 'blue': 837.0, 'ambient': 1127.0, 'green': 696.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.44|41.57|99.38|232|686|830|1113,-48,32\r\n'

DEBUG:root:{'ts': 1598305523.8556132, 'data': {'humidity': 41.57, 'temperature': 23.44, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 232.0, 'blue': 830.0, 'ambient': 1113.0, 'green': 686.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.51|41.44|99.38|205|658|802|1040,-48,36\r\n'

DEBUG:root:{'ts': 1598305528.8890748, 'data': {'humidity': 41.44, 'temperature': 23.51, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 205.0, 'blue': 802.0, 'ambient': 1040.0, 'green': 658.0}}}
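The relationship between the two kinds of log lines can be sketched in Python. This is a minimal sketch rather than the gateway's actual code: the function name parse_rcv is hypothetical, and the pipe-delimited field order is inferred from the log output above.

```python
import time
from typing import Optional

PREFIX = "+RCV="


def parse_rcv(line: bytes, gateway_id: str, ts: Optional[float] = None) -> dict:
    """Turn one '+RCV=' line from the transceiver into the nested payload."""
    text = line.decode("utf-8").strip()
    if not text.startswith(PREFIX):
        raise ValueError(f"not a +RCV line: {text!r}")

    # Line layout: <sender address>,<payload length>,<payload>,<RSSI>,<SNR>
    _address, _length, rest = text[len(PREFIX):].split(",", 2)
    payload, _rssi, _snr = rest.rsplit(",", 2)

    # Payload layout (inferred from the log):
    # device_id|temperature|humidity|pressure|red|green|blue|ambient
    (device_id, temperature, humidity, pressure,
     red, green, blue, ambient) = payload.split("|")

    return {
        "ts": time.time() if ts is None else ts,
        "data": {
            "device_id": device_id,
            "gateway_id": gateway_id,
            "temperature": float(temperature),
            "humidity": float(humidity),
            "pressure": float(pressure),
            "color": {
                "red": float(red),
                "green": float(green),
                "blue": float(blue),
                "ambient": float(ambient),
            },
        },
    }


raw = b'+RCV=116,59,0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n'
message = parse_rcv(raw, gateway_id="00000000f62051ce", ts=1598305503.7041512)
print(message)
```

Splitting from the right for the RSSI and SNR values keeps the parser independent of how many fields the sensor payload carries.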

AWS IoT Core

The Raspberry Pi-based IoT gateway will be registered with AWS IoT Core. IoT Core allows users to connect devices quickly and securely to AWS.

Things

According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

Below, we see an example of the Thing created by CloudFormation. The Thing, lora-iot-gateway-01, represents the physical IoT gateway. We have assigned the IoT gateway a Thing Type, LoRaIoTGateway, a Thing Group, LoRaIoTGateways, and a Thing Billing Group, IoTGateways.

In a real IoT environment, containing hundreds, thousands, even millions of IoT devices, gateways, and sensors, these classification mechanisms, Thing Type, Thing Group, and Thing Billing Group, will help to organize IoT assets.

Device Gateway and Message Broker

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports the MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture containing multiple Topics, Rules, and Actions.

AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring that all data exchanged between AWS and the devices is secure by default. In the demonstration, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Below, we see an example of an X.509 certificate assigned to the Thing, lora-iot-gateway-01, which represents the physical IoT gateway. The X.509 certificate and the private key, generated earlier using the AWS CLI, are installed on the IoT gateway.

Authorization of the device to access any resource on AWS is controlled by AWS IoT Core Policies. These policies are similar to AWS IAM Policies. Below, we see an example of an AWS IoT Core Policy, LoRaDevicePolicy, which is assigned to the IoT gateway.

AWS IoT Core Rules

Once an MQTT message is received from the IoT gateway (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to an IoT Analytics Channel.

Rule query statements are written in standard Structured Query Language (SQL). The datasource for the Rule query is an IoT Topic.

SELECT
    data.device_id,
    data.gateway_id,
    data.temperature,
    data.humidity,
    data.pressure,
    data.color.red,
    data.color.green,
    data.color.blue,
    data.color.ambient,
    ts,
    clientid() AS device,
    parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp(), "UTC") AS msg_received
FROM
    "${IoTTopicName}"

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

Below, we see a typical AWS IoT Analytics architecture. IoT messages are received from AWS IoT Core through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or in an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 21 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an ordered list of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.

In our demonstration’s Pipeline, iot_analytics_pipeline, we have specified three additional activities, including DeviceRegistryEnrich, Filter, and Lambda. Other activity types include Math, SelectAttributes, RemoveAttributes, and AddAttributes.
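To make the idea of an ordered list of activities concrete, here is a hedged Python sketch of what a pipelineActivities array could look like, with each activity pointing to its successor through a next field. The activity names, the channel and Lambda names, the role ARN, the filter expression, and the order of the three optional activities are all assumptions for illustration. They are not taken from the demonstration's CloudFormation template.

```python
MAX_ACTIVITIES = 25  # Channel + Datastore + up to 23 additional activities


def build_pipeline_activities() -> list:
    """Return a hypothetical pipelineActivities array (all names are placeholders)."""
    return [
        {"channel": {"name": "ChannelActivity",
                     "channelName": "iot_analytics_channel",  # assumed name
                     "next": "FilterActivity"}},
        {"filter": {"name": "FilterActivity",
                    "filter": "temperature > 0 AND humidity > 0",  # assumed expression
                    "next": "LambdaActivity"}},
        {"lambda": {"name": "LambdaActivity",
                    "lambdaName": "transform_iot_message",  # assumed name
                    "batchSize": 10,
                    "next": "EnrichActivity"}},
        {"deviceRegistryEnrich": {"name": "EnrichActivity",
                                  "attribute": "metadata",
                                  "thingName": "device",
                                  "roleArn": "arn:aws:iam::123456789012:role/placeholder",
                                  "next": "DatastoreActivity"}},
        {"datastore": {"name": "DatastoreActivity",
                       "datastoreName": "iot_analytics_data_store"}},
    ]


def activity_order(activities: list) -> list:
    """Follow the next pointers from the channel activity to the datastore."""
    by_name = {}
    for activity in activities:
        kind, body = next(iter(activity.items()))
        by_name[body["name"]] = (kind, body)

    # The chain always starts at the channel (source) activity
    name = next(n for n, (kind, _) in by_name.items() if kind == "channel")
    order = []
    while name is not None:
        kind, body = by_name[name]
        order.append(kind)
        name = body.get("next")  # the datastore activity has no successor
    return order


activities = build_pipeline_activities()
assert len(activities) <= MAX_ACTIVITIES
print(" -> ".join(activity_order(activities)))
```

Walking the chain this way is a quick local check that every activity is reachable and that the list ends at the Datastore activity.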

The Filter activity checks that the sensor values are not Null or otherwise erroneous; messages that fail the check are dropped. The Lambda Pipeline activity executes an AWS Lambda function to transform the messages in the pipeline. Messages are sent in an event object to the Lambda. The message is modified, and the event object is returned to the activity.

The Python-based Lambda function easily handles typical IoT data transformation tasks, including converting the temperature from Celsius to Fahrenheit, pressure from kilopascals (kPa) to inches of mercury (inHg), and 12-bit RGBA values to 8-bit color values (0–255). The Lambda function also rounds all the values to at most two decimal places of precision.

def lambda_handler(event, context):
    # The Lambda Pipeline activity passes the messages as a list
    for e in event:
        # Celsius to Fahrenheit
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        # Kilopascals (kPa) to inches of mercury (inHg)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        # Scale the raw color readings down to 8-bit values
        e['red'] = int(round(e['red'] / (4097 / 255), 0))
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))
    return event
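To see what the Filter and Lambda activities would do to a message, the two steps can be simulated locally. This is only a sketch: is_valid is a hypothetical stand-in for the Filter activity, whose actual expression is not shown in this post, and transform repeats the arithmetic of the Lambda function above for a single message.

```python
SENSOR_FIELDS = ("temperature", "humidity", "pressure",
                 "red", "green", "blue", "ambient")
COLOR_SCALE = 4097 / 255  # same divisor as the Lambda function above


def is_valid(message: dict) -> bool:
    """Hypothetical filter: keep only messages whose sensor values are all numeric."""
    return all(isinstance(message.get(field), (int, float))
               for field in SENSOR_FIELDS)


def transform(message: dict) -> dict:
    """Apply the Lambda function's conversions to a copy of one message."""
    out = dict(message)
    out["temperature"] = round(message["temperature"] * 1.8 + 32, 2)
    out["humidity"] = round(message["humidity"], 2)
    out["pressure"] = round(message["pressure"] / 3.3864, 2)
    for channel in ("red", "green", "blue", "ambient"):
        out[channel] = int(round(message[channel] / COLOR_SCALE, 0))
    return out


def run_pipeline(messages: list) -> list:
    """Drop invalid messages first, then transform the ones that remain."""
    return [transform(m) for m in messages if is_valid(m)]


sample = {"temperature": 23.6, "humidity": 45.73, "pressure": 98.65,
          "red": 281, "green": 667, "blue": 650, "ambient": 1057}
broken = dict(sample, temperature=None)  # simulated faulty reading

results = run_pipeline([sample, broken])
print(results)
```

With the sample values used elsewhere in this post, the single surviving message comes out with a temperature of 74.48, a pressure of 29.13, and color values of 17, 42, 40, and 66.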

The demonstration’s Pipeline also enriches the IoT data with metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the IoT data, including the custom attributes such as LoRa transceiver manufacturer and model, and the IoT gateway manufacturer.

A notable feature of Pipelines is the ability to reprocess messages. If you make changes to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all of the IoT data in the associated Channel and overwrite the IoT data in the Data set.

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline in a fully-managed database. Both Channels and Data stores support storing IoT data in your own Amazon S3 bucket or in an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store the IoT data in our Data store, iot_analytics_data_store.

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Periodic updates are implemented using a cron expression. For the demonstration, we are updating our Data set, iot_analytics_data_set, at a 15-minute interval. The time interval can be increased or reduced, depending on the desired ‘near real-time’ nature of the IoT data being analyzed.
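As an illustration of the scheduled update, the sketch below builds the kind of request body a Data set with a 15-minute cron trigger might use. It is a hedged example, not the demonstration's CloudFormation definition: the helper name dataset_request, the action name, and the SELECT statement are assumptions, and the dictionary only mirrors the general shape of the IoT Analytics create-dataset request without calling AWS.

```python
def dataset_request(dataset_name: str, datastore_name: str, minutes: int) -> dict:
    """Build a hypothetical Data set definition refreshed every `minutes` minutes."""
    if not 1 <= minutes <= 59:
        raise ValueError("minutes must be between 1 and 59")

    return {
        "datasetName": dataset_name,
        "actions": [{
            "actionName": "query_data_store",  # assumed name
            # Assumed query: select everything from the Data store
            "queryAction": {"sqlQuery": f"SELECT * FROM {datastore_name}"},
        }],
        "triggers": [{
            # AWS cron fields: minutes hours day-of-month month day-of-week year
            "schedule": {"expression": f"cron(0/{minutes} * * * ? *)"},
        }],
    }


request = dataset_request("iot_analytics_data_set", "iot_analytics_data_store", 15)
print(request["triggers"][0]["schedule"]["expression"])
```

Changing the minutes argument is all it takes to make the Data set more or less close to real time.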

Below, we see messages in the Result preview pane of the Data set. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

The CloudFormation stack created an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the cron expression runs the scheduled update.

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebook Instance is cloned from our project’s GitHub repository.

The repository contains a sample Jupyter Notebook, LoRa_IoT_Analytics_Demo.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3.

The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT data stored in the IoT Analytics Data set.

The Notebook can be modified, and the changes pushed back to GitHub. You could easily fork the demonstration’s GitHub repository and modify the CloudFormation template to point to your source code repository.

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization.

Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Although there is an Amazon QuickSight API, Amazon QuickSight is not automatically enabled or configured with CloudFormation or using the AWS CLI in this demonstration.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets; they are two different service features.

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set, iot_analytics_data_set.

Amazon QuickSight gives you the ability to view and modify QuickSight Data sets before visualizing. QuickSight even provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. For this demonstration, we will leave the data unchanged since all transformations were already completed in the IoT Analytics Pipeline.

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis console is shown below. An Analysis is primarily a collection of Visuals (aka Visual types). QuickSight provides several Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or each visual within the Analysis can be filtered. For the demo, I have created a simple QuickSight Analysis, including a few typical QuickSight visuals.

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. Although viewers of the Dashboard cannot edit the visuals, they can apply filtering and interactively drill down into data in the Visuals.

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS’s machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight’s ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight’s ML-powered Forecasting can be used to predict your business metrics accurately, and perform interactive what-if analysis with point-and-click simplicity. QuickSight’s built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab (left) in the demonstration’s QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, like Visuals, and configured to tune the detection parameters. Observe the temperature, humidity, and barometric pressure anomalies, identified by ML Insights, based on their Anomaly Score, which is higher or lower, given a minimum delta of five percent. These anomalies accurately reflected an actual failure of the IoT device, caused by overheating during testing, which resulted in abnormal sensor readings.

Receiving the Messages on AWS

To confirm the IoT gateway is sending messages, we can use a packet analyzer, like tcpdump, on the IoT gateway. Running tcpdump on the IoT gateway, below, we see outbound encrypted MQTT messages being sent to AWS on port 443.

To confirm those messages are being received from the IoT gateway on AWS, we can use the AWS IoT Core Test feature and subscribe to the lora-iot-demo topic. We should see messages flowing in from the IoT gateway at approximately 5-second intervals.

The JSON payload structure of the incoming MQTT messages will look similar to the below example. The device_id is the unique id of the IoT device that transmitted the message using LoRaWAN. The gateway_id is the unique id of the IoT gateway that received the message using LoRaWAN and sent it to AWS. A single IoT gateway would usually manage messages from multiple IoT devices, each with a unique id.

{
    "data": {
        "color": {
            "ambient": 1057,
            "blue": 650,
            "green": 667,
            "red": 281
        },
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6
    },
    "ts": 1598543131.9861386
}

The SQL query used by the AWS IoT Rule described earlier transforms and flattens the nested JSON payload structure before passing it to the AWS IoT Analytics Channel, as shown below.

{
    "ambient": 1057,
    "blue": 650,
    "green": 667,
    "red": 281,
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "humidity": 45.73,
    "pressure": 98.65,
    "temperature": 23.6,
    "ts": 1598543131.9861386,
    "msg_received": "2020-08-27T11:45:32.074+0000",
    "device": "lora-iot-gateway-01"
}
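The effect of the Rule's SELECT statement can be mimicked in Python to make the flattening explicit. This is a rough sketch under stated assumptions: flatten_message is a hypothetical helper, and the client ID and receive time are passed in as arguments because, in the real Rule, they come from the clientid() and timestamp() functions.

```python
from datetime import datetime, timezone


def flatten_message(payload: dict, client_id: str, received_at: datetime) -> dict:
    """Flatten the nested payload the way the Rule's SELECT clause does.

    received_at must be timezone-aware; it is rendered in UTC.
    """
    data = payload["data"]
    color = data["color"]

    # Equivalent of "yyyy-MM-dd'T'HH:mm:ss.SSSZ": milliseconds plus a numeric offset
    received_utc = received_at.astimezone(timezone.utc)
    msg_received = (received_utc.strftime("%Y-%m-%dT%H:%M:%S.")
                    + f"{received_utc.microsecond // 1000:03d}"
                    + "+0000")

    return {
        "device_id": data["device_id"],
        "gateway_id": data["gateway_id"],
        "temperature": data["temperature"],
        "humidity": data["humidity"],
        "pressure": data["pressure"],
        "red": color["red"],
        "green": color["green"],
        "blue": color["blue"],
        "ambient": color["ambient"],
        "ts": payload["ts"],
        "device": client_id,
        "msg_received": msg_received,
    }


nested = {
    "data": {
        "color": {"ambient": 1057, "blue": 650, "green": 667, "red": 281},
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6,
    },
    "ts": 1598543131.9861386,
}

flat = flatten_message(
    nested,
    client_id="lora-iot-gateway-01",
    received_at=datetime(2020, 8, 27, 11, 45, 32, 74000, tzinfo=timezone.utc),
)
print(flat)
```

Each nested key, such as data.color.red, becomes a top-level key named after its last path segment, which is why the flattened message has no data or color objects.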

We can measure the near real-time nature of the IoT data using the ts and msg_received data fields. The ts data field is the date and time when the sensor reading occurred on the IoT device, while the msg_received data field is the date and time when the message was received on AWS. The delta between the two values is a measure of how near real-time the sensor readings are being streamed to the AWS IoT Analytics Channel. In the example above, the difference between ts (2020-08-27T11:45:31.986) and msg_received (2020-08-27T11:45:32.074) is 88 ms.
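The delta can also be computed programmatically. The following is a small sketch, assuming ts is a Unix epoch timestamp in seconds and msg_received carries a correct UTC offset. The function name latency_ms is hypothetical, and the input values are taken from the final message structure shown in the next section.

```python
from datetime import datetime


def latency_ms(ts: float, msg_received: str) -> float:
    """Milliseconds between the sensor timestamp and receipt on AWS."""
    # %f accepts the three-digit milliseconds; %z accepts the +0000 offset
    received = datetime.strptime(msg_received, "%Y-%m-%dT%H:%M:%S.%f%z")
    return (received.timestamp() - ts) * 1000.0


delta = latency_ms(1598543131.9861386, "2020-08-27T15:45:32.024+0000")
print(f"{delta:.0f} ms")
```

That message's msg_received value differs slightly from the one quoted above, so the result comes out at roughly 38 ms rather than 88 ms.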

Final IoT Data Message Structure

Once the message payload passes through the AWS IoT Analytics Pipeline and lands in the AWS IoT Analytics Data set, its final data structure looks as follows. Note that the device’s attribute metadata has been added from the AWS IoT Core device registry. Regrettably, the metadata is not well-formatted JSON and will require additional transformation to be usable.

{
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "temperature": 74.48,
    "humidity": 45.73,
    "pressure": 29.13,
    "red": 17,
    "green": 42,
    "blue": 40,
    "ambient": 66,
    "ts": 1598543131.9861386,
    "device": "lora-iot-gateway-01",
    "msg_received": "2020-08-27T15:45:32.024+0000",
    "metadata": {
        "defaultclientid": "lora-iot-gateway-01",
        "thingname": "lora-iot-gateway-01",
        "thingid": "017db4b8-7fca-4617-aa58-7125dd94ab36",
        "thingarn": "arn:aws:iot:us-east-1:123456789012:thing/lora-iot-gateway-01",
        "thingtypename": "LoRaIoTGateway",
        "attributes": {
            "loramfr": "REYAX",
            "gatewaymfr": "RaspberryPiFoundation",
            "loramodel": "RYLR896"
        },
        "version": "2",
        "billinggroupname": "LoRaIoTGateways"
    },
    "__dt": "2020-08-27 00:00:00.000"
}

A set of sample messages is included in the GitHub project’s sample_messages directory.

Conclusion

In this post, we explored the use of the LoRa and LoRaWAN protocols to transmit environmental sensor data from an IoT device to an IoT gateway. Given its low energy consumption, long-distance transmission capabilities, and well-developed protocols, LoRaWAN is an ideal long-range wireless protocol for IoT devices. We then demonstrated how to use AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics to securely collect, analyze, and visualize streaming messages from the IoT device, in near real-time.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.
