Kubernetes-based Microservice Observability with Istio Service Mesh: Part 2 of 2

In part two of this two-part post, we will continue to explore the set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.

Reference Application Platform

To demonstrate Istio’s observability tools, we deployed a reference application platform to EKS on AWS. I have developed the application platform to demonstrate different Kubernetes platforms, such as EKS, GKE, AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform comprises a backend containing eight Go-based microservices, labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). For example, Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message to a RabbitMQ queue, which Service F consumes message off on the RabbitMQ queue, and writes to MongoDB, and so on. The platform’s distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of Reference Application Platform

Part Two

In part one of the post, we configured and deployed the reference application platform to an Amazon EKS development-grade cluster on AWS. The reference application, running on EKS, communicates with two external systems, Amazon DocumentDB (with MongoDB compatibility) and Amazon MQ.

Deployed Reference Application Platform as seen from Argo CD

In part two of the post, we will explore each of the observability tools we installed in greater detail. We will understand how each tool contributes to the three pillars of observability: logs, metrics, and traces.

Logs, metrics, and traces are often known as the three pillars of observability.
 — Cindy Sridharan

Pillar One: Logs

To paraphrase Jay Kreps on the LinkedIn Engineering Blog, a log is an append-only, totally-ordered sequence of records ordered by time. The ordering of records defines a notion of “time” since entries to the left are defined to be older than entries to the right. Logs are a historical record of events that happened in the past. Logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.

Go-based Microservice Logging

An effective logging strategy starts with what you log, when you log, and how you log. As part of our logging strategy, the eight Go-based microservices use Logrus, a popular structured logger for Go first released in 2014. The microservices also implement Banzai Cloud’s logrus-runtime-formatter. There is an excellent article on the formatter, Golang runtime Logrus Formatter. These two logging packages give us greater control over what you log, when you log, and how you log information about our microservices. The recommended configuration of the packages is minimal.

func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}

Logrus provides several advantages over Go’s simple logging package, log. For example, log entries are not only for Fatal errors, nor should all verbose log entries be output in a Production environment. The post’s microservices are taking advantage of Logrus’ ability to log at seven levels: Trace, Debug, Info, Warning, Error, Fatal, and Panic. I have also variabilized the log level, allowing it to be easily changed in the Kubernetes Deployment resource at deploy-time.

The microservices also take advantage of Banzai Cloud’s logrus-runtime-formatter. The Banzai formatter automatically tags log messages with runtime and stack information, including function name and line number; extremely helpful when troubleshooting. I am also using Logrus’ JSON formatter.

Service A log entries in CloudWatch Insights

In 2020, Logus entered maintenance mode. The author, Simon Eskildsen (Principal Engineer at Shopify), stated they will not be introducing new features. This does not mean Logrus is dead. With over 18,000 GitHub Stars, Logrus will continue to be maintained for security, bug fixes, and performance. The author states that many fantastic alternatives to Logus now exist, such as Zerolog, Zap, and Apex.

Client-side Angular UI Logging

Likewise, I have enhanced the logging of the Angular UI using NGX Logger. NGX Logger is a simple logging module for angular (currently supports Angular 6+). It allows “pretty print” to the console and allows log messages to be POSTed to a URL for server-side logging. For this demo, the UI will only log to the web browser’s console. Similar to Logrus, NGX Logger supports multiple log levels: Trace, Debug, Info, Warning, Error, Fatal, and Off. However, instead of just outputting messages, NGX Logger allows us to output properly formatted log entries to the browser’s console.

The level of logs output is configured to be dependent on the environment, Production or not Production. Below is an example of the log output from the Angular UI in Chrome. Since the UI’s Docker Image was built with the Production configuration, the log level is set to INFO. You would not want to expose potentially sensitive information in verbose log output to our end-users in Production.

Controlling logging levels is accomplished by adding the following ternary operator to the app.module.ts file.

imports: [
BrowserModule,
HttpClientModule,
FormsModule,
LoggerModule.forRoot({
level: !environment.production ?
NgxLoggerLevel.DEBUG : NgxLoggerLevel.INFO,
serverLogLevel: NgxLoggerLevel.INFO
})
],

Platform Logs

Based on the platform built, configured, and deployed in , you now have access logs from multiple sources.

  1. Amazon DocumentDB: Amazon CloudWatch Audit and Profiler logs;
  2. Amazon MQ: Amazon CloudWatch logs;
  3. Amazon EKS: API server, Audit, Authenticator, Controller manager, and Scheduler CloudWatch logs;
  4. Kubernetes Dashboard: Individual EKS Pod and Replica Set logs;
  5. Kiali: Individual EKS Pod and Container logs;
  6. Fluent Bit: EKS performance, host, dataplane, and application CloudWatch logs;

Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

With Fluent Bit, deployed in part one, the EKS cluster’s performance, host, dataplane, and application logs will also be available in Amazon CloudWatch.

Within the application log groups, you have access to the individual log streams for each reference application’s components.

Within each CloudWatch log stream, you can view individual log entries.

CloudWatch Logs Insights enables you to interactively search and analyze your log data in Amazon CloudWatch Logs. You can perform queries to help you more efficiently and effectively respond to operational issues. If an issue occurs, you can use CloudWatch Logs Insights to identify potential causes and validate deployed fixes.

CloudWatch Logs Insights supports CloudWatch Logs Insights query syntax, a query language you can use to perform queries on your log groups. Each query can include one or more query commands separated by Unix-style pipe characters (|). For example:

fields @timestamp, @message
| filter kubernetes.container_name = "service-f"
and @message like "error"
| sort @timestamp desc
| limit 20

Pillar Two: Metrics

For metrics, we will examine CloudWatch Container Insights, Prometheus, and Grafana. Prometheus and Grafana are industry-leading tools you installed as part of the Istio deployment.

Prometheus

Prometheus is an open-source systems monitoring and alerting toolkit originally built at SoundCloud circa 2012. Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second project hosted after Kubernetes.

According to Istio, the Prometheus addon is a Prometheus server that comes preconfigured to scrape Istio endpoints to collect metrics. You can use Prometheus with Istio to record metrics that track the health of Istio and applications within the service mesh. You can visualize metrics using tools like Grafana and Kiali. The Istio Prometheus addon is intended for demonstration only and is not tuned for performance or security.

The istioctl dashboardcommand provides access to all of the Istio web UIs. With the EKS cluster running, Istio installed, and the reference application platform deployed, access Prometheus using the istioctl dashboard prometheus command from your terminal. You must be logged into AWS from your terminal to connect to Prometheus successfully. If you are not logged in to AWS, you will often see the following error: Error: not able to locate <tool_name> pod: Unauthorized. Since we used the non-production demonstration versions of the Istio Addons, there is no authentication and authorization required to access Prometheus.

According to Prometheus, users select and aggregate time-series data in real-time using a functional query language called PromQL (Prometheus Query Language). The result of an expression can either be shown as a graph, viewed as tabular data in Prometheus’s expression browser, or consumed by external systems through Prometheus’ HTTP API. The expression browser includes a drop-down menu with all available metrics as a starting point for building queries. Shown below are a few PromQL examples that were developed as part of writing this post.

istio_agent_go_info{kubernetes_namespace="dev"}
istio_build{kubernetes_namespace="dev"}
up{alpha_eksctl_io_cluster_name="istio-observe-demo", job="kubernetes-nodes"}
sum by (pod) (rate(container_network_transmit_packets_total{stack="reference-app",namespace="dev",pod=~"service-.*"}[5m]))
sum by (instance) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code="200"})
sum by (response_code) (istio_requests_total{source_app="istio-ingressgateway",connection_security_policy="mutual_tls",response_code!~"200|0"})

Prometheus APIs

Prometheus has both an HTTP API and a Management API. There are many useful endpoints in addition to the Prometheus UI, available at http://localhost:9090/graph. For example, the Prometheus HTTP API endpoint that lists all the command-line configuration flags is available at http://localhost:9090/api/v1/status/flags. The endpoint that lists all the available Prometheus metrics is available at http://localhost:9090/api/v1/label/__name__/values; a total of 951 metrics in this demonstration!

The Prometheus endpoint that lists many available metrics with HELP and TYPE to explain their function is found at http://localhost:9090/metrics.

Understanding Metrics

In addition to these endpoints, the standard service level metrics exported by Istio and available via Prometheus are found in the Istio Standard Metrics documentation. An explanation of many of the metrics available via Prometheus are also found in the cAdvisor README on their GitHub site. As mentioned in this AWS Blog Post, the cAdvisor metrics are also available from the command line using the following commands:

export NODE=$(kubectl get nodes | sed -n '2 p') | awk {'print $1'}
kubectl get --raw "/api/v1/nodes/${NODE}/proxy/metrics/cadvisor"

Observing Metrics

Below is an example graph of the backend microservice containers deployed to EKS. The graph PromQL expression returns the amount of working set memory, including recently accessed memory, dirty memory, and kernel memory (container_memory_working_set_bytes), summed by pod, in megabytes (MB). There was no load on the services during the period displayed.

sum by (pod) (container_memory_working_set_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

The container_memory_working_set_bytes metric is the same metric used by the kubectl top command (not container_memory_usage_bytes).

> kubectl top pod -n dev --containers=true --use-protocol-buffer
POD                          NAME          CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm service-a 1m 6Mi
service-a-546fbd558d-2lcsg service-a 1m 6Mi
service-b-545c85df9-dl9h8 service-b 1m 6Mi
service-b-545c85df9-q99xm service-b 1m 5Mi
service-c-58996574-58wd8 service-c 1m 7Mi
service-c-58996574-6q7n4 service-c 1m 7Mi
service-d-867796bb47-87ps5 service-d 1m 6Mi
service-d-867796bb47-fh6wl service-d 1m 6Mi
...

In another Prometheus example, the PromQL query expression returns the per-second rate of CPU resources measured in CPU units (1 CPU = 1 AWS vCPU), as measured over the last 5 minutes, per time series in the range vector, summed by the pod. During this period, the backend services were under a consistent, simulated load of 25 concurrent users using hey. The four Service D pods were consuming the most CPU units during this time period.

sum by (pod) (rate(container_cpu_usage_seconds_total{image=~"registry.hub.docker.com/garystafford/.*"}[5m])) * 1000

The container_cpu_usage_seconds_total metric is the same metric used by the kubectl top command. The above PromQL expression multiplies the query results by 1,000 to match the results from kubectl top, shown below.

> kubectl top pod -n dev --containers=true --use-protocol-buffer
POD                          NAME          CPU(cores)   MEMORY(bytes)
service-a-546fbd558d-28jlm service-a 25m 9Mi
service-a-546fbd558d-2lcsg service-a 27m 8Mi
service-b-545c85df9-dl9h8 service-b 29m 11Mi
service-b-545c85df9-q99xm service-b 23m 8Mi
service-c-58996574-c8hkn service-c 62m 9Mi
service-c-58996574-kx895 service-c 55m 8Mi
service-d-867796bb47-87ps5 service-d 285m 12Mi
service-d-867796bb47-9ln7p service-d 226m 11Mi
...

Limits

Prometheus also exposes container resource limits. For example, the memory limits set on the reference platform’s backend services, displayed in megabytes (MB), using the container_spec_memory_limit_bytes metric. When viewed alongside the real-time resources consumed by the services, these metrics are useful to properly configure and monitor Kubernetes management features such as the Horizontal Pod Autoscaler.

sum by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2) / count by (container) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"})

Or, memory limits by Pod:

sum by (pod) (container_spec_memory_limit_bytes{image=~"registry.hub.docker.com/garystafford/.*"}) / (1024^2)

Cluster Metrics

Prometheus also contains metrics about Istio components, Kubernetes components, and the EKS cluster. For example, the total memory in gigabytes (GB) of each m5.large EC2 worker nodes in the istio-observe-demo EKS cluster’s managed-ng-1 Managed Node Group.

machine_memory_bytes{alpha_eksctl_io_cluster_name="istio-observe-demo", alpha_eksctl_io_nodegroup_name="managed-ng-1"} / (1024^3)

For total physical cores, use the machine_cpu_physical_core metric, and for vCPU cores use the machine_cpu_cores metric.

Grafana

Grafana describes itself as the leading open-source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also allows users to visually define alert rules for their most important metrics. Grafana will continuously evaluate rules and can send notifications.

If you deployed Grafana using the Istio addons process demonstrated in part one of the post, access Grafana similar to the other tools:

istioctl dashboard grafana

According to Istio, Grafana is an open-source monitoring solution used to configure dashboards for Istio. You can use Grafana to monitor the health of Istio and applications within the service mesh. While you can build your own dashboards, Istio offers a set of preconfigured dashboards for all of the most important metrics for the mesh and the control plane. The preconfigured dashboards use Prometheus as the data source.

Below is an example of the Istio Mesh Dashboard, filtered to show the eight backend services workloads running in the dev namespace. During this period, the backend services were under a consistent simulated load of approximately 20 concurrent users using hey. You can observe the p50, p90, and p99 latency of requests to these workloads.

Dashboards are built from Panels, the basic visualization building blocks in Grafana. Each panel has a query editor specific to the data source (Prometheus in this case) selected. The query editor allows you to write your (PromQL) query. Below is the PromQL expression query responsible for the p50 latency Panel displayed in the Istio Mesh Dashboard.

 label_join((histogram_quantile(0.50, sum(rate(istio_request_duration_milliseconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)) / 1000) or histogram_quantile(0.50, sum(rate(istio_request_duration_seconds_bucket{reporter="source"}[1m])) by (le, destination_workload, destination_workload_namespace)), "destination_workload_var", ".", "destination_workload", "destination_workload_namespace")

Below is an example of the Outbound Workloads section of the Istio Workload Dashboard. The complete dashboard contains three sections: General, Inbound Workloads, and Outbound Workloads. Here we have filtered the on reference platform’s backend services in the dev namespace.

Here is a different view of the Istio Workload Dashboard, the dashboard’s Inbound Workloads section filtered to a single workload, Service A, the backend’s edge service. Service A accepts incoming traffic from the Istio Ingress Gateway as shown in the dashboard’s panels.

Grafana provides the ability to Explore a Panel. Explore strips away the dashboard and panel options so that you can focus on the query. It helps you iterate until you have a working query and then think about building a dashboard. Below is an example of the Panel showing the egress TCP traffic, based on the istio_tcp_sent_bytes_total metric, for Service F. Service F consumes messages off on the RabbitMQ queue (Amazon MQ) and writes messages to MongoDB (DocumentDB).

You can monitor the resource usage of Istio with the Performance Dashboard.

Additional Dashboards

Grafana provides a site containing official and community-built dashboards, including the above-mentioned Istio dashboards. Importing dashboards into your Grafana instance is as simple as copying the dashboard URL or the ID provided from the Grafana dashboard site and pasting it into the dashboard import option of your Grafana instance. Be aware that not every Kubernetes dashboard in Grafan’s site is compatible with your specific version of Kubernetes, Istio, or EKS, nor relies on Prometheus as a data source. As a result, you might have to test and tweak imported dashboards to get them working.

Below is an example of an imported community dashboard, Kubernetes cluster monitoring (via Prometheus) by Instrumentisto Team (dashboard ID 315).

Alerting

An effective observability strategy must include more than just the ability to visualize results. An effective strategy must also detect anomalies and notify (alert) the appropriate resources or directly resolve incidents. Grafana, like Prometheus, is capable of alerting and notification. You visually define alert rules for your critical metrics. Grafana will continuously evaluate metrics against the rules and send notifications when pre-defined thresholds are breached.

Prometheus supports multiple popular notification channels, including PagerDuty, HipChat, Email, Kafka, and Slack. Below is an example of a Prometheus notification channel that sends alert notifications to a Slack support channel.

Below is an example of an alert based on an arbitrarily high CPU usage of 300 milliCPUs (m). When the CPU usage of a single pod goes above that value for more than 3 minutes, an alert is sent. The high CPU usage could be caused by the Horizontal Pod Autoscaler not functioning, or the HPA has reached its maxReplicas limit, or there are not enough resources available within the cluster to schedule additional pods.

Triggered by the alert, Prometheus sends detailed notifications to the designated Slack channel.

Amazon CloudWatch Container Insights

Lastly in the category of Metrics, Amazon CloudWatch Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications and microservices. CloudWatch alarms can be set on metrics that Container Insights collects. Container Insights is available for Amazon Elastic Container Service (Amazon ECS) including Fargate, Amazon EKS, and Kubernetes platforms on Amazon EC2.

In Amazon EKS, Container Insights uses a containerized version of the CloudWatch agent to discover all running containers in a cluster. It then collects performance data at every layer of the performance stack. Container Insights collects data as performance log events using the embedded metric format. These performance log events are entries that use a structured JSON schema that enables high-cardinality data to be ingested and stored at scale.

In part one of the post, we also installed CloudWatch Container Insights monitoring for Prometheus, which automates the discovery of Prometheus metrics from containerized systems and workloads.

Below is an example of a basic Performance Monitoring CloudWatch Container Insights Dashboard. The dashboard is filtered to the dev namespace of the EKS cluster, where the reference application platform is running. During this period, the backend services were put under a simulated load using hey. As the load on the application increases, observe the Number of Pods increases from 19 to 34 pods, based on the Deployment resources and HPA configurations. There is also an Alert, shown on the right of the screen. An alarm was triggered for an arbitrarily high level of network transmission activity.

Next is an example of Container Insights’ Container Map view in Memory mode. You see a visual representation of the dev namespace, with each of the backend service’s Service and Deployment resources shown.

There is a warning icon indicating an Alarm on the cluster was triggered.

Lastly, CloudWatch Insights allows you to jump from the CloudWatch Insights to the CloudWatch Log Insights console. CloudWatch Insights will also write the CloudWatch Insights query for you. Below, we went from the Service D container metrics view in the CloudWatch Insights Performance Monitoring console directly to the CloudWatch Log Insights console with a query, ready to run.

Pillar 3: Traces

According to the Open Tracing website, distributed tracing, also called distributed request tracing, is used to profile and monitor applications, especially those built using a microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.

According to Istio, header propagation may be accomplished through client libraries, such as Zipkin or Jaeger. It may also be accomplished manually, referred to as trace context propagation, documented in the Distributed Tracing Task. Istio proxies can automatically send spans. Applications need to propagate the appropriate HTTP headers so that when the proxies send span information, the spans can be correlated correctly into a single trace. To accomplish this, an application needs to collect and propagate the following headers from the incoming request to any outgoing requests.

  • x-request-id
  • x-b3-traceid
  • x-b3-spanid
  • x-b3-parentspanid
  • x-b3-sampled
  • x-b3-flags
  • x-ot-span-context

The x-b3 headers originated as part of the Zipkin project. The B3 portion of the header is named for the original name of Zipkin, BigBrotherBird. Passing these headers across service calls is known as B3 propagation. According to Zipkin, these attributes are propagated in-process and eventually downstream (often via HTTP headers) to ensure all activity originating from the same root are collected together.

To demonstrate distributed tracing with Jaeger and Zipkin, Service A, Service B, and Service E have been modified to pass the b3 headers. These are the three services that make HTTP requests to other upstream services. The following code has been added to propagate the headers from one service to the next. The Istio sidecar proxy (Envoy) generates the first headers. It is critical to only propagate the headers that are present in the downstream request and have a value, as the code below does. Propagating an empty header will break the distributed tracing.

incomingHeaders := []string{
"x-b3-flags",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-spanid",
"x-b3-traceid",
"x-ot-span-context",
"x-request-id",
}
for _, header := range incomingHeaders {
if r.Header.Get(header) != "" {
req.Header.Add(header, r.Header.Get(header))
}
}

Below, the highlighted section of the response payload from a call to Service A’s /api/request-echo endpoint reveals the b3 headers originating from the Istio proxy and passed to Service A.

Jaeger

According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains a helpful overview of Jaeger’s architecture and general tracing-related terminology.

If you deployed Jaeger using the Istio addons process demonstrated in part one of the post, access Jaeger similar to the other tools:

istioctl dashboard jaeger

Below is an example of the Jaeger UI’s Search view, displaying the results of a search for the Istio Ingress Gateway service over a period of time. We see a timeline of traces across the top with a list of trace results below. As discussed on the Jaeger website, a trace is composed of spans. A span represents a logical unit of work in Jaeger that has an operation name. A trace is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with the concept of DAGs.

Below is a detailed view of a single trace in Jaeger’s Trace Timeline mode. The 14 spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 160 ms. The root span in the trace is the Istio Ingress Gateway. The Angular UI, loaded in the end user’s web browser, calls Service A via the Istio Ingress Gateway. From there, we see the expected flow of our service-to-service IPC. Service A calls Services B and Service C. Service B calls Service E, which calls Service G and Service H.

In this demonstration, traces are not instrumented to span the RabbitMQ message queue nor MongoDB. This means you would not see a trace that includes a call from Service D to Service F via the RabbitMQ.

The visualization of the trace’s timeline demonstrates the synchronous nature of the reference platform’s service-to-service IPC instead of the asynchronous nature of the decoupled communications using the RabbitMQ messaging queue. Note how Service A waits for each service in its call chain to respond before returning its response to the requester.

Within Jaeger’s Trace Timeline view, you have the ability to drill into a single span, which contains additional metadata. The span’s metadata includes the API endpoint URL being called, HTTP method, response status, and several other headers.

Jaeger also has an experimental Trace Graph mode, which displays a graph view of the same trace.

Jaeger also includes a Compare Trace feature and two Dependencies views: Force-Directed Graph and DAG. I find both views rather primitive compared to Kiali. Lacking access to Kiali, the views are marginally useful as a dependency graph.

Zipkin

Zipkin is a distributed tracing system, which helps gather timing data needed to troubleshoot latency problems in service architectures. According to a 2012 post on Twitter’s Engineering Blog, Zipkin started as a project during Twitter’s first Hack Week. During that week, they implemented a basic version of the Google Dapper paper for Thrift.

Zipkin and Jaeger are very similar in terms of capabilities. I have chosen to focus on Jaeger in this post as I prefer it over Zipkin. If you want to try Zipkin instead of Jaeger, you can use the following commands to remove Jaeger and install Zipkin from the Istio addons extras directory. In part one of the post, we did not install Zipkin by default when we deployed the Istio addons. Be aware that running both tools at the same time in the same Kubernetes cluster will cause unpredictable tracing results.

kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/jaeger.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.10/samples/addons/extras/zipkin.yaml

Access Zipkin similar to the other observability tools:

istioctl dashboard zipkin

Below is an example of a distributed trace visualized in Zipkin’s UI, containing 14 spans. This is very similar to the trace visualized in Jaeger, shown above. The spans encompass eight of the reference platform’s components: seven of the eight backend services and the Istio Ingress Gateway. The spans each have individual timings, with an overall trace time of 154 ms.

Zipkin can also visualize a dependency graph based on the distributed trace. Below is an example of a traffic simulation over a two-minute period, showing network traffic flowing between the reference platform’s components, illustrated as a dependency graph.

Kiali: Microservice Observability

According to their website, Kiali is a management console for an Istio-based service mesh. It provides dashboards, observability, and lets you operate your mesh with robust configuration and validation capabilities. It shows the structure of a service mesh by inferring traffic topology and displaying the mesh’s health. Kiali provides detailed metrics, powerful validation, Grafana access, and strong integration for distributed tracing with Jaeger.

If you deployed Kaili using the Istio addons process demonstrated in part one of the post, access Kiali similar to the other tools:

istioctl dashboard kaili

For improved security, I optionally chose to install the latest version of Kaili using the customizable install mentioned in Istio’s documentation. Using Kiali’s Install via Kiali Server Helm Chart option adds token-based authentication, similar to the Kubernetes Dashboard.

Logging into Kiali, we see the Overview tab, which provides a global view of all namespaces within the Istio service mesh and the number of applications within each namespace.

The Graph tab in the Kiali UI represents the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we can observe that Kiali has mapped 8 applications (Workloads), 10 services, and 22 edges (a graph term). Specifically, we see the Istio Ingres Proxy at the edge of the service mesh, the Angular UI and eight backend services all with their respective Envoy proxy sidecars that are taking traffic (Service F did not take any direct traffic from another service in this example), the external DocumentDB egress point, and the external Amazon MQ egress point. Finally, note how service-to-service traffic flows, with Istio, from the service to its sidecar proxy, to the other service’s sidecar proxy, and finally to the service.

Below is a similar view of the service mesh, but this time, there are failures between the Istio Ingress Gateway and Service A, shown in red. We can also observe overall metrics for the HTTP traffic, such as the request per second inbound and outbound, total requests, success and error rates, and HTTP status codes.

Kiali allows you to zoom in and focus on a single component in the graph and its individual metrics.

Kiali can also display average request times and other metrics for each edge in the graph (communication between two components). Kaili can even show those metrics over a given period of time, using Kiali’s Replay feature, shown below.

Focusing on the external DocumentDB cluster, Kiali also allows us to view TCP traffic between the four services within the service mesh that connect to the external cluster.

The Applications tab lists all the applications, their namespace, and labels.

You can drill into an individual component on both the Applications and Workloads tabs and view additional details. Details include the overall health, Pods, and Istio Config status. Below is an overview of the Service A workload in the dev Namespace.

The Workloads detailed view also includes inbound and outbound metrics. Below is an example of the outbound request volume, duration, throughput, and size metrics, for Service A in the dev Namespace.

Kiali also gives you access to the individual pod’s container logs. Although log access is not as user-friendly as other log sources discussed previously, having logs available alongside metrics (integration with Grafana), traces (integration with Jaeger), and mesh visualization, all in Kiali, can be very effective as a single source for observability.

Kiali also has an Istio Config tab. The Istio Config tab displays a list of all of the available Istio configuration objects that exist in the user’s environment.

You can use Kiali to configure and manage the Istio service mesh and its installed resources. Using Kiali, you can actually modify the deployed resources, similar to using the kubectl edit command.

Oftentimes, I find Kiali to be my first stop when troubleshooting platform issues. Once I identify the specific components or communication paths having issues, I can query the CloudWatch logs and Prometheus metrics through the Grafana dashboard.

Conclusion

In this two-part post, we explored a set of popular open-source observability tools, easily integrated with the Istio service mesh. These tools included Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We rounded out the toolset with the addition of Fluent Bit for log processing and forwarding to Amazon CloudWatch Container Insights. Using these tools, we successfully observed a microservices-based, distributed reference application platform deployed to Amazon EKS.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , ,

1 Comment

Kubernetes-based Microservice Observability with Istio Service Mesh: Part 1 of 2

This two-part post explores a set of popular open-source observability tools that are easily integrated with the Istio service mesh. While these tools are not a part of Istio, they are essential to making the most of Istio’s observability features. The tools include Jaeger and Zipkin for distributed transaction monitoring, Prometheus for metrics collection and alerting, Grafana for metrics querying, visualization, and alerting, and Kiali for overall observability and management of Istio. We will round out the toolset with the addition of Fluent Bit for log processing and aggregation. We will observe a distributed, microservices-based reference application platform deployed to an Amazon Elastic Kubernetes Service (Amazon EKS) cluster using these tools. The platform, running on EKS, will use Amazon DocumentDB as a persistent data store and Amazon MQ to exchange messages.

Kiali Management Console showing reference application platform

Observability

Similar to quantum computing, big data, artificial intelligence, machine learning, and 5G, observability is currently a hot buzzword in the IT industry. According to Wikipedia, observability is a measure of how well the internal states of a system can be inferred from its external outputs. The O’Reilly book, Distributed Systems Observability, by Cindy Sridharan, describes The Three Pillars of Observability in Chapter 4: “Logs, metrics, and traces are often known as the three pillars of observability. While plainly having access to logs, metrics, and traces doesn’t necessarily make systems more observable, these are powerful tools that, if understood well, can unlock the ability to build better systems.

Logs, metrics, and traces are often known as the three pillars of observability.

Cindy Sridharan

Honeycomb is a developer of observability tools for production systems. The honeycomb.io site includes articles, blog posts, whitepapers, and podcasts on observability. According to Honeycomb, “Observability is achieved when a system is understandable — which is difficult with complex systems, where most problems are the convergence of many things failing at once.

As modern distributed systems grow ever more complex, the ability to observe those systems demands equally modern tooling designed with this level of complexity in mind. Traditional logging and monitoring tools struggle with today’s polyglot, distributed, event-driven, ephemeral, containerized and serverless application environments. Tools like the Istio service mesh attempt to solve the observability challenge by offering easy integration with several popular open-source telemetry tools. Istio’s integrations include Jaeger for distributed tracing, Kiali for Istio service mesh-based microservice visualization, and Prometheus and Grafana for metric collection, monitoring, and alerting. Combined with cloud-native monitoring and logging tools such as Fluent Bit and Amazon CloudWatch Container Insights, we have a complete observability platform for modern distributed applications running on Amazon Elastic Kubernetes Service (Amazon EKS).

Traditional logging and monitoring tools struggle with today’s polyglot, distributed, event-driven, ephemeral, containerized and serverless application environments.

Gary Stafford

Reference Application Platform

To demonstrate Istio’s observability tools, we will deploy a reference application platform, written in Go and TypeScript with Angular, to EKS on AWS. The reference application platform was developed to demonstrate different Kubernetes platforms, such as EKS, GKE, and AKS, and concepts such as service mesh, API management, observability, DevOps, and Chaos Engineering. The platform is currently comprised of a backend containing eight Go-based microservices, labeled generically as Service A — Service H, one Angular 12 TypeScript-based frontend UI, four MongoDB databases, and one RabbitMQ message queue for event-based communications. The platform and all its source code are open-sourced on GitHub.

Reference Application Platform’s Angular-based UI

The reference application platform is designed to generate HTTP-based service-to-service, TCP-based service-to-database, and TCP-based service-to-queue-to-service IPC (inter-process communication). Service A calls Service B and Service C; Service B calls Service D and Service E; Service D produces a message on a RabbitMQ queue, which Service F consumes and writes to MongoDB, and so on. Distributed service communications can be observed using Istio’s observability tools when the system is deployed to a Kubernetes cluster running the Istio service mesh.

High-level architecture of Reference Application Platform

Service Responses

Each Go microservice contains a /greeting, /health, and /metrics endpoint. The service’s /health endpoint is used to configure Kubernetes Liveness, Readiness, and Startup Probes. The /metrics endpoint exposes metrics that Prometheus scraps. Lastly, upstream services respond to requests from downstream services when calling their /greeting endpoint by returning a small informational JSON payload — a greeting.

{
"id": "1f077127-2f9f-4a90-ad88-da52327c2620",
"service": "Service C",
"message": "Konnichiwa (こんにちは), from Service C!",
"created": "2021-06-04T04:34:02.901726709Z",
"hostname": "service-c-6d5cc8fdfd-stsq9"
}

The responses are aggregated across the service call chain, resulting in an array of service responses being returned to the edge service, Service A, and subsequently, the platform’s UI running in the end user’s web browser.

[
{
"id": "a9afab6a-3e2a-41a6-aec7-7257d2904076",
"service": "Service D",
"message": "Shalom (שָׁלוֹם), from Service D!",
"created": "2021-06-04T14:28:32.695151047Z",
"hostname": "service-d-565c775894-vdsjx"
},
{
"id": "6d4cc38a-b069-482c-ace5-65f0c2d82713",
"service": "Service G",
"message": "Ahlan (أهلا), from Service G!",
"created": "2021-06-04T14:28:32.814550521Z",
"hostname": "service-g-5b846ff479-znpcb"
},
{
"id": "988757e3-29d2-4f53-87bf-e4ff6fbbb105",
"service": "Service H",
"message": "Nǐ hǎo (你好), from Service H!",
"created": "2021-06-04T14:28:32.947406463Z",
"hostname": "service-h-76cb7c8d66-lkr26"
},
{
"id": "966b0bfa-0b63-4e21-96a1-22a76e78f9cd",
"service": "Service E",
"message": "Bonjour, from Service E!",
"created": "2021-06-04T14:28:33.007881464Z",
"hostname": "service-e-594d4754fc-pr7tc"
},
{
"id": "c612a228-704f-4562-90c5-33357b12ff8d",
"service": "Service B",
"message": "Namasté (नमस्ते), from Service B!",
"created": "2021-06-04T14:28:33.015985983Z",
"hostname": "service-b-697b78cf54-4lk8s"
},
{
"id": "b621bd8a-02ee-4f9b-ac1a-7d91ddad85f5",
"service": "Service C",
"message": "Konnichiwa (こんにちは), from Service C!",
"created": "2021-06-04T14:28:33.042001406Z",
"hostname": "service-c-7fd4dd5947-5wcgs"
},
{
"id": "52eac1fa-4d0c-42b4-984b-b65e70afd98a",
"service": "Service A",
"message": "Hello, from Service A!",
"created": "2021-06-04T14:28:33.093380628Z",
"hostname": "service-a-6f776d798f-5l5dz"
}
]

CORS

The platform’s backend edge service, Service A, is configured for Cross-Origin Resource Sharing (CORS) using the access-control-allow-origin response header. The CORS configuration allows the Angular UI, running in the end user’s web browser, to call Service A’s /greeting endpoint, which potentially resides in a different host from the UI. Shown below is the Go source code for Service A. Note the use of the ALLOWED_ORIGINS environment variable on lines 32 and 195, which allows you to configure the origins that are allowed from the service’s Deployment resource.

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service A
// date: 2021-06-05
package main
import (
"encoding/json"
"fmt"
runtime "github.com/banzaicloud/logrus-runtime-formatter"
"io"
"io/ioutil"
"net/http"
"net/http/httputil"
"os"
"strconv"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
)
var (
logLevel = getEnv("LOG_LEVEL", "debug")
port = getEnv("PORT", ":8080")
serviceName = getEnv("SERVICE_NAME", "Service A")
message = getEnv("GREETING", "Hello, from Service A!")
allowedOrigins = getEnv("ALLOWED_ORIGINS", "*")
URLServiceB = getEnv("SERVICE_B_URL", "http://service-b&quot;)
URLServiceC = getEnv("SERVICE_C_URL", "http://service-c&quot;)
)
type Greeting struct {
ID string `json:"id,omitempty"`
ServiceName string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
CreatedAt time.Time `json:"created,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
var greetings []Greeting
// *** HANDLERS ***
func GreetingHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
log.Debug(r)
greetings = nil
callNextServiceWithTrace(URLServiceB+"/api/greeting", r)
callNextServiceWithTrace(URLServiceC+"/api/greeting", r)
tmpGreeting := Greeting{
ID: uuid.New().String(),
ServiceName: serviceName,
Message: message,
CreatedAt: time.Now().Local(),
Hostname: getHostname(),
}
greetings = append(greetings, tmpGreeting)
err := json.NewEncoder(w).Encode(greetings)
if err != nil {
log.Error(err)
}
}
func HealthCheckHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"alive\": true}"))
if err != nil {
log.Error(err)
}
}
func ResponseStatusHandler(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
statusCode, err := strconv.Atoi(params["code"])
if err != nil {
log.Error(err)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(statusCode)
}
func RequestEchoHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
requestDump, err := httputil.DumpRequest(r, true)
if err != nil {
log.Error(err)
}
_, err = fmt.Fprintf(w, string(requestDump))
if err != nil {
log.Error(err)
}
}
// *** UTILITY FUNCTIONS ***
func callNextServiceWithTrace(url string, r *http.Request) {
log.Debug(url)
var tmpGreetings []Greeting
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Error(err)
}
// Headers must be passed for Jaeger Distributed Tracing
incomingHeaders := []string{
"x-b3-flags",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-spanid",
"x-b3-traceid",
"x-ot-span-context",
"x-request-id",
}
for _, header := range incomingHeaders {
if r.Header.Get(header) != "" {
req.Header.Add(header, r.Header.Get(header))
}
}
log.Info(req)
client := &http.Client{
Timeout: time.Second * 10,
}
response, err := client.Do(req)
if err != nil {
log.Error(err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
log.Error(err)
}
}(response.Body)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error(err)
}
err = json.Unmarshal(body, &tmpGreetings)
if err != nil {
log.Error(err)
}
for _, r := range tmpGreetings {
greetings = append(greetings, r)
}
}
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Error(err)
}
return hostname
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func run() error {
c := cors.New(cors.Options{
AllowedOrigins: []string{allowedOrigins},
AllowCredentials: true,
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"},
})
router := mux.NewRouter()
api := router.PathPrefix("/api").Subrouter()
api.HandleFunc("/greeting", GreetingHandler).Methods("GET", "OPTIONS")
api.HandleFunc("/health", HealthCheckHandler).Methods("GET", "OPTIONS")
api.HandleFunc("/request-echo", RequestEchoHandler).Methods(
"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD")
api.HandleFunc("/status/{code}", ResponseStatusHandler).Methods("GET", "OPTIONS")
api.Handle("/metrics", promhttp.Handler())
handler := c.Handler(router)
return http.ListenAndServe(port, handler)
}
func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}
func main() {
if err := run(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}
view raw main.go hosted with ❤ by GitHub

MongoDB- and RabbitMQ-as-a-Service

Using external services will help us understand how Istio and its observability tools collect telemetry for communications between the reference application platform on Kubernetes and external systems.

Amazon DocumentDB

For this demonstration, the reference application platform’s MongoDB databases will be hosted, external to EKS, on Amazon DocumentDB (with MongoDB compatibility). According to AWS, Amazon DocumentDB is a purpose-built database service for JSON data management at scale, fully managed and integrated with AWS, and enterprise-ready with high durability.

Amazon MQ

Similarly, the reference application platform’s RabbitMQ queue will be hosted, external to EKS, on Amazon MQ. AWS MQ is a managed message broker service for Apache ActiveMQ and RabbitMQ, making it easy to set up and operate message brokers on AWS. Amazon MQ reduces your operational responsibilities by managing the provisioning, setup, and maintenance of message brokers for you. For RabbitMQ, Amazon MQ provides access to the RabbitMQ web console. The console allows us to monitor and manage RabbitMQ.

RabbitMQ Web Console showing the reference platform’s greeting queue

Shown below is the Go source code for Service F. This service consumes messages from the RabbitMQ queue, placed there by Service D, and writes the messages to MongoDB. Services use Sean Treadway’s Go RabbitMQ Client Library and MongoDB’s MongoDB Go Driver for connectivity.

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service F
// date: 2021-06-05
package main
import (
"bytes"
"context"
"encoding/json"
runtime "github.com/banzaicloud/logrus-runtime-formatter"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var (
logLevel = getEnv("LOG_LEVEL", "debug")
port = getEnv("PORT", ":8080")
serviceName = getEnv("SERVICE_NAME", "Service F")
message = getEnv("GREETING", "Hola, from Service F!")
queueName = getEnv("QUEUE_NAME", "service-d.greeting")
mongoConn = getEnv("MONGO_CONN", "mongodb://mongodb:27017/admin")
rabbitMQConn = getEnv("RABBITMQ_CONN", "amqp://guest:guest@rabbitmq:5672")
)
type Greeting struct {
ID string `json:"id,omitempty"`
ServiceName string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
CreatedAt time.Time `json:"created,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
var greetings []Greeting
// *** HANDLERS ***
func GreetingHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
greetings = nil
tmpGreeting := Greeting{
ID: uuid.New().String(),
ServiceName: serviceName,
Message: message,
CreatedAt: time.Now().Local(),
Hostname: getHostname(),
}
greetings = append(greetings, tmpGreeting)
callMongoDB(tmpGreeting, mongoConn)
err := json.NewEncoder(w).Encode(greetings)
if err != nil {
log.Error(err)
}
}
func HealthCheckHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"alive\": true}"))
if err != nil {
log.Error(err)
}
}
// *** UTILITY FUNCTIONS ***
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Error(err)
}
return hostname
}
func callMongoDB(greeting Greeting, mongoConn string) {
log.Info(greeting)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoConn))
if err != nil {
log.Error(err)
}
defer func(client *mongo.Client, ctx context.Context) {
err := client.Disconnect(ctx)
if err != nil {
log.Error(err)
}
}(client, nil)
collection := client.Database("service-f").Collection("messages")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = collection.InsertOne(ctx, greeting)
if err != nil {
log.Error(err)
}
}
func getMessages(rabbitMQConn string) {
conn, err := amqp.Dial(rabbitMQConn)
if err != nil {
log.Error(err)
}
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
log.Error(err)
}
}(conn)
ch, err := conn.Channel()
if err != nil {
log.Error(err)
}
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
log.Error(err)
}
}(ch)
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
if err != nil {
log.Error(err)
}
msgs, err := ch.Consume(
q.Name,
"service-f",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Error(err)
}
forever := make(chan bool)
go func() {
for delivery := range msgs {
log.Debug(delivery)
callMongoDB(deserialize(delivery.Body), mongoConn)
}
}()
<-forever
}
func deserialize(b []byte) (t Greeting) {
log.Debug(b)
var tmpGreeting Greeting
buf := bytes.NewBuffer(b)
decoder := json.NewDecoder(buf)
err := decoder.Decode(&tmpGreeting)
if err != nil {
log.Error(err)
}
return tmpGreeting
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
func run() error {
go getMessages(rabbitMQConn)
router := mux.NewRouter()
api := router.PathPrefix("/api").Subrouter()
api.HandleFunc("/greeting", GreetingHandler).Methods("GET")
api.HandleFunc("/health", HealthCheckHandler).Methods("GET")
api.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(port, router)
}
func init() {
formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
formatter.Line = true
log.SetFormatter(&formatter)
log.SetOutput(os.Stdout)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Error(err)
}
log.SetLevel(level)
}
func main() {
if err := run(); err != nil {
log.Fatal(err)
os.Exit(1)
}
}
view raw main.go hosted with ❤ by GitHub

Source Code

All source code for this post is available on GitHub within two projects. Go-based microservices source code and Kubernetes resources are located in the k8s-istio-observe-backend project repository. The Angular UI TypeScript-based source code is located in the k8s-istio-observe-frontend project repository. You do not need to clone the Angular UI project for this demonstration. The demonstration uses the 2021-istio branch for both projects.

git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-backend.git
# optional - not needed for demonstration
git clone --branch 2021-istio --single-branch \
https://github.com/garystafford/k8s-istio-observe-frontend.git

Docker images referenced in the Kubernetes Deployment resource files for the Go services and UI are all available on Docker Hub. The Go microservice Docker images were built using the official Golang Alpine image on DockerHub, containing Go version 1.16.4. Using the Alpine image to compile the Go source code ensures the containers will be as small as possible and minimize the container’s potential attack surface.

Prerequisites

This post will assume a basic level of knowledge of AWS EKS, Kubernetes, and Istio. Furthermore, the post assumes you have already installed recent versions of the AWS CLI v2, kubectl, Weaveworks’ eksctl, Docker, and Istio. Meaning that the aws, kubectl, eksctl, istioctl, and docker command tools are all available from the terminal.

CLI for Amazon EKS

Weaveworks’ eksctl is a simple CLI tool for creating and managing clusters on EKS — Amazon’s managed Kubernetes service for EC2. It is written in Go and uses CloudFormation.

CLI for Istio

The Istio configuration command-line utility, istioctl, is designed to help debug and diagnose the Istio mesh.

Set-up and Installation

To deploy the microservices platform to EKS, we will proceed in roughly the following order:

  1. Create a TLS certificate and Route53 hosted zone records for ALB;
  2. Create an Amazon DocumentDB database cluster;
  3. Create an Amazon MQ RabbitMQ message broker;
  4. Create an EKS cluster;
  5. Modify Kubernetes resources for your own environment;
  6. Deploy AWS Application Load Balancer (ALB) and associated resources;
  7. Deploy Istio to the EKS cluster;
  8. Deploy Fluent Bit to the EKS cluster;
  9. Deploy the reference platform to EKS;
  10. Test and troubleshoot the platform;
  11. Observe the results in part two;

Amazon DocumentDB

As previously mentioned, the MongoDB databases will be hosted, external to EKS, on Amazon DocumentDB, with MongoDB compatibility. Create a DocumentDB cluster. For the sake of simplicity and affordability of the demo, I recommend creating a single db.r5.large node cluster. We will connect from the microservices to Amazon DocumentDB using the supplied mongodb:// connection string.

Amazon DocumentDB clusters are deployed within an Amazon Virtual Private Cloud (Amazon VPC). If you are installing DocumentDB in a separate VPC than EKS, you will need to ensure that the EKS VPC can access the DocumentDB VPC. Per the DocumentDB documentation, DocumentDB clusters can be accessed directly by Amazon EC2 instances or other AWS services that are deployed in the same Amazon VPC. Additionally, Amazon DocumentDB can be accessed via EC2 instances or other AWS services from different VPCs in the same AWS Region or other Regions via VPC peering.

Amazon MQ

Similarly, the RabbitMQ queues will be hosted, external to EKS, on Amazon MQ. Create an Amazon MQ RabbitMQ broker. To ensure the simplicity and affordability of the demo, I recommend a single mq.m5.large instance broker. The broker is running the RabbitMQ engine and has TLS disabled. We will connect from the microservices to Amazon MQ using AMQP (Advanced Message Queuing Protocol). Amazon MQ provides an amqps:// endpoint. The amqps URI scheme is used to instruct a client to make a secure connection to the server. You can manage and observe RabbitMQ from the RabbitMQ web console provided by Amazon MQ.

Modify Kubernetes Resources

You will need to change several configuration settings in the GitHub project’s Kubernetes resource files to match your environment.

Istio ServiceEntry for Document DB

Modify the Istio ServiceEntry resource, external-mesh-document-db.yaml, adding your DocumentDB host address. This file allows egress traffic from the microservices on EKS to the DocumentDB cluster.

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: docdb-external-mesh
spec:
hosts:
- {{ your_document_db_hostname }}
ports:
- name: mongo
number: 27017
protocol: MONGO
location: MESH_EXTERNAL
resolution: NONE

Istio ServiceEntry for Amazon MQ

Modify the Istio ServiceEntry resource, external-mesh-amazon-mq.yaml, adding your Amazon MQ host address. This file allows egress traffic from the microservices on EKS to the Amazon MQ RabbitMQ broker.

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: amazon-mq-external-mesh
spec:
hosts:
- {{ your_amazon_mq_hostname }}
ports:
- name: rabbitmq
number: 5671
protocol: TCP
location: MESH_EXTERNAL
resolution: NONE

Istio Gateway

There are numerous strategies you can use to route traffic into the EKS cluster via Istio. For this demonstration, I am using an AWS Application Load Balancer (ALB). I have mapped one hostname, observe-ui.example-api.com, to the Angular UI application running on EKS. The backend microservice-based API, specifically the edge service, Service A, is mapped to a second hostname, observe-api.example-api.com.

According to Istio, the Gateway describes a load balancer operating at the edge of the mesh, receiving incoming or outgoing HTTP/TCP connections. Modify the Istio Ingress Gateway resource, gateway.yaml. Insert your own DNS entries into the hosts section. These are the only hosts that will be allowed into the mesh on port 80.

apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: istio-gateway
spec:
selector:
istio: ingressgateway # use istio default controller
servers:
- port:
number: 80
name: ui
protocol: HTTP
hosts:
- {{ your_ui_hostname }}
- {{ your_api_hostname }}

Istio VirtualService

According to Istio, a VirtualService defines a set of traffic routing rules to apply when a host is addressed. A VirtualService is bound to a Gateway to control the forwarding of traffic arriving at a particular host and port. Modify the project’s two Istio VirtualServices resources, virtualservices.yaml. Insert the corresponding DNS entries from the Istio Gateway.

---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: angular-ui
spec:
hosts:
- {{ your_ui_hostname }}
gateways:
- istio-gateway
http:
- match:
- uri:
prefix: /
route:
- destination:
host: angular-ui.dev.svc.cluster.local
subset: v1
port:
number: 80
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: service-a
spec:
hosts:
- {{ your_api_hostname }}
gateways:
- istio-gateway
http:
- match:
- uri:
prefix: /api
route:
- destination:
host: service-a.dev.svc.cluster.local
subset: v1
port:
number: 8080

Kubernetes Secret

According to the Kubernetes project, Kubernetes Secrets lets you store and manage sensitive information, such as passwords, OAuth tokens, and SSH keys. Storing confidential information in a Secret is safer and more flexible than putting it verbatim in a Pod definition or in a container image.

The project contains a Kubernetes Opaque type Secret resource, go-srv-demo.yaml. The Secret contains several pieces of arbitrary user-defined data we want to secure. Data includes the full DocumentDB mongodb:// connection string and the Amazon MQ amqps:// connection string used by the microservices. We will use the Secret to secure the entire connection string, including the hostname, port, username, and password. The data also includes the DocumentDB host, username, and password, and an arbitrary username and password to login to Mongo Express using Basic Authentication.

You must encode your secret’s values using base64. On Linux and Mac, you can use the base64 program to encode the connection strings.

echo -n '{{ your_secret_to_encode }}' | base64
# e.g., echo -n 'amqps://username:password@hostname.mq.us-east-1.amazonaws.com:5671/' | base64

Add the base64 encoded values to the Secret resource.

apiVersion: v1
kind: Secret
metadata:
name: go-srv-config
namespace: dev
type: Opaque
data:
mongodb.conn: {{ your_base64_encoded_secret }}
rabbitmq.conn: {{ your_base64_encoded_secret }}
---
apiVersion: v1
kind: Secret
metadata:
name: mongo-express-config
namespace: mongo-express
type: Opaque
data:
me.basicauth.username: {{ your_base64_encoded_secret }}
me.basicauth.password: {{ your_base64_encoded_secret }}
mongodb.host: {{ your_base64_encoded_secret }}
mongodb.username: {{ your_base64_encoded_secret }}
mongodb.password: {{ your_base64_encoded_secret }}

AWS Load Balancer Controller

The project contains a Custom Resource Definition (CRD) and associated resources, aws-load-balancer-controller-v220-all.yaml. These resources configure the AWS Application Load Balancer (ALB) using the AWS Load Balancer Controller v2.2.0, aws-load-balancer-controller. The AWS Load Balancer Controller manages AWS Elastic Load Balancers (ELB) for a Kubernetes cluster. The controller provisions an AWS ALB when you create a Kubernetes Ingress.

Modify line 797 to include the name of your own cluster. I am using the cluster name istio-observe-demo throughout the demo.

spec:
containers:
- args:
- --cluster-name=istio-observe-demo
- --ingress-class=alb
image: amazon/aws-alb-ingress-controller:v2.2.0
livenessProbe:
failureThreshold: 2
httpGet:
path: /healthz
port: 61779
scheme: HTTP

EKS Cluster Config

The project contains an eksctl ClusterConfig resource, cluster.yaml. The ClusterConfig defines the configuration of the Amazon EKS cluster along with networking, security, and other associated resources. Instead of a pre-existing Amazon Virtual Private Cloud (Amazon VPC) for this demo, eksctl will create a VPC and associated AWS resources as part of cluster creation. Modify the file to match your AWS Region, desired EKS cluster name, and Kubernetes release. For the demo, I am using the latest Kubernetes 1.20 release.

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
name: istio-observe-demo
region: us-east-1
version: "1.20"
iam:
withOIDC: true

Set Environment Variables

Modify and set the following environment variables in your terminal. I will be using us-east-1 for all the demonstration’s AWS resources that are part of the demonstration. They should match the eksctl ClusterConfig resource above.

export AWS_ACCOUNT=$(aws sts get-caller-identity --output text --query 'Account')
export EKS_REGION="us-east-1"
export CLUSTER_NAME="istio-observe-demo"

Istio Home

Set your ISTIO_HOME directory. I have the latest Istio 1.10.0 installed and have theISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also set Istio’s bin/ subdirectory in my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME
/Applications/Istio/istio-1.10.0
where istioctl
/Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Create EKS Cluster

With the cluster.yaml file modified previously, deploy the EKS cluster to a new VPC on AWS.

eksctl create cluster -f ./resources/other/cluster.yaml

This step deploys a large number of resources using CloudFormation. The complete EKS provisioning process can take up to 15–20 minutes to complete.

For the complete demonstration, eksctl will deploy a total of four CloudFormation stacks to your AWS environment.

Once complete, configure kubectl so that you can connect to an Amazon EKS cluster.

aws eks --region ${EKS_REGION} update-kubeconfig \
--name ${CLUSTER_NAME}

Confirm that your cluster creation was successful with the following commands:

kubectl cluster-info
eksctl utils describe-stacks \
--region ${EKS_REGION} --cluster ${CLUSTER_NAME}

Use the EKS Management Console to review the new cluster’s details.

The EKS cluster in this demonstration was created with a single Amazon EKS managed node group, managed-ng-1. The managed node group contains three m5.large EC2 instances. The composition of the EKS cluster can be modified in the eksctl ClusterConfig resource, cluster.yaml.

Deploy AWS Load Balancer Controller

Using the aws-load-balancer-controller-v220-all.yaml file you previously modified, deploy the AWS Load Balancer Controller v2.2.0. Please carefully review the AWS Load Balancer Controller instructions to understand how this resource is configured and integrated with EKS.

curl -o resources/aws/iam-policy.json \
https://raw.githubusercontent.com/kubernetes-sigs/aws-load-balancer-controller/v2.2.0/docs/install/iam_policy.json
aws iam create-policy \
--policy-name AWSLoadBalancerControllerIAMPolicy220 \
--policy-document file://resources/aws/iam-policy.json

eksctl create iamserviceaccount \
--region ${EKS_REGION} \
--cluster ${CLUSTER_NAME} \
--namespace=kube-system \
--name=aws-load-balancer-controller \
--attach-policy-arn=arn:aws:iam::${AWS_ACCOUNT}:policy/AWSLoadBalancerControllerIAMPolicy220 \
--override-existing-serviceaccounts \
--approve
kubectl apply --validate=false \
-f https://github.com/jetstack/cert-manager/releases/download/v1.3.1/cert-manager.yaml

kubectl apply -f resources/other/aws-load-balancer-controller-v220-all.yaml

To confirm the aws-load-balancer-controller is deployed and ready, run the following command:

kubectl get deployment -n kube-system aws-load-balancer-controller
NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
aws-load-balancer-controller
1/1 1 1 55s

AWS Load Balancer Controller Policy

There is an OpenID Connect provider URL associated with the EKS cluster. To use IAM roles for service accounts, an IAM OIDC provider must exist for your cluster. Obtain the URL from the EKS Management Console’s Details tab.

You can also obtain the URL using the following AWS CLI commands:

aws eks describe-cluster --name ${CLUSTER_NAME}
aws iam list-open-id-connect-providers

The project contains a policy document, trust-eks-policy.json. Modify the policy document by adding the OpenID Connect information found above. Instructions are also included in the AWS Create an IAM OIDC provider for your cluster documentation.

{
"Version":"2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Principal":{
"Federated":" {{ your_openid_connect_arn }}"
},
"Action":"sts:AssumeRoleWithWebIdentity",
"Condition":{
"StringEquals":{
"oidc.eks.us-east-1.amazonaws.com/id/{{ your_open_id_connect_id }}:sub":"system:serviceaccount:kube-system:alb-ingress-controller"
}
}
}
]
}

Create and attach the AWS Load Balancer Controller IAM policies and roles.

aws iam create-role \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--assume-role-policy-document file://resources/aws/trust-eks-policy.json

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn="arn:aws:iam::${AWS_ACCOUNT}:policy/AWSLoadBalancerControllerIAMPolicy220"

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy

aws iam attach-role-policy \
--role-name eks-alb-ingress-controller-eks-istio-observe-demo \
--policy-arn arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy

Create Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. The dev namespace will house the reference application platform for this demonstration — the Angular UI frontend and Go microservices backend. This namespace represents a development environment on EKS for our reference application platform. A second namespace, mongo-express, will be used to deploy Mongo Express later in the post.

kubectl apply -f ./minikube/resources/namespaces.yaml

Enable Automatic Sidecar Injection

To take advantage of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. By setting the istio-injection=enabled label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have an Istio sidecar proxy added to them. Labeling the dev namespace for automatic sidecar injection ensures that our reference application platform — the UI and the microservices — will have Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled

Deploy Secret Resources

Create the DocumentDB and Amazon MQ Secrets in the appropriate dev and mongo-express namespaces.

kubectl apply -f ./resources/secrets/secrets.yaml

Install Istio Configuration Profile

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and the sidecars for the Istio data plane.

istioctl profile list
Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, use the default profile, which installs Istio core, istiod, istio-ingressgateway, and istio-egressgateway.

istioctl install --set profile=demo -y
✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Egress gateways installed
✔ Installation complete

Deploy Istio Gateway, VirtualService, and DestinationRule Resources

An Istio Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. An Istio VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, an Istio DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred. You need to deploy an Istio Gateway and a set of VirtualService. You will also need to deploy a set of DestinationRule resources. Create the Istio Gateway, Virtual Services, and Destination Rules, which you modified earlier.

kubectl apply -f resources/istio/gateway.yaml -n dev
kubectl apply -f resources/istio/virtualservices.yaml -n dev
kubectl apply -f resources/istio/destination-rules.yaml -n dev

Deploy Istio Telemetry Add-ons

The Istio project includes sample deployments of various telemetry add-ons that integrate with Istio. The add-ons include Jaeger, Zipkin, Kiali, Prometheus, and Grafana. While these applications are not a part of Istio, they are essential to making the most of Istio’s observability features. According to the Istio project, the deployments are meant to quickly get up and running and are optimized for this case. As a result, they may not be suitable for production. See the GitHub project for more info on integrating a production-grade version of each add-on.

Install the add-ons using the default configurations and then replace Prometheus with a modified version included in the project. The modified Kubernetes ConfigMap in the prometheus.yaml file has added configuration to scrape our reference platform’s /api/metrics endpoint.

kubectl apply -f $ISTIO_HOME/samples/addons
kubectl apply -f resources/istio/prometheus.yaml -n istio-system

You should see seven workloads in the namespace from the EKS Management Console’s Workloads tab, each with one pod up and running. The workloads include Grafana, Jaeger, Kiali, and Prometheus. Also included is the Istio Configuration demo Profile’s istiod, istio-ingressgateway, and istio-egressgateway, installed previously.

Deploy Kubernetes Web UI (Dashboard)

Kubernetes Web UI (Dashboard) is a web-based Kubernetes user interface. You can use the Dashboard to deploy containerized applications to a Kubernetes cluster, troubleshoot your containerized application, and manage cluster resources. You can use the Dashboard to get an overview of applications running on your cluster, as well as for creating or modifying individual Kubernetes resources.

To deploy the dashboard, follow the steps outlined in the Tutorial: Deploy the Kubernetes Dashboard (web UI).

kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.0.5/aio/deploy/recommended.yaml
kubectl apply -f resources/aws/eks-admin-service-account.yaml

Each Service Account has a Secret with a valid Bearer Token that can be used to log in to the Dashboard. Use the following command to retrieve the token associated with the eks-admin Account.

kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep eks-admin | awk '{print $1}')

Start the kubectl proxy in a separate terminal window.

kubectl proxy

Use the eks-admin Account’s token to log in to the Kubernetes Dashboard at the following URL:

http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#!/login

Deploy Mongo Express

Mongo Express is a web-based MongoDB administrative interface written with Node.js, Express, and Bootstrap3. Install Mongo Express into the mongo-express namespace on the EKS cluster to manage the DocumentDB cluster.

kubectl apply -f ./resources/services/mongo-express.yaml -n mongo-express

Obtain the external IP address of any of the Kubernetes worker nodes and the NodePort of Mongo Express with the following two commands:

kubectl get nodes -o wide |  awk {'print $1" " $2 " " $7'} | column -t
kubectl get service/mongo-express -n mongo-express

To ensure secure access to Mongo Express, create an Inbound Rule in your VPC’s Security Group that allows only your IP address (the ‘My IP’ option) access to Mongo Express running on the NodePort obtained above.

Start the kubectl proxy in a separate terminal window.

kubectl proxy

Use the external IP address of any of the Kubernetes worker nodes and current NodePort to access Mongo Express. Mongo Express will require you to enter the username and password you encoded in the Kubernetes Secret created earlier using basic authentication. Once you have deployed the reference application platform, later in the post, you will observe four databases: service-c, service-f, service-g, and service-h. The typical operational databases you would normally see with your own MongoDB installation are unavailable in the UI since DocumentDB is a managed service.

Mongo Express UI showing the four reference platform’s databases

Modify and Deploy the ALB Ingress

The project contains an ALB Ingress resource, alb-ingress.yaml. The AWS Load Balancer Controller installed earlier is configured to limit the ingresses ALB ingress controller controls. By setting the --ingress-class=alb argument, it constrains the controller’s scope to ingresses with matching kubernetes.io/ingress.class: alb annotation. This is especially helpful when running multiple ingress controllers in the same cluster.

The ALB Ingress resource, alb-ingress.yaml, needs to be modified before deployment. First, update the alb.ingress.kubernetes.io/healthcheck-port annotation. The port value is derived from the status-port of the istio-ingressgateway, which was installed as part of the Istio demo configuration profile. To obtain the status-port from the istio-ingressgateway, run the following command:

kubectl -n istio-system get svc istio-ingressgateway \
-o jsonpath='{.spec.ports[?(@.name=="status-port")].nodePort}'

Next, insert the ARN of your SSL/TLS (Transport Layer Security) certificate that is associated with the domain listed in the external-dns.alpha.kubernetes.io/hostname annotation into the ALB Ingress resource, alb-ingress.yaml. Run the following command to insert the TLS certificate’s ARN into the alb.ingress.kubernetes.io/certificate-arn annotation. This command assumes that your SSL/TLS certificate is registered with AWS Certificate Manager (ACM).

export ALB_CERT=$(aws acm list-certificates --certificate-statuses ISSUED \
| jq -r '.CertificateSummaryList[] | select(.DomainName=="*.example-api.com") | .CertificateArn')
yq e '.metadata.annotations."alb.ingress.kubernetes.io/certificate-arn" = env(ALB_CERT)' -i resources/other/alb-ingress.yaml

The alb.ingress.kubernetes.io/actions.ssl-redirect annotation will redirect all HTTP traffic to HTTPS. The TLS certificate is used for HTTPS traffic. The ALB then terminates the HTTPS traffic at the ALB and forwards the unencrypted traffic to the EKS cluster on port 80.

Finally, update external-dns.alpha.kubernetes.io/hostname annotation with a common-delimited list of your platform’s UI and API hostnames. Below is the complete ALB Ingress resource, alb-ingress.yaml.

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: demo-ingress
namespace: istio-system
annotations:
kubernetes.io/ingress.class: alb
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/tags: Environment=dev
alb.ingress.kubernetes.io/healthcheck-port: '{{ your_status_port }}'
alb.ingress.kubernetes.io/healthcheck-path: /healthz/ready
alb.ingress.kubernetes.io/healthcheck-protocol: HTTP
alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
alb.ingress.kubernetes.io/actions.ssl-redirect: '{"Type": "redirect", "RedirectConfig": { "Protocol": "HTTPS", "Port": "443", "StatusCode": "HTTP_301"}}'
external-dns.alpha.kubernetes.io/hostname: "{{ your_ui_hostname, your_api_hostname }}"
alb.ingress.kubernetes.io/certificate-arn: "{{ your_ssl_tls_cert_arn }}"
alb.ingress.kubernetes.io/load-balancer-attributes: routing.http2.enabled=true,idle_timeout.timeout_seconds=30
labels:
app: reference-app
spec:
rules:
- http:
paths:
- pathType: Prefix
path: /
backend:
service:
name: ssl-redirect
port:
name: use-annotation
- pathType: Prefix
path: /
backend:
service:
name: istio-ingressgateway
port:
number: 80
- pathType: Prefix
path: /api
backend:
service:
name: istio-ingressgateway
port:
number: 80

To deploy the ALB Ingress resource, alb-ingress.yaml, run the following command:

kubectl apply -f resources/other/alb-ingress.yaml

To confirm the configuration of the AWS Load Balancer Controller and the ingresses ALB ingress controller controls, run the following command:

kubectl describe ingress.networking.k8s.io --all-namespaces

Any misconfigurations should show up as errors in the Events section.

Running the following command should display the public DNS address of the ALB associated with port 80.

kubectl -n istio-system get ingress
NAME           CLASS    HOSTS   ADDRESS                                                                   PORTS   AGE
demo-ingress <none> * k8s-istiosys-demoingr-
...us-east-1.elb.amazonaws.com 80 23m

Use the EC2 Load Balancer Management Console to review the new ALB’s details.

Deploy Fluent Bit

According to a recent AWS Blog post, Fluent Bit Integration in CloudWatch Container Insights for EKS, Fluent Bit is an open-source, multi-platform log processor and forwarder that allows you to collect data and logs from different sources and unify and send them to different destinations, including CloudWatch Logs. Fluent Bit is also fully compatible with Docker and Kubernetes environments. Using the newly launched Fluent Bit DaemonSet, you can send container logs from your EKS clusters to CloudWatch logs for logs storage and analytics.

We will use Fluent Bit to send the reference platform’s logs to Amazon CloudWatch Container Insights. To install Fluent Bit, I have used the procedure outlined in the AWS documentation: Quick Start Setup for Container Insights on Amazon EKS and Kubernetes. I recommend reviewing this documentation for detailed installation instructions.

kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml

ClusterName=${CLUSTER_NAME}
RegionName=${EKS_REGION}
FluentBitHttpPort='2020'
FluentBitReadFromHead='Off'
[[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off'|| FluentBitReadFromTail='On'
[[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'
kubectl create configmap fluent-bit-cluster-info \
--from-literal=cluster.name=${ClusterName} \
--from-literal=http.server=${FluentBitHttpServer} \
--from-literal=http.port=${FluentBitHttpPort} \
--from-literal=read.head=${FluentBitReadFromHead} \
--from-literal=read.tail=${FluentBitReadFromTail} \
--from-literal=logs.region=${RegionName} -n amazon-cloudwatch

kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluent-bit/fluent-bit.yaml

kubectl get pods -n amazon-cloudwatch

DASHBOARD_NAME=istio_observe_demo
REGION_NAME=${EKS_REGION}
CLUSTER_NAME=${CLUSTER_NAME}

curl https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/service/cwagent-prometheus/sample_cloudwatch_dashboards/fluent-bit/cw_dashboard_fluent_bit.json \
| sed "s/{{YOUR_AWS_REGION}}/${REGION_NAME}/g" \
| sed "s/{{YOUR_CLUSTER_NAME}}/${CLUSTER_NAME}/g" \
| xargs -0 aws cloudwatch put-dashboard --dashboard-name ${DASHBOARD_NAME} --dashboard-body

curl https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluentd/fluentd.yaml | kubectl delete -f -
kubectl delete configmap cluster-info -n amazon-cloudwatch

From the EKS Management Console’s Workloads tab, you should see three fluent-bit pods up and running in the amazon-cloudwatch namespace. There is one fluent-bit pod per EKS worker node.

Once the reference application platform is deployed and running, you should be able to visualize the application in the Amazon CloudWatch Container Insights console’s Map view.

Amazon CloudWatch Container Insights UI

The reference platform’s cluster logs will also be available in Amazon CloudWatch. You should have access to individual Log groups for each application’s components.

Lastly, individual pod logs can also be viewed through the Kubernetes Dashboard. The microservice’s log verbosity level is set to info by default. This level can be changed using the LOG_LEVEL environment variable in the service’s Kubernetes Deployment resource.

Deploy ServiceEntry Resources

Using Istio ServiceEntry configurations, you can reach any publicly accessible service from within your Istio cluster. The Istio proxy can be configured to block any host without an HTTP service or service entry defined within the mesh. We will not go to this extreme in the demonstration. However, we will configure ServiceEntry configurations to monitor egress traffic to the reference platform’s two external services, DocumentDB and Amazon MQ.

Confirm the istio-egressgateway is running, then deploy the two ServiceEntry resources you modified earlier.

kubectl get pod -l istio=egressgateway -n istio-system
NAME                                   READY   STATUS    RESTARTS   AGE
istio-egressgateway-585f7668fc-74qtf 1/1 Running 0 14h
kubectl apply -f resources/istio/external-mesh-document-db-internal.yaml
kubectl apply -f resources/istio/external-mesh-amazon-mq-internal.yaml

Deploy the Reference Application Platform

Each of the platform’s components has a file in the project containing both the Kubernetes Service and corresponding Deployment resources.

apiVersion: v1
kind: Service
metadata:
name: service-h
labels:
app: service-h
component: service
spec:
ports:
name: http
port: 8080
selector:
app: service-h
component: service
apiVersion: apps/v1
kind: Deployment
metadata:
name: service-h
labels:
app: service-h
component: service
version: v1
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 2
maxUnavailable: 1
selector:
matchLabels:
app: service-h
component: service
version: v1
template:
metadata:
labels:
app: service-h
component: service
version: v1
spec:
containers:
name: service-h
image: registry.hub.docker.com/garystafford/go-srv-h:1.6.8
livenessProbe:
httpGet:
path: /api/health
port: 8080
initialDelaySeconds: 3
periodSeconds: 3
env:
name: LOG_LEVEL
value: info
name: MONGO_CONN
valueFrom:
secretKeyRef:
name: go-srv-config
key: mongodb.conn
name: GREETING
value: "Nǐ hǎo (你好), from Service H!"
ports:
containerPort: 8080
imagePullPolicy: Always
view raw service-h.yaml hosted with ❤ by GitHub

Deploy the reference application platform’s frontend UI and eight backend microservices to the EKS cluster using the following commands:

kubectl apply -f ./resources/services/angular-ui.yaml -n dev

for service in a b c d e f g h; do
kubectl apply -f "./resources/services/service-$service.yaml" -n dev
done

From the EKS Management Console’s Workloads tab, you should observe that the three pods for each reference application platform component are up and running in the dev namespace.

You can also use the Kubernetes Dashboard to confirm that the deployments were successful to the dev namespace.

Test the Platform

You want to ensure the platform’s web-based UI is reachable via the AWS Application Load Balancer to EKS through Istio and to the UI’s FQDN (fully qualified domain name) of angular-ui.dev.svc.cluster.local. You want to ensure the platform’s eight microservices are communicating with each other and communicating with the external DocumentDB cluster and Amazon MQ RabbitMQ broker. The easiest way to test the cluster is by viewing the Angular UI in a web browser. For example, in my case, https://observe-ui.example-api.com.

Reference Application Platform’s Angular-based UI

The UI requires you to input the hostname of the backend, which is the edge service, Service A. For example, in my case, https://observe-api.example-api.com. Since you want to use your own hostname and the UI’s JavaScript code is running locally in your web browser, this option allows you to provide your own hostname. This is the same hostname you inserted into the Istio VirtualService for Service A. This hostname routes the API calls to the FQDN of Service A running in the dev namespace, service-a.dev.svc.cluster.local. You should observe seven greeting responses displayed in the UI, all but Service F.

You can also use tools like Postman to test the backend directly, using the same hostname of the backend, as above.

Using Postman to make requests against the /greeting endpoint of Service A
Using Postman to make requests against the /request-echo endpoint of Service A

Load Testing with Hey

You can also use performance testing tools to load-test the platform. Many issues will not show up until the platform is placed under elevated load. I recently tried hey, a modern go-based load generator tool as a replacement for Apache Bench (ab), Unlike ab, hey supports HTTP/2 endpoints, which is required to test the platform on EKS with Istio. You can install hey with Homebrew.

brew install hey

Using hey, you can test the reference application platform by hitting the API hostname and /api/greeting endpoint. The command below generates 1,000 requests, simulates 25 concurrent users, and uses HTTP/2. Traffic will be generated across all the services, the RabbitMQ broker, and the DocumentDB databases.

hey -n 1000 -c 25 -h2 {{ your_api_hostname }}/api/greeting

The results show 1,000 successful HTTP 200 responses from the reference platform’s API in about 43 seconds with an average response time of 1.0430 seconds.

To generate a consistent level of traffic over a longer period of time, try this variation of the command:

hey -n 25000 -c 25 -q 1 -h2 {{ your_api_hostname }}/api/greeting

This command generates a steady stream of traffic for about 18 minutes, making it more convenient when exploring and troubleshooting your observability tools.

Part Two

In part two of this post, we will explore each observability tool and see how they can help us manage the reference application platform running on the EKS cluster.

Jaeger UI showing a distributed trace of Service A

To tear down the EKS cluster, DocumentDB cluster, and Amazon MQ broker, use the commands below.

# EKS cluster
eksctl delete cluster --name $CLUSTER_NAME

# Amazon MQ
aws mq list-brokers | jq -r '.BrokerSummaries[] | .BrokerId'
aws mq delete-broker --broker-id {{ your_broker_id }}
# DocumentDB
aws docdb describe-db-clusters \
| jq -r '.DBClusters[] | .DbClusterResourceId'
aws docdb delete-db-cluster \
--db-cluster-identifier {{ your_cluster_id }}

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Eventual Consistency with Spring for Apache Kafka: Part 2 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

As discussed in Part One of this post, given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components in a local development environment running on Kubernetes with Istio, using minikube. For simplicity’s sake, we will only run a single instance of each service. Additionally, we are not implementing custom domain names, TLS/HTTPS, authentication and authorization, API keys, or restricting access to any sensitive operational API endpoints or ports, all of which we would certainly do in an actual production environment.

To provide operational visibility, we will add Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo ExpressKialiPrometheus, and Grafana to our system.

View of Storefront API traffic from Kiali

Prerequisites

This post will assume a basic level of knowledge of Kubernetes, minikube, Docker, and Istio. Furthermore, the post assumes you have already installed recent versions of minikube, kubectl, Docker, and Istio. Meaning, that the kubectl, istioctl, docker, and minikube commands are all available from the terminal.

Currently installed version of the required applications

For this post demonstration, I am using an Apple MacBook Pro running macOS as my development machine. I have the latest versions of Docker Desktop, minikube, kubectl, and Istio installed as of May 2021.

Source Code

The source code for this post is open-source and is publicly available on GitHub. Clone the GitHub project using the following command:

clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Minikube

Part of the Kubernetes project, minikube is local Kubernetes, focusing on making it easy to learn and develop for Kubernetes. Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows. Given the number of Kubernetes resources we will be deploying to minikube, I would recommend at least 3 CPUs and 4–5 GBs of memory. If you choose to deploy multiple observability tools, you may want to increase both of these resources if you can afford it. I maxed out both CPUs and memory several times while setting up this demonstration, causing temporary lock-ups of minikube.

minikube --cpus 3 --memory 5g --driver=docker start start

The Docker driver allows you to install Kubernetes into an existing Docker install. If you are using Docker, please be aware that you must have at least an equivalent amount of resources allocated to Docker to apportion to minikube.

Before continuing, confirm minikube is up and running and confirm the current context of kubectl is minikube.

minikube status
kubectl config current-context

The statuses should look similar to the following:

Use the eval below command to point your shell to minikube’s docker-daemon. You can confirm this by using the docker image ls and docker container ls command to view running Kubernetes containers on minikube.

eval $(minikube -p minikube docker-env)
docker image ls
docker container ls

The output should look similar to the following:

You can also check the status of minikube from Docker Desktop. Minikube is running as a container, instantiated from a Docker image, gcr.io/k8s-minikube/kicbase. View the container’s Stats, as shown below.

Istio

Assuming you have downloaded and configured Istio, install it onto minikube. I currently have Istio 1.10.0 installed and have theISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also set Istio’s bin/ subdirectory in my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME                                                                
> /Applications/Istio/istio-1.10.0
where istioctl
> /Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

> client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and of the sidecars for the Istio data plane.

istioctl profile list
> Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, we will use the default profile, which installs istiod and an istio-ingressgateway. We will not require the use of an istio-egressgateway, since all components will be installed locally on minikube.

istioctl install --set profile=default -y
> ✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Installation complete

Minikube Tunnel

kubectl get svc istio-ingressgateway -n istio-system

To associate an IP address, run the minikube tunnel command in a separate terminal tab. Since it requires opening privileged ports 80 and 443 to be exposed, this command will prompt you for your sudo password.

Services of the type LoadBalancer can be exposed by using the minikube tunnel command. It must be run in a separate terminal window to keep the LoadBalancer running. We previously created the istio-ingressgateway. Run the following command and note that the status of EXTERNAL-IP is <pending>. There is currently no external IP address associated with our LoadBalancer.

minikube tunnel

Rerun the previous command. There should now be an external IP address associated with the LoadBalancer. In my case, 127.0.0.1.

kubectl get svc istio-ingressgateway -n istio-system

The external IP address shown is the address we will use to access the resources we chose to expose externally on minikube.

Minikube Dashboard

Once again, in a separate terminal tab, open the Minikube Dashboard (aka Kubernetes Dashboard).

minikube dashboard

The dashboard will give you a visual overview of all your installed Kubernetes components.

Minikube Dashboard showing the istio-system namespace

Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. For this demonstration, we will use four namespaces to organize our deployed resources: dev, mongo, kafka, and storefront-kafka-project. The dev namespace is where we will deploy our Storefront API’s microservices: accounts, orders, and fulfillment. We will deploy MongoDB and Mongo Express to the mongo namespace. Lastly, we will use the kafka and storefront-kafka-project namespaces to deploy Apache Kafka to minikube using Strimzi, a Cloud Native Computing Foundation sandbox project, and CMAK.

kubectl apply -f ./minikube/resources/namespaces.yaml

Automatic Sidecar Injection

In order to take advantage of all of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. When you set the istio-injection=enabled label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have a sidecar added to them. Labeling the dev namespace for automatic sidecar injection ensures that our Storefront API’s microservices — accounts, orders, and fulfillment— will have Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled

MongoDB

Next, deploy MongoDB and Mongo Express to the mongo namespace on minikube. To ensure a successful connection to MongoDB from Mongo Express, I suggest giving MongoDB a chance to start up fully before deploying Mongo Express.

kubectl apply -f ./minikube/resources/mongodb.yaml -n mongo
sleep 60
kubectl apply -f ./minikube/resources/mongo-express.yaml -n mongo

To confirm the success of the deployments, use the following command:

kubectl get services -n mongo

Or use the Kubernetes Dashboard to confirm deployments.

Mongo Express UI Access

For parts of your application (for example, frontends) you may want to expose a Service onto an external IP address outside of your cluster. Kubernetes ServiceTypes allows you to specify what kind of Service you want; the default is ClusterIP.

Note that while MongoDB uses the ClusterIP, Mongo Express uses NodePort. With NodePort, the Service is exposed on each Node’s IP at a static port (the NodePort). You can contact the NodePort Service, from outside the cluster, by requesting <NodeIP>:<NodePort>.

In a separate terminal tab, open Mongo Express using the following command:

minikube service --url mongo-express -n mongo

You should see output similar to the following:

Click on the link to open Mongo Express. There should already be three MongoDB operational databases shown in the UI. The three Storefront databases and collections will be created automatically, later in the post: accounts, orders, and fulfillment.

Apache Kafka using Strimzi

Next, we will install Apache Kafka and Apache Zookeeper into the kafka and storefront-kafka-project namespaces on minikube, using Strimzi. Since Strimzi has a great, easy-to-use Quick Start guide, I will not detail the complete install complete process in this post. I suggest using their guide to understand the process and what each command does. Then, use the slightly modified Strimzi commands I have included below to install Kafka and Zookeeper.

# assuming 0.23.0 is latest version available
curl -L -O https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.23.0/strimzi-0.23.0.zip
unzip strimzi-0.23.0.zip
cd strimzi-0.23.0
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
# manually change STRIMZI_NAMESPACE value to storefront-kafka-project
nano install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
kubectl create -f install/cluster-operator/ -n kafka
kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-cluster.yaml -n storefront-kafka-project
kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-topics.yaml -n storefront-kafka-project

Zoo Entrance

We want to install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. However, CMAK required access to Zookeeper. You can not access Strimzi’s Zookeeper directly from CMAK; this is intentional to avoid performance and security issues. See this GitHub issue for a better explanation of why. We will use the appropriately named Zoo Entrance as a proxy for CMAK to Zookeeper to overcome this challenge.

To install Zoo Entrance, review the GitHub project’s install guide, then use the following commands:

git clone https://github.com/scholzj/zoo-entrance.git
cd zoo-entrance
# optional: change my-cluster to kafka-cluster
sed -i '' 's/my-cluster/kafka-cluster/' deploy.yaml
kubectl apply -f deploy.yaml -n storefront-kafka-project

Cluster Manager for Apache Kafka

Next, install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. Run the following command to deploy CMAK into the storefront-kafka-project namespace.

kubectl apply -f ./minikube/resources/cmak.yaml -n storefront-kafka-project

Similar to Mongo Express, we can access CMAK’s UI using its NodePort. In a separate terminal tab, run the following command:

minikube service --url cmak -n storefront-kafka-project

You should see output similar to Mongo Express. Click on the link provided to access CMAK. Choose ‘Add Cluster’ in CMAK to add our existing Kafka cluster to CMAK’s management interface. Use Zoo Enterence’s service address for the Cluster Zookeeper Hosts value.

zoo-entrance.storefront-kafka-project.svc:2181

Once complete, you should see the three Kafka topics we created previously with Strimzi: accounts.customer.change, fulfillment.order.change, and orders.order.change. Each topic will have three partitions, one replica, and one broker. You should also see the _consumer_offsets topic that Kafka uses to store information about committed offsets for each topic:partition per group of consumers (groupID).

Storefront API Microservices

We are finally ready to install our Storefront API’s microservices into the dev namespace. Each service is preconfigured to access Kafka and MongoDB in their respective namespaces.

kubectl apply -f ./minikube/resources/accounts.yaml -n dev
kubectl apply -f ./minikube/resources/orders.yaml -n dev
kubectl apply -f ./minikube/resources/fulfillment.yaml -n dev

Spring Boot services usually take about two minutes to fully start. The time required to download the Docker Images from docker.com and the start-up time means it could take 3–4 minutes for each of the three services to be ready to accept API traffic.

Istio Components

We want to be able to access our Storefront API’s microservices through our Kubernetes LoadBalancer, while also leveraging all the capabilities of Istio as a service mesh. To do so, we need to deploy an Istio Gateway and a VirtualService. We will also need to deploy DestinationRule resources. A Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. A VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, a DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred.

kubectl apply -f ./minikube/resources/destination_rules.yaml -n dev
kubectl apply -f ./minikube/resources/istio-gateway.yaml -n dev

Testing the System and Creating Sample Data

I have provided a Python 3 script that runs a series of seven HTTP GET requests, in a specific order, against the Storefront API. These calls will validate the deployments, confirm the API’s services can access Kafka and MongoDB, generate some initial data, and automatically create the MongoDB database collections from the initial Insert statements.

python3 -m pip install -r ./utility_scripts/requirements.txt -U
python3 ./utility_scripts/refresh.py

The script’s output should be as follows:

If we now look at Mongo Express, we should note three new databases: accounts, orders, and fulfillment.

Observability Tools

Istio makes it easy to integrate with a number of common tools, including cert-managerPrometheusGrafanaKialiZipkin, and Jaeger. In order to better observe our Storefront API, we will install three well-known observability tools: Kiali, Prometheus, and Grafana. Luckily, these tools are all included with Istio. You can install any or all of these to minikube. I suggest installing the tools one at a time as not to overwhelm minikube’s CPU and memory resources.

kubectl apply -f ./minikube/resources/prometheus.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/grafana.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/kiali.yaml

Once deployment is complete, to access any of the UI’s for these tools, use the istioctl dashboard command from a new terminal window:

istioctl dashboard kiali

istioctl dashboard prometheus

istioctl dashboard grafana

Kiali

Below we see a view of Kiali with API traffic flowing to Kafka and MongoDB.

View of Storefront API traffic from Kiali

Prometheus

Each of the three Storefront API microservices has a dependency on Micrometer; specifically, a dependency on micrometer-registry-prometheus. As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics with a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends. Given the Micrometer Prometheus dependency, each microservice exposes a /prometheus endpoint (e.g., http://127.0.0.1/accounts/actuator/prometheus) as shown below in Postman.

The /prometheus endpoint exposes dozens of useful metrics and is configured to be scraped by Prometheus. These metrics can be displayed in Prometheus and indirectly in Grafana dashboards via Prometheus. I have customized Istio’s version of Prometheus and included it in the project (prometheus.yaml), which now scrapes the Storefront API’s metrics.

scrape_configs:
- job_name: 'spring_micrometer'
metrics_path: '/actuator/prometheus'
scrape_interval: 5s
static_configs:
- targets: ['accounts.dev:8080','orders.dev:8080','fulfillment.dev:8080']

Here we see an example graph of a Spring Kafka Listener metric, spring_kafka_listener_seconds_sum, in Prometheus. There are dozens of metrics exposed to Prometheus from our system that we can observe and alert on.

Grafana

Lastly, here is an example Spring Boot Dashboard in Grafana. More dashboards are available on Grafana’s community dashboard page. The Grafana dashboard uses Prometheus as the source of its metrics data.

Storefront API Endpoints

The three storefront services are fully functional Spring Boot, Spring Data REST, Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. To better understand the Storefront API, each Spring Boot microservice uses SpringFox, which produces automated JSON API documentation for APIs built with Spring. The service builds also include the springfox-swagger-ui web jar, which ships with Swagger UI. Swagger takes the manual work out of API documentation, with a range of solutions for generating, visualizing, and maintaining API docs.

From a web browser, you can use the /swagger-ui/ subdirectory/subpath with any of the three microservices to access the fully-featured Swagger UI (e.g., http://127.0.0.1/accounts/swagger-ui/).

Accounts service Customer entity endpoints

Each service’s data model (POJOs) is also exposed through the Swagger UI.

Accounts service data model

Spring Boot Actuator

Additionally, each service includes Spring Boot Actuator. The Actuator exposes additional operational endpoints, allowing us to observe the running services. With Actuator, you get many features, including access to available operational-oriented endpoints, using the /actuator/ subdirectory/subpath (e.g., http://127.0.0.1/accounts/actuator/). For this demonstration, I have not restricted access to any available Actuator endpoints.

Partial list of Spring Boot Actuator endpoints as seen using Swagger
Partial list of Spring Boot Actuator endpoints as seen using Postman

Conclusion

In this two-part post, we learned how to build an API using Spring Boot. We ensured the API’s distributed data integrity using a pub/sub model with Spring for Apache Kafka Project. When a relevant piece of data was changed by one microservice, that state change triggered a state change event that was shared with other microservices using Kafka topics.

We also learned how to deploy and run the API in a local development environment running on Kubernetes with Istio, using minikube. We have added production-tested observability tools to provide operational visibility, including CMAK, Mongo Express, Kiali, Prometheus, and Grafana.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

1 Comment

Eventual Consistency with Spring for Apache Kafka: Part 1 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Introduction

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.

Domain-driven Design

To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.

Below are the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.

Consistency Strategies

Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.

In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.

Storefront Example

In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

View of the Storefront API from Kiali

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices

We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.

Source Code

The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}
view raw customer.orders.bson hosted with ❤ by GitHub

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}
view raw customer.orders.bson hosted with ❤ by GitHub

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer’s name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object represented as a BSON document in the fulfillment.requests database collection looks as follows:

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, again, we can again look at its JSON message payload in Kafka.

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.

State Change Event Messaging Flows

There are three state change event messaging flows illustrated in this post.

  1. Changes to a Customer triggers an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
  2. Order Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic, and is consumed by the Fulfillment service;
  3. Changes to the status of an Order triggers an event message produced by the Fulfillment Service, which is published on the fulfillment.order.change Kafka topic, and is consumed by the Orders service;

Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class.

The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Orders service’s Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file in each service’s resources directory contains the Kafka configuration (lines 11–19).

server:
port: 8080
spring:
main:
allow-bean-definition-overriding: true
application:
name: orders
data:
mongodb:
uri: mongodb://mongo:27017/orders
kafka:
bootstrap-servers: kafka:9092
topic:
accounts-customer: accounts.customer.change
orders-order: orders.order.fulfill
fulfillment-order: fulfillment.order.change
consumer:
group-id: orders
auto-offset-reset: earliest
zipkin:
sender:
type: kafka
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: INFO
spring:
config:
activate:
on-profile: local
data:
mongodb:
uri: mongodb://localhost:27017/orders
kafka:
bootstrap-servers: localhost:9092
server:
port: 8090
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
spring:
config:
activate:
on-profile: confluent
server:
port: 8080
logging:
level:
root: INFO
server:
port: 8080
spring:
config:
activate:
on-profile: minikube
data:
mongodb:
uri: mongodb://mongo.dev:27017/orders
kafka:
bootstrap-servers: kafka-cluster.dev:9092
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
view raw application.yaml hosted with ❤ by GitHub

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages required it.

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class, which simulates a change in order status. They allow an order’s status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service’s Sender class handles this.

Note in this example that these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these independent actions succeed or fail as a single transaction to ensure data consistency, using any of a handful of common architectural patterns.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service receives messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components into a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of the Storefront API from Kiali

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Cross-Account Amazon Elastic Container Registry (ECR) Access for ECS

Deploying containerized applications on Amazon ECS using cross-account elastic container registries

This is an updated version of a post, originally published in October 2019. This post uses AWS CLI version 2 and contains updated versions of all Docker images.

Introduction

There are two scenarios I frequently encounter that require sharing Amazon Elastic Container Registry (ECR)-based Docker images across multiple AWS Accounts. In the first scenario, a vendor wants to share a Docker image with their customer, stored in the vendor’s private container registry. Many popular container security and observability solutions function in this manner.

Below, we see an example of an application consisting of three containers. Two of the container images originated from the customer’s own ECR repositories (right side). The third image originated from their vendor’s ECR repository (left side).

In the second scenario, an enterprise operates multiple AWS accounts to create logical security boundaries between environments and responsibilities. The first AWS account contains the enterprise’s deployable assets, including their ECR image repositories. The enterprise has additional accounts, such as Development, Test, Staging, and Production, for each Software Development Life Cycle (SDLC) phase. The ECR images in the repository account need to be accessed from multiple AWS accounts and often across different AWS Regions for deployment.

Below, we see an example of a deployed application also consisting of three containers. All the container images originated from the ECR repositories account (left side). The images were pulled into the Production account during deployment to ECS (right side).

This post will explore the first scenario — a vendor who wants to share a private Docker image with their customer securely. The post will demonstrate how to share images across AWS accounts for use with Docker Swarm and Amazon Elastic Container Service (ECS) with AWS Fargate, both using ECR Repository Policies.

For the demonstration, we will use an existing application I have created, a RESTful, HTTP-based NLP (Natural Language Processing) API, consisting of four Golang microservices. The edge service, nlp-client, communicates with the rake-app, lang-app, and prose-app services. There is a fifth service, dyanmo-app, which is not discussed in this post, but easily added to the API.

A customer has developed the nlp-client, lang-app, and prose-app container-based microservices as part of their NLP application in the post’s scenario. Instead of developing their own implementation of the RAKE (Rapid Automatic Keyword Extraction) algorithm, they have licensed a version from a vendor. The vendor’s rake-app service is delivered in the form of a licensed Docker image. The acronym ISV (Independent Software Vendor) is used to represent the vendor throughout the code.

The NPL API exposes several endpoints accessible through the nlp-client service. The endpoints perform common NLP operations on text input, such as extracting keywords, tokens, entities, and sentences, and determining the language. All the endpoints can be listed using the /routes endpoint.

[
{
"method": "GET",
"path": "/error",
"name": "main.getError"
},
{
"method": "POST",
"path": "/keywords",
"name": "main.getKeywords"
},
{
"method": "POST",
"path": "/language",
"name": "main.getLanguage"
},
{
"method": "GET",
"path": "/health",
"name": "main.getHealth"
},
{
"method": "GET",
"path": "/health/:app",
"name": "main.getHealthUpstream"
},
{
"method": "GET",
"path": "/routes",
"name": "main.getRoutes"
},
{
"method": "POST",
"path": "/tokens",
"name": "main.getTokens"
},
{
"method": "POST",
"path": "/entities",
"name": "main.getEntities"
},
{
"method": "POST",
"path": "/sentences",
"name": "main.getSentences"
}
]

Requirements

To follow along with the post’s demonstration, you will need two AWS accounts, one representing the vendor and one representing one of their customers. It is relatively simple to create additional AWS accounts — all you need is a unique email address (easy with Gmail) and a credit card. Using AWS Organizations can make the task of creating and managing multiple accounts even easier.

I have intentionally used different AWS Regions to demonstrate how you can share ECR images across both AWS accounts and regions. You will need a current version of the AWS CLI version 2 and of Docker. Lastly, you will need adequate access to each AWS account to create resources.

Source Code

The demonstration’s source code is contained in five public GitHub repositories.

git clone --branch v2.0.0 \
    --single-branch --depth 1 \
    https://github.com/garystafford/ecr-cross-account-demo.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/nlp-client.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/prose-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/rake-app.git
git clone --branch master \
    --single-branch --depth 1 \
    https://github.com/garystafford/lang-app.git

The v2.0.0 branch of the ecr-cross-account-demo GitHub repository contains all the CloudFormation templates and the Docker Compose Stack file.

.
├── LICENSE
├── README.md
├── cfn-templates
│ ├── development-user-group-customer.yml
│ ├── development-user-group-isv.yml
│ ├── ecr-repo-not-shared.yml
│ ├── ecr-repo-shared.yml
│ ├── public-subnet-public-loadbalancer.yml
│ └── public-vpc.yml
└── docker
└── stack.yml

Each of the other four GitHub repositories, such as the nlp-client repository, contains a Golang-based microservice, which together comprises the NLP API. Each repository also contains a Dockerfile.

.
├── Dockerfile
├── LICENSE
├── README.md
├── buildspec.yml
├── go.mod
├── go.sum
└── main.go

We will use AWS CloudFormation to create the necessary resources within both AWS accounts. We will also use CloudFormation to create an ECS Cluster and an Amazon ECS Task Definition for the customer account. Task Definition defines how ECS will deploy the application, consisting of four Docker containers, using AWS Fargate. In addition to ECS, we will create an Amazon Virtual Private Cloud (VPC) to house the ECS cluster and a public-facing, Layer 7 Application Load Balancer (ALB) to load-balance our ECS-based application.

Creating ECR Repositories

In the first AWS account, representing the vendor, we will execute two CloudFormation templates. The first template, development-user-group-isv.yml, creates the Development group and VendorDev user. The VendorDev user will be given explicit access to the vendor’s rake-app ECR repository. Change the DevUserPassword parameter’s value to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-isv \
--template-body file://cfn-templates/development-user-group-isv.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new IAM Group and User.

Next, we will execute the second CloudFormation template, ecr-repo-shared.yml, which creates the vendor’s rake-app ECR image repository. The rake-app repository will house a copy of the vendor’s rake-app Docker Image. But first, let’s look at the CloudFormation template used to create the repository, specifically the RepositoryPolicyText section. Here we define two repository policies:

  • The AllowPushPull policy explicitly allows the VendorDev user to push and pull versions of the image to the ECR repository. We import the exported Amazon Resource Name (ARN) of the VendorDev user from the previous CloudFormation Stack Outputs. We have also allowed AWS CodeBuild service access to the ECR repository. This is known as a Service-Linked Role. We will not use CodeBuild in this brief post.
  • The AllowPull policy allows anyone in the customer’s AWS account (root) to pull any version of the image. They cannot push, only pull. Cross-account access can be restricted to a finer-grained set of the specific customer’s IAM Entities and source IP addresses.

Note the "ecr:GetAuthorizationToken" policy Action. This action will allow the customer’s user to log into the vendor’s ECR repository and receive an Authorization Token. The customer retrieves a token that is valid for a specified container registry for 12 hours.

RepositoryPolicyText:
Version: '2012-10-17'
Statement:
- Sid: AllowPushPull
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
AWS:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'DevUserArn']]
Action:
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:BatchGetImage'
- 'ecr:CompleteLayerUpload'
- 'ecr:DescribeImages'
- 'ecr:DescribeRepositories'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:GetRepositoryPolicy'
- 'ecr:InitiateLayerUpload'
- 'ecr:ListImages'
- 'ecr:PutImage'
- 'ecr:UploadLayerPart'
- Sid: AllowPull
Effect: Allow
Principal:
AWS: !Join [':', ['arn:aws:iam:', !Ref 'CustomerAccount', 'root']]
Action:
- 'ecr:GetAuthorizationToken'
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:BatchGetImage'
- 'ecr:DescribeRepositories' # optional permission
- 'ecr:DescribeImages' # optional permission

Before executing the following command to deploy the CloudFormation Stack, ecr-repo-shared.yml, replace the CUSTOMER_ACCOUNT value with your pseudo customer’s AWS account ID.

# change me
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
# NLP Rake Microservice
REPO_NAME=rake-app
aws --region ${ISV_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-shared.yml \
--parameters \
ParameterKey=CustomerAccount,ParameterValue=${CUSTOMER_ACCOUNT} \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting CloudFormation Stack showing the new ECR repository.

Below, we see the ECR repository policies applied correctly in the Permissions tab of the rake-app repository. The first policy covers both the VendorDev user, referred to as an IAM Entity, as well as AWS CodeBuild, referred to as a Service Principal.

The second policy covers the customer’s AWS account ID.

Repeat this process in the customer’s AWS account. First, the CloudFormation template, development-user-group-customer.yml, containing the Development group and CustomerDev user.

# change me
export IAM_USER_PSWD=T0pS3cr3Tpa55w0rD
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name development-user-group-customer \
--template-body file://cfn-templates/development-user-group-customer.yml \
--parameters \
ParameterKey=DevUserPassword,ParameterValue=${IAM_USER_PSWD} \
--capabilities CAPABILITY_NAMED_IAM

Next, we will execute the second CloudFormation template, ecr-repo-not-shared.yml, three times, once for each of the customer’s three ECR repositories, nlp-client, lang-app, and prose-app. Note that in the RepositoryPolicyText section of the template we only define a single policy. Identical to the vendor’s policy, the AllowPushPull policy explicitly allows the previously-created CustomerDev user to push and pull versions of the image to the ECR repository. There is no cross-account access required to the customer’s two ECR repositories.

RepositoryPolicyText:
Version: '2012-10-17'
Statement:
- Sid: AllowPushPull
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
AWS:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'DevUserArn']]

Action:
- 'ecr:BatchCheckLayerAvailability'
- 'ecr:BatchGetImage'
- 'ecr:CompleteLayerUpload'
- 'ecr:DescribeImages'
- 'ecr:DescribeRepositories'
- 'ecr:GetDownloadUrlForLayer'
- 'ecr:GetRepositoryPolicy'
- 'ecr:InitiateLayerUpload'
- 'ecr:ListImages'
- 'ecr:PutImage'
- 'ecr:UploadLayerPart'

Execute the following commands to create the three CloudFormation Stacks. The Stacks use the same template, ecr-repo-not-shared.yml, with a different Stack name and RepoName parameter values.

# NLP Client microservice
REPO_NAME=nlp-client
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

# NLP Prose microservice
REPO_NAME=prose-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM
# NLP Language microservice
REPO_NAME=lang-app
aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name ecr-repo-${REPO_NAME} \
--template-body file://cfn-templates/ecr-repo-not-shared.yml \
--parameters \
ParameterKey=RepoName,ParameterValue=${REPO_NAME} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the resulting three ECR repositories.

At this point, we have our four ECR repositories across the two AWS accounts, with the proper ECR Repository Policies applied to each.

Building and Pushing Images to ECR

Next, we will build and push the three NLP application images to their corresponding ECR repositories. To confirm that the ECR policies are working correctly, log in as the VendorDev user and perform the below command.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Logged in as the vendor’s VendorDev user, build and push the Docker image to the rake-app repository. The Dockerfile and Golang source code are located in each GitHub repository. With Golang and Docker multi-stage builds, we will create very small Docker images, based on Scratch, containing just the compiled Go executable binary. At a mere 7–15 MBs in size, pushing and pulling these Docker images across accounts is very fast.

docker build -t ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0 . --no-cache
docker push ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Below, we see the output from the vendor’s VendorDev user logging into the rake-app repository.

We see the vendor’s VendorDev user building results and pushing the Docker image to the rake-app repository.

Next, after logging in as the customer’s CustomerDev user, build and push the Docker images to the ECR nlp-client, lang-app, and prose-app repositories. Again, make sure you substitute the variable values below with your pseudo customer’s AWS account and preferred AWS region.

aws ecr get-login-password --region ${CUSTOMER_ECR_REGION} \
| docker login --username AWS --password-stdin ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com
# nlp-client
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
# prose-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
# lang-app
docker build -t ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0 . --no-cache
docker push ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0

At this point, each of the four customer ECR repositories has a Docker image pushed to them.

Deploying Locally to Docker Swarm

As a simple demonstration of cross-account ECS access, we will start with Docker Swarm. Logged in as the customer’s CustomerDev user and using the Docker Swarm Stack file included in the project, we can create and run a local copy of our NLP application in our customer’s account. First, we need to log into the vendor’s ECR repository in order to pull the image from the vendor’s ECR registry.

aws ecr get-login-password --region ${ISV_ECR_REGION} \
| docker login --username AWS --password-stdin ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com

Once logged in to the vendor’s ECR repository, we will pull the image. Using the docker describe-repositories and docker describe-images, we can list cross-account repositories and images your IAM user has access to if you are unsure.

aws ecr describe-repositories \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
aws ecr describe-images \
--registry-id ${ISV_ACCOUNT} \
--region ${ISV_ECR_REGION} \
--repository-name rake-app
docker pull ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0

Using the following command, you should see each of our four applications Docker images.

docker image ls --filter=reference='*amazonaws.com/*'

Below, we see an example of the expected terminal output from pulling the image and listing the images.

Build Docker Stack Locally

Next, build the Docker Swarm Stack. The Docker Compose file, stack.yml, is shown below. Note the location of the Docker images.

version: '3.9'
services:
nlp-client:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/nlp-client:1.1.0
networks:
nlp-demo
ports:
target: 8080
published: 8080
protocol: tcp
mode: host
environment:
NLP_CLIENT_PORT
RAKE_ENDPOINT
PROSE_ENDPOINT
LANG_ENDPOINT
API_KEY
rake-app:
image: ${ISV_ACCOUNT}.dkr.ecr.${ISV_ECR_REGION}.amazonaws.com/rake-app:1.1.0
networks:
nlp-demo
environment:
RAKE_PORT
API_KEY
prose-app:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/prose-app:1.1.0
networks:
nlp-demo
environment:
PROSE_PORT
API_KEY
lang-app:
image: ${CUSTOMER_ACCOUNT}.dkr.ecr.${CUSTOMER_ECR_REGION}.amazonaws.com/lang-app:1.1.0
networks:
nlp-demo
environment:
LANG_PORT
API_KEY
networks:
nlp-demo:
volumes:
data: {}
view raw stack.yaml hosted with ❤ by GitHub

Execute the following commands to deploy the Docker Stack to Docker Swarm. Again, make sure you substitute the variable values below with your pseudo vendor and customer AWS accounts and regions. Additionally, the NLP API uses an API Key to protect all exposed endpoints, except the /health endpoint, across all four services. Change the default CloudFormation template’s API Key parameter to something more secure.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export CUSTOMER_ACCOUNT=999888777666
export CUSTOMER_ECR_REGION=us-west-2
export API_KEY=SuP3r5eCRetAutHK3y

# don't change me
export NLP_CLIENT_PORT=8080
export RAKE_PORT=8080
export PROSE_PORT=8080
export LANG_PORT=8080
export RAKE_ENDPOINT=http://rake-app:${RAKE_PORT}
export PROSE_ENDPOINT=http://prose-app:${PROSE_PORT}
export LANG_ENDPOINT=http://lang-app:${LANG_PORT}
export TEXT="The Nobel Prize is regarded as the most prestigious award in the World. Notable winners have included Marie Curie, Theodore Roosevelt, Albert Einstein, George Bernard Shaw, and Winston Churchill."
 
docker swarm init 
docker stack deploy --compose-file docker/stack.yml nlp

You can check the success of the deployment with either of the following commands:

docker stack ps nlp --no-trunc
docker container ls

Below, we see an example of the expected terminal output.

With the Docker Stack, you can hit the nlp-client service directly on localhost:8080. Unlike Fargate, which requires unique static ports for each container in the task, with Docker, we can choose to run all the containers on the same port without conflict since only the nlp-client service is exposing port :8080. Unlike with ECS, there is no load balancer in front of the Stack, since we only have a single node in our Swarm and thus a single container instance of each microservice for testing.

To test that the images were pulled successfully and the Docker Stack is running, we can execute a curl command against any of the API endpoints, such as /keywords. Below, I am using jq to pretty-print the JSON response payload.

curl -s -X POST \
"http://localhost:${NLP_CLIENT_PORT}/keywords" \
-H 'Content-Type: application/json' \
-H "X-API-Key: ${API_KEY}" \
-d '{"text": "The Internet is the global system of interconnected computer networks that use the Internet protocol suite to link devices worldwide."}' | jq

The resulting JSON response payload indicates that the nlp-client service was reached successfully and that it was then subsequently able to communicate with the rake-app service, whose container image originated from the vendor’s ECR repository.

[
{
"candidate": "interconnected computer networks",
"score": 9
},
{
"candidate": "link devices worldwide",
"score": 9
},
{
"candidate": "internet protocol suite",
"score": 8
},
{
"candidate": "global system",
"score": 4
},
{
"candidate": "internet",
"score": 2
}
]

Creating Amazon ECS Environment

Although using Docker Swarm locally is a great way to understand how cross-account ECR access works, it is not a typical use case for deploying containerized applications on the AWS Platform. More often, you could use Amazon ECS, Amazon Elastic Kubernetes Service (EKS), or enterprise versions of third-party orchestrators such as RedHat OpenShift or Rancher.

Using CloudFormation and some very convenient CloudFormation templates supplied by Amazon as a starting point, we will create a complete ECS environment for our application. First, we will create a VPC to house the ECS cluster and a public-facing ALB to front our ECS-based application, using the public-vpc.yml template.

aws --region ${CUSTOMER_ECR_REGION} cloudformation create-stack \
--stack-name public-vpc \
--template-body file://cfn-templates/public-vpc.yml \
--capabilities CAPABILITY_NAMED_IAM

Next, we will create the ECS cluster and an Amazon ECS Task Definition using the public-subnet-public-loadbalancer.yml template. Again, the Task Definition defines how ECS will deploy our application using AWS Fargate. Amazon Fargate allows you to run containers without having to manage servers or clusters. No EC2 instances to manage! Woot! Below, in the CloudFormation template, we see the ContainerDefinitions section of the TaskDefinition resource that contains three container definitions. Note the three images and their ECR locations.

TaskDefinition:
Type: AWS::ECS::TaskDefinition
DependsOn: CloudWatchLogsGroup
Properties:
Family: !Ref 'ServiceNameClient'
Cpu: !Ref 'ContainerCpu'
Memory: !Ref 'ContainerMemory'
NetworkMode: awsvpc
RequiresCompatibilities:
FARGATE
EC2
ExecutionRoleArn:
Fn::ImportValue:
!Join [':', [!Ref 'StackName', 'ECSTaskExecutionRole']]
TaskRoleArn:
Fn::If:
'HasCustomRole'
!Ref 'Role'
!Ref 'AWS::NoValue'
ContainerDefinitions:
Name: nlp-client
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/nlp-client:1.1.0']]
PortMappings:
ContainerPort: !Ref ContainerPortClient
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: NLP_CLIENT_PORT
Value: !Ref ContainerPortClient
Name: RAKE_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortRake]]
Name: PROSE_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortProse]]
Name: LANG_ENDPOINT
Value: !Join [':', ['http://localhost&#39;, !Ref ContainerPortLang]]
Name: API_KEY
Value: !Ref ApiKey
Name: rake-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref VendorAccountId, 'dkr.ecr', !Ref VendorEcrRegion, 'amazonaws.com/rake-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: RAKE_PORT
Value: !Ref ContainerPortRake
Name: API_KEY
Value: !Ref ApiKey
Name: prose-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/prose-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: PROSE_PORT
Value: !Ref ContainerPortProse
Name: API_KEY
Value: !Ref ApiKey
Name: lang-app
Cpu: 256
Memory: 1024
Image: !Join ['.', [!Ref 'AWS::AccountId', 'dkr.ecr', !Ref 'AWS::Region', 'amazonaws.com/lang-app:1.1.0']]
Essential: true
LogConfiguration:
LogDriver: awslogs
Options:
awslogs-region: !Ref AWS::Region
awslogs-group: !Ref CloudWatchLogsGroup
awslogs-stream-prefix: ecs
Environment:
Name: LANG_PORT
Value: !Ref ContainerPortLang
Name: API_KEY
Value: !Ref ApiKey

Execute the following command to create the ECS cluster and an ECS Task Definition using the CloudFormation template.

# change me
export ISV_ACCOUNT=111222333444
export ISV_ECR_REGION=us-east-2
export API_KEY=SuP3r5eCRetAutHK3y
aws cloudformation create-stack \
--stack-name public-subnet-public-loadbalancer \
--template-body file://cfn-templates/public-subnet-public-loadbalancer.yml \
--parameters \
ParameterKey=VendorAccountId,ParameterValue=${ISV_ACCOUNT} \
ParameterKey=VendorEcrRegion,ParameterValue=${ISV_ECR_REGION} \
ParameterKey=ApiKey,ParameterValue=${API_KEY} \
--capabilities CAPABILITY_NAMED_IAM

Below, we see an example of the expected output from the CloudFormation Management Console.

If you want to update the ECS Task Definition, simply run the aws cloudformation update-stack command.

CloudWatch Container Insights

The CloudFormation template does not enable CloudWatch Container Insights by default. Container Insights collects, aggregates, and summarizes metrics and logs from your containerized applications. To enable Insights, execute the following command:

aws ecs put-account-setting \
--name "containerInsights" --value "enabled"

Confirming the Cross-account Policy

If everything went right in the previous steps, we should now have an ECS cluster running our containerized application, including the container built from the vendor’s Docker image. Below, we see an example of the ECS cluster displayed in the management console.

Within the ECR cluster, we should observe a single running ECS Service. According to AWS, Amazon ECS allows you to run and maintain a specified number of instances of a task definition simultaneously in an Amazon ECS cluster; this is referred to as a Service.

We are running two instances of each container on ECS, thus two copies of the task within a single service. Each task runs its containers in a different Availability Zone for high availability.

Drilling into the service, note the new ALB associated with the new VPC, two public subnets, and the corresponding security group.

Switching to the Task Definitions tab, note that the ECS Task contains four container definitions that comprise the NLP API. Three images originated from the customer’s ECR repositories, and one from the vendor’s ECR repository.

Drilling in further, we will see the details of each container definition, including environment variables, passed from ECR to the container and on to the Golang binary running in the container.

Accessing the NLP API on ECS

With our earlier Docker Swarm example, the curl command was issued against localhost. We now have a public-facing Application Load Balancer (ALB) in front of our ECS-based application. We will use the DNS name of the ALB to hit our application on ECS. The DNS address (A Record) can be obtained from the Load Balancer Management Console, as shown below, or from the Output tab of the public-vpc CloudFormation Stack.

Another difference between the earlier Docker Swarm example and ECS is the port. Although the edge service, nlp-client, runs on port :8080, the ALB acts as a reverse proxy, passing requests from port :80 on the ALB to port :8080 of the nlp-client container instances (actually, the shared ENI of the running task).

Although I did set up a custom domain name for the ALB using Route53 and enabled HTTPS (port 443 on the ELB), https://nlp-ecs.example-api.com, for the sake of brevity, I will not go into the details in this post.

To test our deployed ECS, we can use a tool like curl or Postman to test the API’s endpoints. Don’t forget to you will need to add the API Key for authentication using the X-API-Key header key/value pair. Below we see a successful GET against the /routes endpoint, using Postman.

Here we see a successful POST against the /keywords endpoint, using Postman.

Cleaning Up

To clean up the demonstration’s AWS resources and Docker Stack, run the following scripts in the appropriate AWS accounts. Importantly, similar to S3, you must delete all the Docker images in the ECR repositories first, before deleting the repository, or else you will receive a CloudFormation error. This includes untagged images.

# local docker stack
docker stack rm nlp
# customer account
aws ecr batch-delete-image \
--repository-name nlp-client \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name prose-app \
--image-ids imageTag=1.1.0
aws ecr batch-delete-image \
--repository-name lang-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-nlp-client
aws cloudformation delete-stack \
--stack-name ecr-repo-prose-app
aws cloudformation delete-stack \
--stack-name ecr-repo-lang-app
aws cloudformation delete-stack \
--stack-name public-subnet-public-loadbalancer
aws cloudformation delete-stack \
--stack-name public-vpc
aws cloudformation delete-stack \
--stack-name development-user-group-customer
# vendor account
aws ecr batch-delete-image \
--repository-name rake-app \
--image-ids imageTag=1.1.0
aws cloudformation delete-stack \
--stack-name ecr-repo-rake-app
aws cloudformation delete-stack \
--stack-name development-user-group-isv

Conclusion

In the preceding post, we saw how multiple AWS accounts could share private ECR-based Docker images. There are variations and restrictions to the configuration of the ECR Repository Policies, depending on the deployment tools you are using, such as AWS CodeBuild, AWS CodeDeploy, or AWS Elastic Beanstalk. AWS does a good job of providing some examples in their documentation, including Amazon ECR Repository Policy Examples and Amazon Elastic Container Registry Identity-Based Policy Examples.

In late 2020, AWS released Amazon Elastic Container Registry Public (ECR Public). Although this post was about private images, for public images, ECR Public allows you to store, manage, share, and deploy container images for anyone to discover and download globally.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

Leave a comment

IoT Data Analytics at the Edge: Exploring the convergence of IoT, Data Analytics, and Edge Computing with Grafana, Mosquitto, and TimescaleDB on ARM-based devices

This post is a revised version of an earlier post, featuring major version updates of TimescaleDB (v1.7.4-pg12 to v2.0.0-pg12), Grafana (v7.1.5 to v7.5.2), and Mosquitto (v1.6.12 to v2.0.9). All source code and SQL scripts are revised. Note that TimeScaleDB has a current limitation/bug with Docker on ARM later than v2.0.0.

GMT IoT Edge Analytics Stack architecture (Image by author

The Edge

Edge computing is a fast-growing technology trend, which involves pushing compute capabilities to a network’s edge. Wikipedia describes edge computing as a distributed computing paradigm that brings computation and data storage closer to the location needed to improve response times and save bandwidth. The term edge commonly refers to a compute node at the edge of a network (edge device), sitting close to the data source and between that data source and external system such as the Cloud. In his recent post, 3 Advantages (And 1 Disadvantage) Of Edge Computing, well-known futurist Bernard Marr argues reduced bandwidth requirements, reduced latency, and enhanced security and privacy as three primary advantages of edge computing.

David Ricketts, Head of Marketing at Quiss Technology PLC, in his post, Cloud and Edge Computing — The Stats You Need to Know for 2018, estimates that the global edge computing market is expected to reach USD 6.72 billion by 2022 at a compound annual growth rate of a whopping 35.4 percent. Realizing the market potential, many major Cloud providers, edge device manufacturers, and integrators are rapidly expanding their edge compute capabilities. AWS, for example, currently offers more than a dozen services in their edge computing category.

Internet of Things

Edge computing is frequently associated with the Internet of Things (IoT). IoT devices, industrial equipment, and sensors generate data transmitted to other internal and external systems, often by way of edge nodes, such as an IoT Gateway. IoT devices typically generate time-series data. According to Wikipedia, a time series is a set of data points indexed in time order — a sequence taken at successive equally spaced points in time. IoT devices typically generate continuous high-volume streams of time-series data, often on a scale of millions of data points per second. IoT data characteristics require IoT platforms to minimally support temporal accuracy, high-volume ingestion and processing, efficient data compression and downsampling, and real-time querying capabilities.

Edge devices such as IoT Gateways, which aggregate and transmit IoT data from these devices to external systems, are generally lower-powered, with limited processors, memory, and storage. Accordingly, IoT platforms must satisfy all the requirements of IoT data while simultaneously supporting resource-constrained environments.

IoT Analytics at the Edge

Leading Cloud providers AWS, Azure, Google Cloud, IBM Cloud, Oracle Cloud, and Alibaba Cloud all offer IoT services. Many offer IoT services with edge computing capabilities. AWS offers AWS IoT Greengrass. Greengrass provides local compute, messaging, data management, sync, and machine learning (ML) inference capabilities to edge devices. Azure offers Azure IoT Edge. Azure IoT Edge provides the ability to run artificial intelligence (AI), Azure and third-party services, and custom business logic on edge devices using standard containers. Google Cloud offers Edge TPU. Edge TPU (Tensor Processing Unit) is Google’s purpose-built application-specific integrated circuit (ASIC), designed to run AI at the edge.

IoT Analytics

Many Cloud providers also offer IoT analytics as part of their suite of IoT services, although not at the edge. AWS offers AWS IoT Analytics, while Azure has Azure Time Series Insights. Google provides IoT analytics, indirectly, through downstream analytic systems and ad hoc analysis using Google BigQuery or advanced analytics and machine learning with Cloud Machine Learning Engine. These services generally all require data to be transmitted to the Cloud for analytics.

Cloud-centric IoT analytics platform data flow (Image by author)

The ability to analyze real-time, streaming IoT data at the edge is critical to a rapid feedback loop. IoT edge analytics can accelerate anomaly detection and remediation, improve predictive maintenance capabilities, and expedite proactive inventory replenishment.

IoT Edge Analytics Stack

In my opinion, the ideal IoT edge analytics stack is comprised of lightweight, purpose-built, easily deployable and manageable, platform- and programming language-agnostic, open-source software components. The minimal IoT edge analytics stack should include:

  1. Lightweight message broker;
  2. Purpose-built time-series database;
  3. ANSI-standard SQL ad-hoc query engine;
  4. Data visualization tool;
  5. Simple deployment and management framework;

Each component should be purpose-built for IoT.

Lightweight Message Broker

We will use Eclipse Mosquitto as our message broker. According to the project’s description, Mosquitto is an open-source message broker that implements the Message Queuing Telemetry Transport (MQTT) protocol versions 5.0, 3.1.1, and 3.1. Mosquitto is lightweight and suitable for use on all devices, from low-power single-board computers (SBCs) to full-powered servers.

MQTT Client Library

We will interact with Mosquitto using Eclipse Paho. According to the project, the Eclipse Paho project provides open-source, mainly client-side implementations of MQTT and MQTT-SN in a variety of programming languages. MQTT and MQTT for Sensor Networks (MQTT-SN) are light-weight publish/subscribe messaging transports for TCP/IP and connectionless protocols, such as UDP, respectively.

We will be using Paho’s Python Client. The Paho Python Client provides a client class with support for both MQTT v3.1 and v3.1.1 on Python 2.7 or 3.x. The client also provides helper functions to make publishing messages to an MQTT server straightforward.

Time-Series Database

Time-series databases are optimal for storing IoT data. According to InfluxData, makers of a leading time-series database, InfluxDB, a time-series database (TSDB), is a database optimized for time-stamped or time-series data. Time series data are simply measurements or events that are tracked, monitored, downsampled, and aggregated over time. Jiao Xian of Alibaba Cloud has authored an insightful post on the time-series database ecosystem, What Are Time Series Databases? A few leading Cloud providers offer purpose-built time-series databases, though they are not available at the edge. AWS offers Amazon Timestream, and Alibaba Cloud offers Time Series Database.

InfluxDB is an excellent choice for a time-series database. It was my first choice, along with TimescaleDB, when developing this stack. However, InfluxDB Flux’s apparent incompatibilities with some ARM-based architecture ruled it out for inclusion in the stack for this particular post.

We will use TimescaleDB as our time-series database. TimescaleDB is the leading open-source relational database for time-series data. Described as ‘PostgreSQL for time-series,’ TimescaleDB is based on PostgreSQL, which provides full ANSI SQL, rock-solid reliability, and a massive ecosystem. TimescaleDB claims to achieve 10–100x faster queries than PostgreSQL, InfluxDB, and MongoDB, with native optimizations for time-series analytics.

TimescaleDB is designed for performing analytical queries, both through its native support for PostgreSQL’s full range of SQL functionality and additional functions native to TimescaleDB. These time-series optimized functions include Median/Percentile, Cumulative Sum, Moving Average, Increase, Rate, Delta, Time Bucket, Histogram, and Gap Filling.

Ad-hoc Data Query Engine

We have the option of using psql, the terminal-based front-end to PostgreSQL, to execute ad-hoc queries against TimescaleDB. The psql front-end enables you to enter queries interactively, issue them to PostgreSQL, and see the query results.

View of psql terminal-based interface for querying the TimescaleDB database

We also have the option of using pgAdmin, specifically the biarms/pgadmin4 Docker version, to execute ad-hoc queries and perform most other database tasks. pgAdmin is the most popular open-source administration and development platform for PostgreSQL. While several popular Docker versions of pgAdmin only support Linux AMD64 architectures, the biarms/pgadmin4 Docker version supports ARM-based devices.

Dashboard view of TimescaleDB database from within pgAdmin UI

Executing a query against the TimescaleDB database using pgAdmin’s Query Tool

Data Visualization

For data visualization, we will use Grafana. Grafana allows you to query, visualize, alert on, and understand metrics no matter where they are stored. With Grafana, you can create, explore, and share dashboards, fostering a data-driven culture. Grafana allows you to define thresholds visually and get notified via Slack, PagerDuty, and more. Grafana supports dozens of data sources, including MySQL, PostgreSQL, Elasticsearch, InfluxDB, TimescaleDB, Graphite, Prometheus, Google BigQuery, GraphQL, and Oracle. Grafana is extensible through a large collection of plugins.

Example of Grafana dashboard showing the post’s IoT sensor data

Edge Deployment and Management Platform

Docker introduced the current industry standard for containers in 2013. Docker containers are a standardized unit of software that allows developers to isolate apps from their environment. We will use Docker to deploy the IoT edge analytics stack, referred to herein as the GTM Stack, composed of containerized versions of Grafana, TimescaleDB, Eclipse Mosquitto, and pgAdmin, to an ARMv7-based edge node. The acronym, GTM, comes from the three primary OSS projects composing the stack. The abbreviation also suggests Greenwich Mean Time, relating to the precise time-series nature of IoT data.

GMT IoT Edge Analytics Stack architecture (Image by author)

Running Docker Engine in swarm mode, we can use Docker to deploy the complete IoT edge analytics stack to the swarm, running on the edge node. The deploy command accepts a stack description in the form of a Docker Compose file, a YAML file used to configure the application’s services. With a single command, we can create and start all the services from the configuration file.

Source Code

All source code for this post is available on GitHub. Use the following command to git clone a local copy of the project. Note that the updated version of the source code for this post is in the v2021–03 branch.

git clone --branch v2021-03 --single-branch --depth 1 \
https://github.com/garystafford/iot-analytics-at-the-edge.git

IoT Devices

For this post, I have deployed three Linux ARM-based IoT devices, each connected to a sensor array. Each sensor array contains multiple analog and digital sensors. The sensors record temperature, humidity, air quality (liquefied petroleum gas (LPG), carbon monoxide (CO), and smoke), light, and motion. For more information on the IoT device and sensor hardware involved, please see my previous post.Getting Started with IoT Analytics on AWS
Analyze environmental sensor data from IoT devices in near real-time with AWS IoT Analyticstowardsdatascience.com

Each ARM-based IoT device runs a small Python3-based script, sensor_data_to_mosquitto.py, shown below.

import argparse
import json
import logging
import sys
import time
from datetime import datetime
import paho.mqtt.publish as publish
from getmac import get_mac_address
from pytz import timezone
from Sensors import Sensors
# Sensor to Mosquitto Script
# Author: Gary A. Stafford
# Date: 2021-03-26
# Usage: python3 sensor_data_to_mosquitto.py \
# –host "192.168.1.12" –port 1883 \
# –topic "sensor/output" –frequency 10
sensors = Sensors()
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
def main():
args = parse_args()
publish_message_to_db(args)
def get_readings():
sensors.led_state(0)
# Retrieve sensor readings
payload_dht = sensors.get_sensor_data_dht()
payload_gas = sensors.get_sensor_data_gas()
payload_light = sensors.get_sensor_data_light()
payload_motion = sensors.get_sensor_data_motion()
message = {
"device_id": get_mac_address(),
"time": datetime.now(timezone("UTC")),
"data": {
"temperature": payload_dht["temperature"],
"humidity": payload_dht["humidity"],
"lpg": payload_gas["lpg"],
"co": payload_gas["co"],
"smoke": payload_gas["smoke"],
"light": payload_light["light"],
"motion": payload_motion["motion"]
}
}
return message
def date_converter(o):
if isinstance(o, datetime):
return o.__str__()
def publish_message_to_db(args):
while True:
message = get_readings()
message_json = json.dumps(message, default=date_converter, sort_keys=True,
indent=None, separators=(',', ':'))
logger.debug(message_json)
try:
publish.single(args.topic, payload=message_json, hostname=args.host, port=args.port)
except Exception as error:
logger.error("Exception: {}".format(error))
finally:
time.sleep(args.frequency)
# Read in command-line parameters
def parse_args():
parser = argparse.ArgumentParser(description='Script arguments')
parser.add_argument('–host', help='Mosquitto host', default='localhost')
parser.add_argument('–port', help='Mosquitto port', type=int, default=1883)
parser.add_argument('–topic', help='Mosquitto topic', default='paho/test')
parser.add_argument('–frequency', help='Message frequency in seconds', type=int, default=5)
return parser.parse_args()
if __name__ == "__main__":
main()

The IoT devices’ script implements the Eclipse Paho MQTT Python client library. An MQTT message containing simultaneous readings from each sensor is sent to a Mosquitto topic on the edge node at a configurable frequency.

message = {
"device_id": get_mac_address(),
"time": datetime.now(timezone("UTC")),
"data": {
"temperature": payload_dht["temperature"],
"humidity": payload_dht["humidity"],
"lpg": payload_gas["lpg"],
"co": payload_gas["co"],
"smoke": payload_gas["smoke"],
"light": payload_light["light"],
"motion": payload_motion["motion"]
}
}
view raw sensor_message.py hosted with ❤ by GitHub

Below are the actual sensor readings sent by the IoT device as an MQTT message to the Mosquitto topic.

{
"data": {
"co": 0.0031827073092533685,
"humidity": 51.099998474121094,
"light": true,
"lpg": 0.005553622262501496,
"motion": false,
"smoke": 0.01449612738171321,
"temperature": 19.100000381469727
},
"device_id": "00:0f:00:70:91:0a",
"time": "2021-04-02 17:23:44.809046+00:00"
}
view raw sample_message.json hosted with ❤ by GitHub

IoT Edge Node

For this post, I have deployed a single Linux ARM-based edge node. The three IoT devices containing sensor arrays communicate with the edge node over Wi-Fi. IoT devices could easily use an alternative communication protocol, such as BLE, LoRaWAN, or Ethernet. For more information on BLE and LoRaWAN, please see some of my previous posts:LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT and BLE and GATT for IoT: Getting Started with Bluetooth Low Energy (BLE) and Generic Attribute Profile (GATT) Specification for IoT.

The edge node also runs a small Python3-based script, mosquitto_to_timescaledb.py, shown below.

import argparse
import json
import logging
import sys
from datetime import datetime
import paho.mqtt.client as mqtt
import psycopg2
# Mosquitto to TimescaleDB Script
# Author: Gary A. Stafford
# Date: 2021-03-31
# Usage: python3 mosquitto_to_timescaledb.py \
# –msqt_topic "sensor/output –msqt_host "192.168.1.12" –msqt_port 1883 \
# –ts_host "192.168.1.12" –ts_port 5432 \
# –ts_username postgres –ts_password postgres1234 –ts_database demo_iot
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
args = argparse.Namespace
ts_connection: str = ""
def main():
global args
args = parse_args()
global ts_connection
ts_connection = "postgres://{}:{}@{}:{}/{}".format(args.ts_username, args.ts_password, args.ts_host,
args.ts_port, args.ts_database)
logger.debug("TimescaleDB connection: {}".format(ts_connection))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.msqt_host, args.msqt_port, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
logger.debug("Connected with result code {}".format(str(rc)))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe(args.msqt_topic)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
logger.debug("Topic: {}, Message Payload: {}".format(msg.topic, str(msg.payload)))
publish_message_to_db(msg)
def date_converter(o):
if isinstance(o, datetime):
return o.__str__()
def publish_message_to_db(message):
message_payload = json.loads(message.payload)
# logger.debug("message.payload: {}".format(json.dumps(message_payload, default=date_converter)))
sql = """INSERT INTO sensor_data(time, device_id, temperature, humidity, lpg, co, smoke, light, motion)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);"""
data = (
message_payload["time"],
message_payload["device_id"],
message_payload["data"]["temperature"],
message_payload["data"]["humidity"],
message_payload["data"]["lpg"],
message_payload["data"]["co"],
message_payload["data"]["smoke"],
message_payload["data"]["light"],
message_payload["data"]["motion"]
)
try:
with psycopg2.connect(ts_connection, connect_timeout=3) as conn:
with conn.cursor() as curs:
try:
curs.execute(sql, data)
except psycopg2.Error as error:
logger.error("Exception: {}".format(error.pgerror))
except Exception as error:
logger.error("Exception: {}".format(error))
except psycopg2.OperationalError as error:
logger.error("Exception: {}".format(error.pgerror))
finally:
conn.close()
# Read in command-line parameters
def parse_args():
parser = argparse.ArgumentParser(description='Script arguments')
parser.add_argument('–msqt_topic', help='Mosquitto topic', default='paho/test')
parser.add_argument('–msqt_host', help='Mosquitto host', default='localhost')
parser.add_argument('–msqt_port', help='Mosquitto port', type=int, default=1883)
parser.add_argument('–ts_host', help='TimescaleDB host', default='localhost')
parser.add_argument('–ts_port', help='TimescaleDB port', type=int, default=5432)
parser.add_argument('–ts_username', help='TimescaleDB username', default='postgres')
parser.add_argument('–ts_password', help='TimescaleDB password', default='postgres1234')
parser.add_argument('–ts_database', help='TimescaleDB password', default='demo_iot')
return parser.parse_args()
if __name__ == "__main__":
main()

Like the IoT devices, the edge node’s script implements the Eclipse Paho MQTT Python client library. The script pulls MQTT messages off a Mosquitto topic(s), serializes the message payload to JSON, and writes the payload’s data to the TimescaleDB database. The edge node’s script accepts several arguments, which allow you to configure the necessary Mosquitto and TimescaleDB connection settings.

Why not use Telegraf?

Telegraf is a plugin-driven agent that collects, processes, aggregates, and writes metrics. There is a Telegraf output plugin, the PostgreSQL and TimescaleDB Output Plugin for Telegraf, produced by TimescaleDB. The plugin can replace the need to manage and maintain the above script. However, I chose not to use it because it is not yet an official Telegraf plugin. If the plugin was included in a Telegraf release, I would certainly encourage its use.

Script Management

Both Linux-based IoT devices and edge nodes run systemd system and service manager. To ensure the Python scripts keep running in the case of a system restart, we define a systemd unit. Units are objects that systemd knows how to manage. This is a standardized representation of system resources that can be managed by the suite of daemons and manipulated by the provided utilities. Each script has a systemd unit file. Below, we see the gtm_stack_mosquitto unit file, gtm_stack_mosquitto.service.

[Unit]
Description=GTM Stack – Sensor to Mosquitto Script
After=network.target
[Service]
ExecStart=/usr/bin/python3 -u sensor_data_to_mosquitto.py \
–host "192.168.1.12" –port 1883 –topic "sensor/output"
WorkingDirectory=/home/pi/iot-analytics-at-the-edge/scripts
StandardOutput=inherit
StandardError=inherit
Restart=always
User=pi
[Install]
WantedBy=multi-user.target

The gtm_stack_mosq_to_tmscl unit file, gtm_stack_mosq_to_tmscl.service, is nearly identical.

To install the gtm_stack_mosquitto.service systemd unit file on each IoT device, use the following commands:

SERVICE=gtm_stack_mosquitto
sudo cp systemctl/${SERVICE}.service /etc/systemd/system/
sudo systemctl start ${SERVICE}.service
sudo systemctl enable ${SERVICE}.service
# check status
systemctl status ${SERVICE}.service
ps aux | grep sensor_data_to_mosquitto.py
view raw systemd.sh hosted with ❤ by GitHub

Installing the gtm_stack_mosq_to_tmscl.service unit file on the edge node is nearly identical.

Docker Stack

The edge node runs the GTM Docker stack, stack.yml, in a swarm. As discussed earlier, the stack contains four containers: Eclipse Mosquitto, TimescaleDB, Grafana, and pgAdmin. The Mosquitto, TimescaleDB, and Grafana containers have paths within the containers bind-mounted to directories on the edge device. With bind-mounting, the container’s configuration and data will persist if the containers are removed and re-created. The containers are running on an isolated overlay network.

version: "3.9" # optional since v1.27.0
services:
timescaledb:
image: timescale/timescaledb:2.0.0-pg12
ports:
"5432:5432/tcp"
networks:
demo-iot-net
environment:
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: postgres1234
POSTGRES_DB: demo_iot
deploy:
restart_policy:
condition: on-failure
volumes:
"$HOME/data/postgres:/var/lib/postgresql/data"
grafana:
image: grafana/grafana:7.5.2
ports:
"3000:3000/tcp"
networks:
demo-iot-net
deploy:
restart_policy:
condition: on-failure
volumes:
"$HOME/data/grafana:/var/lib/grafana"
user: $ID:1
mosquitto:
image: eclipse-mosquitto:2.0.9
ports:
"1883:1883/tcp"
networks:
demo-iot-net
deploy:
restart_policy:
condition: on-failure
volumes:
"$HOME/data/mosquitto/config:/mosquitto/config"
"$HOME/data/mosquitto/data:/mosquitto/data"
"$HOME/data/mosquitto/log:/mosquitto/log"
pgadmin:
image: biarms/pgadmin4:4.21
ports:
"5050:5050/tcp"
networks:
demo-iot-net
deploy:
restart_policy:
condition: on-failure
networks:
demo-iot-net:
view raw stack.yml hosted with ❤ by GitHub

The GTM Docker stack is installed using the following commands on the edge node. We will assume Docker and git are pre-installed on the edge node for this post.

# on edge node
git clone https://github.com/garystafford/iot-analytics-at-the-edge.git
# build required directories
mkdir -p ~/data/postgres
mkdir -p ~/data/grafana
mkdir -p ~/data/mosquitto/config
mkdir -p ~/data/mosquitto/data
mkdir -p ~/data/mosquitto/log
# move mosquitto config
cd iot-analytics-at-the-edge/docker/
cp mosquitto.conf ~/data/mosquitto/config/
# deploy stack
docker swarm init
docker stack deploy -c stack.yml iot
# check status of stack
docker stack ps iot –no-trunc
docker stack services iot
view raw gtm_stack.sh hosted with ❤ by GitHub

First, we will create several local directories on the edge device, which will be used to bind-mount to the Docker container’s directories. Below, we see the bind-mounted local directories with the eventual container’s contents stored within them.

The bind-mounted local directories on the edge device from the stack

Next, we copy the custom Mosquitto configuration file, mosquitto.conf, included in the project to the edge device’s correct location.

Lastly, we initialize the Docker swarm and deploy the stack.

Output of ‘docker service ls' command, showing the running GTM Stack containers

TimescaleDB Setup

With the GTM stack running, we need to create a single Timescale hypertable, sensor_data, in the TimescaleDB demo_iot database to hold the incoming IoT sensor data. Hypertables, according to TimescaleDB, are designed to be easy to manage and to behave like standard PostgreSQL tables. Hypertables are comprised of many interlinked “chunk” tables. Commands made to the hypertable automatically propagate changes down to all of the chunks belonging to that hypertable.

CREATE TABLE IF NOT EXISTS sensor_data (
time timestamptz NOT NULL,
device_id text NOT NULL,
temperature double PRECISION NOT NULL,
humidity double PRECISION NOT NULL,
lpg double PRECISION NOT NULL,
co double PRECISION NOT NULL,
smoke double PRECISION NOT NULL,
light boolean NOT NULL,
motion boolean NOT NULL
);
SELECT create_hypertable('sensor_data', 'time');
view raw sensor_data.sql hosted with ❤ by GitHub

I suggest using psql to execute the required DDL statements, which will create the hypertable and the proceeding views and database user permissions. All SQL statements are included in the project’s statements.sql file. One way to use psql is to install it on your local workstation, then use psql to connect to the remote edge node. I prefer to instantiate a local PostgreSQL Docker container instance running psql. I then use the local container’s psql client to connect to the edge node’s TimescaleDB database. For example, from my local machine, I run the following docker run command to connect to the edge node’s TimescaleDB database on the edge node, located locally at 192.168.1.12.

docker run -it –rm postgres psql \
-U postgres -h 192.168.1.12 -p 5432 -d demo_iot
view raw docker_run.sh hosted with ❤ by GitHub

Although not as practical, you can also access psql from within the TimescaleDB Docker container, running on the actual edge node, using the following docker exec command.

TIMESCALEDB_CONTAINER=$(docker ps -q \
–filter='name=iot_timescaledb.1' –format '{{.Names}}')
docker exec -it ${TIMESCALEDB_CONTAINER} psql \
-U postgres -h localhost -d demo_iot
view raw access_psql.sh hosted with ❤ by GitHub

TimescaleDB Continuous Aggregates

For this post’s demonstration, we will create four TimescaleDB materialized views, which will be queried from a Grafana Dashboard. The materialized views are TimescaleDB Continuous Aggregates. According to Timescale, aggregate queries which touch large swathes of time-series data can take a long time to compute because the system needs to scan large amounts of data on every query execution. To make these queries faster, a continuous aggregate allows materializing the computed aggregates, while also providing means to continuously, and with low overhead, keep them up-to-date as the underlying source data changes.

For example, we generate sensor data every five seconds from the three IoT devices in this post. When visualizing a 24-hour period in Grafana, using continuous aggregates with an interval of one minute, we would reduce the total volume of data queried from 51,840 rows to 4,320 rows, a reduction of over 91%. The larger the time period or the number of IoT devices being analyzed, the more significant these savings will positively impact query performance.

A time_bucket on the time partitioning column of the hypertable is required for all continuous aggregate views. The time_bucket function, in this case, has a bucket width (interval) of 1 minute. The interval is configurable.

create materialized views (continuous aggregates)
temperature and humidity
CREATE MATERIALIZED VIEW temperature_humidity_summary_minute(device_id, bucket, avg_temp, avg_humidity)
WITH (timescaledb.continuous) AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time),
avg(temperature),
avg(humidity)
FROM sensor_data
WHERE humidity BETWEEN 0 AND 100
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;
air quality (lpg, co, smoke)
CREATE MATERIALIZED VIEW air_quality_summary_minute(device_id, bucket, avg_lpg, avg_co, avg_smoke)
WITH (timescaledb.continuous) AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time),
avg(lpg),
avg(co),
avg(smoke)
FROM sensor_data
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;
light
CREATE MATERIALIZED VIEW light_summary_minute(device_id, bucket, avg_light)
WITH (timescaledb.continuous) AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time),
avg(case when light = 't' then 1 else 0 end)
FROM sensor_data
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;
motion
CREATE MATERIALIZED VIEW motion_summary_minute(device_id, bucket, avg_motion)
WITH (timescaledb.continuous) AS
SELECT device_id,
time_bucket(INTERVAL '1 minute', time),
avg(case when motion = 't' then 1 else 0 end)
FROM sensor_data
GROUP BY time_bucket(INTERVAL '1 minute', time), device_id
WITH NO DATA;

To automatically refresh the four materialized views, we will create four corresponding continuous aggregate policies. In this demonstration, the continuous aggregate policies create a refresh window between one week ago and one hour ago, with a refresh interval of one hour.

create policies that automatically refreshes continuous aggregates
SELECT add_continuous_aggregate_policy('air_quality_summary_minute',
start_offset => INTERVAL '1 week',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
SELECT add_continuous_aggregate_policy('light_summary_minute',
start_offset => INTERVAL '1 week',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
SELECT add_continuous_aggregate_policy('motion_summary_minute',
start_offset => INTERVAL '1 week',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
SELECT add_continuous_aggregate_policy('temperature_humidity_summary_minute',
start_offset => INTERVAL '1 week',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
view jobs
SELECT * FROM timescaledb_information.jobs;
view job stats
SELECT job_id, total_runs, total_failures, total_successes
FROM timescaledb_information.job_stats;

Advanced Analytic Queries

The ability to perform ad-hoc queries on time-series IoT data is an essential feature of the IoT edge analytics stack. We can use psql, pgAdmin, or even our own IDE to perform ad-hoc queries against the TimescaleDB database on the edge node. Below are examples of typical ad-hoc queries a data analyst might perform on IoT sensor data. These example queries demonstrate TimescaleDB’s advanced analytical capabilities for working with time-series data, including Moving Average, Delta, Time Bucket, and Histogram.

ad-hoc queries
find max temperature (°C) and humidity (%) for last 3 hours in 15 minute time periods
https://docs.timescale.com/latest/using-timescaledb/reading-data#select
SELECT time_bucket('15 minutes', time) AS fifteen_min,
device_id,
count(time),
max(temperature) AS max_temp,
max(humidity) AS max_hum
FROM sensor_data
WHERE time > now() INTERVAL '3 hours'
AND humidity BETWEEN 0 AND 100
GROUP BY fifteen_min, device_id
ORDER BY fifteen_min DESC, max_temp desc;
find temperature (°C) anomalies (delta > ~5°F)
https://docs.timescale.com/latest/using-timescaledb/reading-data#delta
WITH ht AS (SELECT time,
temperature,
abs(temperature lag(temperature) over (ORDER BY time)) AS delta
FROM sensor_data)
SELECT ht.time, ht.temperature, ht.delta
FROM ht
WHERE ht.delta > 2.63
ORDER BY ht.time;
find three minute moving average of temperature (°F) for last day
(5 sec. interval * 36 rows = 3 min.)
https://docs.timescale.com/latest/using-timescaledb/reading-data#moving-average
SELECT time,
avg((temperature * 1.9) + 32) over (ORDER BY time
ROWS BETWEEN 35 PRECEDING AND CURRENT ROW)
AS smooth_temp
FROM sensor_data
WHERE device_id = 'Manufacturing Plant'
AND time > now() INTERVAL '1 day'
ORDER BY time desc;
find average humidity (%) for last 12 hours in 5-minute time periods
https://docs.timescale.com/latest/using-timescaledb/reading-data#time-bucket
SELECT time_bucket('5 minutes', time) AS time_period,
avg(humidity) AS avg_humidity
FROM sensor_data
WHERE device_id = 'Main Warehouse'
AND humidity BETWEEN 0 AND 100
AND time > now() INTERVAL '12 hours'
GROUP BY time_period
ORDER BY time_period desc;
calculate histograms of avg. temperature (°F) between 55-85°F in 5°F buckets during last 2 days
https://docs.timescale.com/latest/using-timescaledb/reading-data#histogram
SELECT device_id,
count(time),
histogram((temperature * 1.9) + 32, 55.0, 85.0, 5)
FROM sensor_data
WHERE temperature IS NOT NULL
AND time > now() INTERVAL '2 days'
GROUP BY device_id;
find average light value for last 90 minutes in 5-minute time periods
https://docs.timescale.com/latest/using-timescaledb/reading-data#time-bucket
SELECT device_id,
time_bucket('5 minutes', time) AS five_min,
avg(case when light = 't' then 1 else 0 end) AS avg_light
FROM sensor_data
WHERE device_id = 'Manufacturing Plant'
AND time > now() INTERVAL '90 minutes'
GROUP BY device_id, five_min
ORDER BY five_min desc;

Data Visualization with Grafana

Using the TimescaleDB continuous aggregates we have created, we can quickly build a richly featured dashboard in Grafana. Below we see a typical IoT Dashboard you might build to monitor the post’s IoT sensor data in near real-time. An exported version, dashboard_external_export.json, is included in the GitHub project.

Example of Grafana dashboard showing the post’s IoT sensor data
Example of Grafana IoT Demo Dashboard showing sensor data

Limiting Grafana’s Access to IoT Data

Following the Grafana recommendation for database user permissions, we create a grafanareader PostgresSQL user, and limit the user’s access to the sensor_data table and the four views we created. Grafana will use this user’s credentials to perform SELECT queries of the TimescaleDB demo_iot database.

CREATE USER grafanareader WITH PASSWORD 'grafana1234';
GRANT USAGE ON SCHEMA public TO grafanareader;
GRANT SELECT ON public.sensor_data TO grafanareader;
GRANT SELECT ON public.temperature_humidity_summary_minute TO grafanareader;
GRANT SELECT ON public.air_quality_summary_minute TO grafanareader;
GRANT SELECT ON public.light_summary_minute TO grafanareader;
GRANT SELECT ON public.motion_summary_minute TO grafanareader;
view raw grafanareader.sql hosted with ❤ by GitHub

Using PostgreSQL in Grafana

Grafana’s documentation includes a comprehensive set of instructions for Using PostgreSQL in Grafana. To connect to the TimescaleDB database from Grafana, we use the PostgreSQL data source plugin.

Configuring the TimescaleDB database connection in Grafana

The data displayed in each Panel in the Grafana Dashboard is based on a SQL query. For example, the Average Temperature Panel might use a query similar to the example below. This particular query also converts Celsius to Fahrenheit. Note the use of Grafana Macros (e.g., $__time(), $__timeFilter()). Macros can be used within a query to simplify syntax and allow for dynamic parts.

SELECT
$__time(bucket),
device_id AS metric,
((avg_temp * 1.9) + 32) AS avg_temp
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
ORDER BY 1,2

Below, we see another example from the Average Humidity Panel. In this particular query, we might choose to limit the humidity data to a valid range of 0%–100%.

SELECT
$__time(bucket),
device_id AS metric,
avg_humidity
FROM temperature_humidity_summary_minute
WHERE
$__timeFilter(bucket)
AND avg_humidity >= 0.0
AND avg_humidity <= 100.0
ORDER BY 1,2

Mobile Friendly

Grafana dashboards are mobile-friendly. Below we see two views of the dashboard, using the Chrome mobile browser on an Apple iPhone.

Grafana Alerts

Grafana allows Alerts to be created based on the Rules you define in each Panel. If data values match the Rule’s conditions, which you pre-define, such as a temperature reading above a certain threshold for a set amount of time, an alert is sent to your choice of destinations. According to the Rule shown below, If the average temperature exceeds 75°F for a period of 5 minutes, an alert is sent.

High-temperature rule configuration

As demonstrated below, when the laboratory temperature began to exceed 75°F, the alert entered a ‘Pending’ state. If the temperature exceeded 75°F for the pre-determined period of 5 minutes, the alert status changes to ‘Alerting’, and an alert is sent. When the temperature dropped back below 75°F for the pre-determined period of 5 minutes, the alert status changed from ‘Alerting’ to ‘OK’, and a subsequent notification was sent.

Average temperature graph showing the various alert status changes

There are currently twenty alert notifiers available out-of-the-box with Grafana, including Slack, email, PagerDuty, webhooks, VictorOps, Opsgenie, and Microsoft Teams. We can use Grafana Alerts to notify the proper resources, in near real-time, if an issue is detected based on the data. Below, we see an actual series of high-temperature alerts sent by Grafana to the Slack channel, followed by subsequent notifications as the temperature returned to normal.

Grafana alert notifications in Slack channel

Conclusion

This post explored the development of an IoT edge analytics stack comprised of lightweight, purpose-built, easily deployable and manageable platform- and programming language-agnostic, open-source software components. These components included Docker containerized versions of Grafana, TimescaleDB, Eclipse Mosquitto, and pgAdmin, referred to as the GTM Stack. Using the GTM stack, we collected, analyzed, and visualized IoT data without first shipping the data to Cloud or other external systems.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , ,

Leave a comment

AWS IoT Core for LoRaWAN, AWS IoT Analytics, and Amazon QuickSight

In the following post, we will learn how to monitor indoor air quality (IAQ) using a private LoRaWAN sensor device network. The devices transmit their sensor telemetry to AWS through a LoRaWAN gateway using the newly released AWS IoT Core for LoRaWAN service. We will then analyze and visualize the sensor data using AWS IoT Analytics and Amazon QuickSight.

Amazon QuickSight Dashboard showing IAQ sensor data

Introduction

On December 15, 2020, AWS announced support for Semtech’s low-power, long-range wide area network (LoRaWAN) connectivity. LoRaWAN devices and gateways can now connect to AWS IoT Core using AWS IoT Core for LoRaWAN. AWS IoT Core for LoRaWAN is a fully managed feature that enables customers to connect wireless devices that use the LoRaWAN protocol with the AWS cloud. Using AWS IoT Core, customers can now set up a private LoRaWAN network by securely connecting their LoRaWAN devices and gateways to the AWS cloud — without developing or operating a LoRaWAN Network Server (LNS).

AWS IoT Core for LoRaWAN website

The LoRa Alliance describes the LoRaWAN specification as a Low Power, Wide Area (LPWA) networking protocol designed to wirelessly connect battery operated ‘things’ to the internet in regional, national, or global networks, and targets key Internet of Things (IoT) requirements such as bi-directional communication, end-to-end security, mobility and localization services. The LoRaWAN specification defines both the device-to-infrastructure (LoRa) physical layer parameters and the (LoRaWAN) protocol, providing seamless interoperability between manufacturers.

According to Wikipedia, LoRa (Long Range) is a proprietary low-power wide-area network modulation technique. It is based on spread spectrum modulation techniques derived from chirp spread spectrum (CSS) technology. It was developed by Cycleo of Grenoble, France, and acquired by Semtech, the founding member of the LoRa Alliance, and it is patented.

AWS-qualified Hardware

Along with the AWS IoT Core for LoRaWAN announcement, AWS released a list of qualified gateways found in the AWS Partner Device Catalog. The catalog helps customers discover qualified hardware that works with AWS services to help build and deliver successful IoT solutions. AWS IoT Core for LoRaWAN supports open-source gateway-LNS protocol software called LoRa Basics Station, an implementation of a LoRa packet forwarder. The AWS IoT Core for LoRaWAN gateway qualification program enables customers to source pre-tested LoRaWAN gateways and developer kits that meet the required LoRa Basics Station specifications.

AWS Partner Device Catalog

LoRaWAN Gateway

In this post, we will use the AWS-qualified LoRaWAN-compliant MiniHub Pro gateway by Browan Communications. At $109, MiniHub Pro is a low-cost gateway that uses LoRa Basics Station to forward RF packets received from LoRaWAN devices (uplinks) to the LoRaWAN Network Server (LNS), part of the AWS IoT Core for LoRaWAN service. The MiniHub Pro is based on FreeRTOS, the real-time operating system for microcontrollers.

AWS-qualified LoRaWAN-compliant MiniHub Pro gateway

LoRaWAN Devices

In addition to the gateway, we will use a set of two Model TBHV110 915 Mhz Healthy Home Sensor IAQ (aka Tabs Healthy Home), also by Browan Communications Inc. According to Browan, the Healthy Home Sensor utilizes LoRaWAN connectivity (LoRaWAN 1.0.3) to communicate the temperature, relative humidity, volatile organic compound (VOC) levels, and indoor air quality (IAQ) of the surrounding environment. The Healthy Home Sensor costs $60 per sensor. The gateway and two devices used in this post were purchased from Cal-Chip Connected Devices at a total retail cost of $229 plus tax and shipping.

Healthy Home Sensor IAQ available from Cal-Chip Connected Devices

IAQ Index

The Healthy Home Sensor determines an IAQ Index, a reading between 0–500, indicating general air quality. According to the CDC, monitoring and maintaining good indoor air quality and proper ventilation have become essential to reduce the potential airborne spread of COVID-19.

Chart courtesy of the Browan Healthy Home Sensor (IAQ) Reference Guide

Source Code

The source code for this post is available on GitHub. Use the following command to git clone a local copy of the project.

git clone --branch main --single-branch --depth 1 \
https://github.com/garystafford/lorawan-iot-core-demo.git

The Lambda function and its associated AWS resources are deployed using the AWS Serverless Application Model (SAM). The primary AWS resources used in this demonstration are shown in the architectural diagram below.

The architecture of this post’s demonstration

Adding Gateways and Devices

In combination with the manufacturer’s product manual, the AWS documentation walks you through adding your LoRaWAN gateways and devices with AWS IoT Core for LoRaWAN. Adding gateways and devices is easy and straightforward on AWS as long as you have the appropriate GatewayEUI, DevEUI, AppEUI (aka JoinEUI), and AppKey, supplied by the manufacturer or retailer.

Adding a LoRaWAN device to AWS IoT Core for LoRaWAN

Over-the-Air Activation (OTAA)

In this post, the gateway and sensor devices are connected to AWS IoT Core for LoRaWAN using Over-the-Air Activation (OTAA). According to both The Things Network and AWS, OTAA is preferred over Activation by Personalization (ABP) as it is more secure. When using OTAA, temporary session keys are derived from root keys on each activation. ABP uses static keys, is less secure, and should not be used if not explicitly required by the use case.

Adding a LoRaWAN gateway to AWS IoT Core for LoRaWAN

The LoRaWAN gateway is added to AWS IoT Core for LoRaWAN. The gateway is then configured locally using the information received from IoT Core. With the MiniHub Pro, you can enable FreeRTOS Over-the-Air Updates and Configuration and Update Server (CUPS).

Configuring the MiniHub Pro gateway for Over-the-Air (OTA) updates
Configuring the MiniHub Pro gateway for Configuration and Update Server (CUPS)

Below, we see that the MiniHub Pro gateway has been successfully added to IoT Core and has exchanged uplink frames (RF packets) through that connection with the AWS IoT Core for LoRaWAN, which is acting as the LoRaWAN Network Server (LNS).

MiniHub Pro gateway added to AWS IoT Core for LoRaWAN

We then add the two Healthy Home Sensor devices in a similar fashion.

Adding a LoRaWAN device to AWS IoT Core for LoRaWAN

Below, we see that one of the two Healthy Home Sensor devices has successfully connected to AWS IoT Core for LoRaWAN and has also transmitted uplink frames via the MiniHub Pro gateway.

Healthy Home Sensor added to AWS IoT Core

IoT Destinations

Messages from the devices are received from the gateway and routed to an IoT rule using a Destination. According to AWS, a destination describes the AWS IoT rule that processes the data from a wireless device so that AWS IoT services can use the data. Many devices can share a single destination.

AWS IoT Core wireless connectivity destination connecting the device to an IoT Rule

IoT Rules

IoT rules, according to AWS, tell AWS IoT what to do when it receives messages from your devices. Rules extract data from messages, evaluate expressions using message data, and invoke one or more actions when the rule’s conditions are met.

IoT Rule that transforms base64 encoded binary device messages and passes them to IoT Analytics

Using AWS IoT Core for LoRaWAN, the device’s messages are base64 encoded binary messages. The IoT rule’s query statement is a SQL statement that determines which messages are forwarded to Actions. In the case of LoRaWAN, the query statement contains an inline call to an AWS Lambda function. The function accepts a number of input parameters. It decodes the base64 encoded message, decodes and translates the binary message, and builds a Python dictionary with the results. Finally, the dictionary is serialized to JSON and returned to the IoT rule.

SELECT WirelessDeviceId, WirelessMetadata, "tbhv110_915" as PayloadDecoderName,
aws_lambda("arn:aws:lambda:us-east-1:111222333444:function:lorawan-iot-core-TransformLoRaWANBinaryPayloadFunc-RQOC3C6CKGWZ",
{"PayloadDecoderName": "tbhv110_915",
"PayloadData": PayloadData,
"WirelessDeviceId": WirelessDeviceId,
"WirelessMetadata": WirelessMetadata}) as PayloadData
view raw lorawn_iot_rule.sql hosted with ❤ by GitHub
IoT rule query statement

Healthy Home Sensor Messages

LoRa uses a license-free sub-gigahertz radio frequency of 915 MHz in North America. The Healthy Home Sensor device transmits binary messages over this radio frequency using the LoRaWAN 1.0.3 specification. Binary messages sent by the Healthy Home Sensor device to the MiniHub Pro gateway have a payload length of 11 bytes, as follows:

Chart courtesy of the Browan Healthy Home Sensor (IAQ) Reference Guide

The device encrypts its binary message, containing sensor data, using AES128 CTR mode before transmitting it. AWS IoT Core for LoRaWAN decrypts the binary message, then encodes the decrypted binary message payload as a base64 string.

The final JSON-format message delivered by AWS IoT Core for LoRaWAN the IoT rule contains the device’s base64 encoded binary message (PayloadData), along with additional information about the source LoRaWAN device and the LoRaWAN gateway that transmitted the message.

{
"WirelessDeviceId": "3a3c05d6-a3a3-434f-c5f9-47aa04d41117",
"PayloadData": "AAs0LNsCAQB/ADM=",
"WirelessMetadata": {
"LoRaWAN": {
"DataRate": "3",
"DevEui": "40bb42d1c49fa3c1",
"FCnt": 1426,
"FPort": 103,
"Frequency": "904500000",
"Gateways": [
{
"GatewayEui": "d788ab85896d60b8",
"Rssi": -69,
"Snr": 9.5
}
],
"Timestamp": "2021-04-07T16:02:10Z"
}
},
"PayloadDecoderName": "tbhv110_915"
}

The IoT rule’s query statement calls a Lambda function to decode and transform the base64 encoded binary message. The Lambda calls the correct decoder, which contains logic to decode each byte of the binary message. AWS has developed the IoT Rule decoder Lambda along with several binary message payload decoders, freely available on GitHub, including:

  • Browan Tabs Object Locator
  • Dragino LHT65, LGT92, LSE01, LBT1, LDS01
  • Axioma W1
  • Elsys
  • Globalsat LT-100
  • NAS Pulse Reader UM3080

Since ASW did not include the Browan Healthy Home Sensor device in the list of provided decoders, I have created a new Python-based decoder using the message payload information detailed in the device’s product manual.

def dict_from_payload(base64_input: str, fport: int = None):
""" Healthy Home Sensor IAQ (TBHV110) binary payload decoder """
decoded = base64.b64decode(base64_input)
# Byte 0, bit 0
status = decoded[0] & 0b00000001 # (1 << 1) – 1
# Byte 1, bits 3:0
battery = decoded[1] & 0b00001111 # (1 << 4) – 1
battery = (25 + battery) / 10
# Byte 2, bits 6:0
board_temp = decoded[2] & 0b01111111 # (1 << 7) – 1
board_temp = board_temp 32
# Byte 3, bits 6:0
rh = decoded[3] & 0b01111111 # (1 << 7) – 1
# Byte 5-4, bits 15:0
eco2 = decoded[5] << 8 | decoded[4]
# Byte 7-6, bits 15:0
voc = decoded[7] << 8 | decoded[6]
# Byte 9-8, bits 15:0
iaq = decoded[9] << 8 | decoded[8]
# Byte 10, bits 6:0
env_temp = decoded[10] & 0b1111111 # (1 << 7) – 1
env_temp = env_temp 32
result = {
'Status': status,
'Battery': battery,
'BoardTemp': board_temp,
'RH': rh,
'ECO2': eco2,
'VOC': voc,
'IAQ': iaq,
'EnvTemp': env_temp,
}
return result
view raw tbhv110_decoder.py hosted with ❤ by GitHub

The decoder first decodes the base64 encoded binary message payload.

# base64 encoded binary message
AAs0LNsCAQB/ADM=

# base64 decoded 11-byte binary message
00000000 00001011 00110100 00101100 11011011 00000010 00000001 00000000 01111111 00000000 00110011
# as hexadecimal
00 0b 34 2c db 02 01 00 7f 00 33
# as decimal
0 11 52 44 219 2 1 0 127 0 51

The decoder then processes each byte of the binary message payload, bit-by-bit, and applies any additional logic to derive the final sensor values. Decoding the base64 encoded binary message payload and placing it back into the original message, we are left with the following message structure:

{
"WirelessDeviceId": "3a3c05d6-a3a3-434f-c5f9-47aa04d41117",
"PayloadData": {
"Status": 0,
"Battery": 3.6,
"BoardTemp": 20,
"RH": 44,
"ECO2