Posts Tagged Spring Boot

Eventual Consistency with Spring for Apache Kafka: Part 2 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

As discussed in Part One of this post, given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components in a local development environment running on Kubernetes with Istio, using minikube. For simplicity’s sake, we will only run a single instance of each service. Additionally, we are not implementing custom domain names, TLS/HTTPS, authentication and authorization, API keys, or restricting access to any sensitive operational API endpoints or ports, all of which we would certainly do in an actual production environment.

To provide operational visibility, we will add Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of Storefront API traffic from Kiali

Prerequisites

This post assumes a basic level of knowledge of Kubernetes, minikube, Docker, and Istio. Furthermore, it assumes you have already installed recent versions of minikube, kubectl, Docker, and Istio, meaning that the kubectl, istioctl, docker, and minikube commands are all available from your terminal.

Currently installed version of the required applications

For this post’s demonstration, I am using an Apple MacBook Pro running macOS as my development machine. I have the latest versions of Docker Desktop, minikube, kubectl, and Istio installed as of May 2021.

Source Code

The source code for this post is open-source and is publicly available on GitHub. Clone the GitHub project using the following command:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Minikube

Part of the Kubernetes project, minikube is local Kubernetes, focusing on making it easy to learn and develop for Kubernetes. Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows. Given the number of Kubernetes resources we will be deploying to minikube, I would recommend at least 3 CPUs and 4–5 GBs of memory. If you choose to deploy multiple observability tools, you may want to increase both of these resources if you can afford it. I maxed out both CPUs and memory several times while setting up this demonstration, causing temporary lock-ups of minikube.

minikube start --cpus 3 --memory 5g --driver=docker

The Docker driver allows you to install Kubernetes into an existing Docker installation. If you are using the Docker driver, be aware that Docker itself must have at least as many resources allocated to it as you intend to apportion to minikube.
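If you restart minikube frequently, you can optionally persist these resource settings with minikube’s config subcommand so you do not have to pass the flags on every start. The values below simply mirror the flags used in this post:

minikube config set driver docker
minikube config set cpus 3
minikube config set memory 5120

Note the memory value is given in MB here; recent versions of minikube also accept unit suffixes such as 5g.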

Before continuing, confirm minikube is up and running and confirm the current context of kubectl is minikube.

minikube status
kubectl config current-context

The statuses should look similar to the following:

Use the eval command below to point your shell to minikube’s docker-daemon. You can then confirm this by using the docker image ls and docker container ls commands to view the running Kubernetes containers on minikube.

eval $(minikube -p minikube docker-env)
docker image ls
docker container ls

The output should look similar to the following:

You can also check the status of minikube from Docker Desktop. Minikube is running as a container, instantiated from a Docker image, gcr.io/k8s-minikube/kicbase. View the container’s Stats, as shown below.

Istio

Assuming you have downloaded and configured Istio, install it onto minikube. I currently have Istio 1.10.0 installed and have the ISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also added Istio’s bin/ subdirectory to my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME                                                                
> /Applications/Istio/istio-1.10.0
where istioctl
> /Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

> client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and of the sidecars for the Istio data plane.

istioctl profile list
> Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, we will use the default profile, which installs istiod and an istio-ingressgateway. We will not require the use of an istio-egressgateway, since all components will be installed locally on minikube.

istioctl install --set profile=default -y
> ✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Installation complete

Minikube Tunnel

Services of type LoadBalancer can be exposed using the minikube tunnel command, which must be run in a separate terminal window to keep the LoadBalancer running. We previously created the istio-ingressgateway. Run the following command and note that the status of EXTERNAL-IP is <pending>; there is currently no external IP address associated with our LoadBalancer.

kubectl get svc istio-ingressgateway -n istio-system

To associate an IP address, run the minikube tunnel command in a separate terminal tab. Since it opens privileged ports 80 and 443, the command will prompt you for your sudo password.

minikube tunnel

Rerun the kubectl get svc command. There should now be an external IP address associated with the LoadBalancer. In my case, it is 127.0.0.1.

kubectl get svc istio-ingressgateway -n istio-system

The external IP address shown is the address we will use to access the resources we chose to expose externally on minikube.

Minikube Dashboard

Once again, in a separate terminal tab, open the Minikube Dashboard (aka Kubernetes Dashboard).

minikube dashboard

The dashboard will give you a visual overview of all your installed Kubernetes components.

Minikube Dashboard showing the istio-system namespace

Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. For this demonstration, we will use four namespaces to organize our deployed resources: dev, mongo, kafka, and storefront-kafka-project. The dev namespace is where we will deploy our Storefront API’s microservices: accounts, orders, and fulfillment. We will deploy MongoDB and Mongo Express to the mongo namespace. Lastly, we will use the kafka and storefront-kafka-project namespaces to deploy Apache Kafka to minikube using Strimzi, a Cloud Native Computing Foundation sandbox project, and CMAK.

kubectl apply -f ./minikube/resources/namespaces.yaml
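The namespaces.yaml file is included in the project. As a point of reference, a minimal sketch of what it likely contains, assuming four plain Namespace resources, looks similar to this:

apiVersion: v1
kind: Namespace
metadata:
  name: dev
---
apiVersion: v1
kind: Namespace
metadata:
  name: mongo
---
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
---
apiVersion: v1
kind: Namespace
metadata:
  name: storefront-kafka-project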

Automatic Sidecar Injection

In order to take advantage of all of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. When you set the istio-injection=enabled label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have a sidecar added to them. Labeling the dev namespace for automatic sidecar injection ensures that our Storefront API’s microservices (accounts, orders, and fulfillment) will have the Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled
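You can verify that the label was applied by listing the namespaces along with their istio-injection label:

kubectl get namespace -L istio-injection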

MongoDB

Next, deploy MongoDB and Mongo Express to the mongo namespace on minikube. To ensure a successful connection to MongoDB from Mongo Express, I suggest giving MongoDB a chance to start up fully before deploying Mongo Express.

kubectl apply -f ./minikube/resources/mongodb.yaml -n mongo
sleep 60
kubectl apply -f ./minikube/resources/mongo-express.yaml -n mongo

To confirm the success of the deployments, use the following command:

kubectl get services -n mongo

Or use the Kubernetes Dashboard to confirm deployments.

Mongo Express UI Access

For parts of your application (for example, frontends) you may want to expose a Service onto an external IP address outside of your cluster. Kubernetes ServiceTypes allows you to specify what kind of Service you want; the default is ClusterIP.

Note that while MongoDB uses the ClusterIP, Mongo Express uses NodePort. With NodePort, the Service is exposed on each Node’s IP at a static port (the NodePort). You can contact the NodePort Service, from outside the cluster, by requesting <NodeIP>:<NodePort>.
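As an illustration only (the ports and selector below are assumptions, not values copied from the project’s mongo-express.yaml), a NodePort Service for Mongo Express might look similar to the following:

apiVersion: v1
kind: Service
metadata:
  name: mongo-express
spec:
  type: NodePort
  selector:
    app: mongo-express
  ports:
    - port: 8081        # port exposed within the cluster
      targetPort: 8081  # Mongo Express container port
      nodePort: 30081   # static port exposed on each node (30000-32767 range)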

In a separate terminal tab, open Mongo Express using the following command:

minikube service --url mongo-express -n mongo

You should see output similar to the following:

Click on the link to open Mongo Express. There should already be three MongoDB operational databases shown in the UI. The three Storefront databases (accounts, orders, and fulfillment) and their collections will be created automatically later in the post.

Apache Kafka using Strimzi

Next, we will install Apache Kafka and Apache Zookeeper into the kafka and storefront-kafka-project namespaces on minikube, using Strimzi. Since Strimzi has a great, easy-to-use Quick Start guide, I will not detail the complete installation process in this post. I suggest using their guide to understand the process and what each command does. Then, use the slightly modified Strimzi commands I have included below to install Kafka and Zookeeper.

# assuming 0.23.0 is latest version available
curl -L -O https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.23.0/strimzi-0.23.0.zip
unzip strimzi-0.23.0.zip
cd strimzi-0.23.0
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
# manually change STRIMZI_NAMESPACE value to storefront-kafka-project
nano install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
kubectl create -f install/cluster-operator/ -n kafka
kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-cluster.yaml -n storefront-kafka-project
kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-topics.yaml -n storefront-kafka-project
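The strimzi-kafka-cluster.yaml file applied above defines a Strimzi Kafka custom resource named kafka-cluster. A heavily abridged sketch of such a resource, with illustrative sizing and storage values rather than the project’s exact configuration, looks similar to this:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 2.8.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}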

Zoo Entrance

We want to install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. However, CMAK requires access to Zookeeper. You cannot access Strimzi’s Zookeeper directly from CMAK; this is intentional, to avoid performance and security issues. See this GitHub issue for a better explanation of why. We will use the appropriately named Zoo Entrance as a proxy between CMAK and Zookeeper to overcome this challenge.

To install Zoo Entrance, review the GitHub project’s install guide, then use the following commands:

git clone https://github.com/scholzj/zoo-entrance.git
cd zoo-entrance
# optional: change my-cluster to kafka-cluster
sed -i '' 's/my-cluster/kafka-cluster/' deploy.yaml
kubectl apply -f deploy.yaml -n storefront-kafka-project

Cluster Manager for Apache Kafka

Next, install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. Run the following command to deploy CMAK into the storefront-kafka-project namespace.

kubectl apply -f ./minikube/resources/cmak.yaml -n storefront-kafka-project

Similar to Mongo Express, we can access CMAK’s UI using its NodePort. In a separate terminal tab, run the following command:

minikube service --url cmak -n storefront-kafka-project

You should see output similar to Mongo Express. Click on the link provided to access CMAK. Choose ‘Add Cluster’ in CMAK to add our existing Kafka cluster to CMAK’s management interface. Use Zoo Entrance’s service address for the Cluster Zookeeper Hosts value.

zoo-entrance.storefront-kafka-project.svc:2181

Once complete, you should see the three Kafka topics we created previously with Strimzi: accounts.customer.change, fulfillment.order.change, and orders.order.fulfill. Each topic will have three partitions, one replica, and one broker. You should also see the __consumer_offsets topic that Kafka uses to store information about committed offsets for each topic:partition per group of consumers (groupID).
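You can also confirm the topics from the command line since Strimzi manages them as KafkaTopic custom resources:

kubectl get kafkatopics -n storefront-kafka-project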

Storefront API Microservices

We are finally ready to install our Storefront API’s microservices into the dev namespace. Each service is preconfigured to access Kafka and MongoDB in their respective namespaces.

kubectl apply -f ./minikube/resources/accounts.yaml -n dev
kubectl apply -f ./minikube/resources/orders.yaml -n dev
kubectl apply -f ./minikube/resources/fulfillment.yaml -n dev

Spring Boot services usually take about two minutes to fully start. Between the time required to download the Docker images from Docker Hub and the services’ start-up time, it could take 3–4 minutes before each of the three services is ready to accept API traffic.
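Rather than guessing, you can watch the pods in the dev namespace and wait for them to report ready:

kubectl get pods -n dev -w
kubectl wait pod --all --for=condition=Ready --timeout=300s -n dev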

Istio Components

We want to be able to access our Storefront API’s microservices through our Kubernetes LoadBalancer, while also leveraging all the capabilities of Istio as a service mesh. To do so, we need to deploy an Istio Gateway and a VirtualService. We will also need to deploy DestinationRule resources. A Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. A VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, a DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred.
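The project’s istio-gateway.yaml and destination_rules.yaml files, applied below, contain the actual definitions. Purely as an illustrative sketch (the resource names, port, and URI prefix are assumptions), a Gateway and a VirtualService routing the /accounts path prefix to the accounts service might look similar to this:

apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway   # use Istio's default ingress gateway
  servers:
    - port:
        number: 80
        name: http
        protocol: HTTP
      hosts:
        - "*"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: accounts
spec:
  hosts:
    - "*"
  gateways:
    - storefront-gateway
  http:
    - match:
        - uri:
            prefix: /accounts
      route:
        - destination:
            host: accounts
            port:
              number: 8080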

kubectl apply -f ./minikube/resources/destination_rules.yaml -n dev
kubectl apply -f ./minikube/resources/istio-gateway.yaml -n dev

Testing the System and Creating Sample Data

I have provided a Python 3 script that runs a series of seven HTTP GET requests, in a specific order, against the Storefront API. These calls will validate the deployments, confirm the API’s services can access Kafka and MongoDB, generate some initial data, and automatically create the MongoDB database collections from the initial Insert statements.

python3 -m pip install -r ./utility_scripts/requirements.txt -U
python3 ./utility_scripts/refresh.py

The script’s output should be as follows:

If we now look at Mongo Express, we should note three new databases: accounts, orders, and fulfillment.

Observability Tools

Istio makes it easy to integrate with a number of common tools, including cert-manager, Prometheus, Grafana, Kiali, Zipkin, and Jaeger. In order to better observe our Storefront API, we will install three well-known observability tools: Kiali, Prometheus, and Grafana. Luckily, these tools are all included with Istio. You can install any or all of these to minikube. I suggest installing the tools one at a time so as not to overwhelm minikube’s CPU and memory resources.

kubectl apply -f ./minikube/resources/prometheus.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/grafana.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/kiali.yaml

Once deployment is complete, use the istioctl dashboard command from a new terminal window to access any of the UIs for these tools:

istioctl dashboard kiali

istioctl dashboard prometheus

istioctl dashboard grafana

Kiali

Below we see a view of Kiali with API traffic flowing to Kafka and MongoDB.

View of Storefront API traffic from Kiali

Prometheus

Each of the three Storefront API microservices has a dependency on Micrometer; specifically, a dependency on micrometer-registry-prometheus. As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics using a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends. Given the Micrometer Prometheus dependency, each microservice exposes an /actuator/prometheus endpoint (e.g., http://127.0.0.1/accounts/actuator/prometheus), as shown below in Postman.
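In a Gradle build, that dependency is typically declared alongside the Actuator starter. A minimal sketch (artifact coordinates only; these are not copied from the project’s build files, and versions are assumed to be managed by the Spring Boot plugin):

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'io.micrometer:micrometer-registry-prometheus'
}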

The /actuator/prometheus endpoint exposes dozens of useful metrics and is configured to be scraped by Prometheus. These metrics can be displayed in Prometheus and, indirectly, in Grafana dashboards via Prometheus. I have customized Istio’s version of Prometheus and included it in the project (prometheus.yaml) so that it now scrapes the Storefront API’s metrics.

scrape_configs:
  - job_name: 'spring_micrometer'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['accounts.dev:8080','orders.dev:8080','fulfillment.dev:8080']

Here we see an example graph of a Spring Kafka Listener metric, spring_kafka_listener_seconds_sum, in Prometheus. There are dozens of metrics exposed to Prometheus from our system that we can observe and alert on.
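As a simple example, assuming the default metric names Micrometer exposes for Spring Kafka listeners, the following PromQL expression charts the average listener processing time over the last five minutes:

# average seconds spent in Spring Kafka listeners, per scrape target
rate(spring_kafka_listener_seconds_sum[5m]) / rate(spring_kafka_listener_seconds_count[5m])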

Grafana

Lastly, here is an example Spring Boot Dashboard in Grafana. More dashboards are available on Grafana’s community dashboard page. The Grafana dashboard uses Prometheus as the source of its metrics data.

Storefront API Endpoints

The three storefront services are fully functional Spring Boot, Spring Data REST, Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. To better understand the Storefront API, each Spring Boot microservice uses SpringFox, which produces automated JSON API documentation for APIs built with Spring. The service builds also include the springfox-swagger-ui web jar, which ships with Swagger UI. Swagger takes the manual work out of API documentation, with a range of solutions for generating, visualizing, and maintaining API docs.

From a web browser, you can use the /swagger-ui/ subdirectory/subpath with any of the three microservices to access the fully-featured Swagger UI (e.g., http://127.0.0.1/accounts/swagger-ui/).

Accounts service Customer entity endpoints

Each service’s data model (POJOs) is also exposed through the Swagger UI.

Accounts service data model

Spring Boot Actuator

Additionally, each service includes Spring Boot Actuator. The Actuator exposes additional operational endpoints, allowing us to observe the running services. With Actuator, you get many features, including access to available operational-oriented endpoints, using the /actuator/ subdirectory/subpath (e.g., http://127.0.0.1/accounts/actuator/). For this demonstration, I have not restricted access to any available Actuator endpoints.

Partial list of Spring Boot Actuator endpoints as seen using Swagger
Partial list of Spring Boot Actuator endpoints as seen using Postman

Conclusion

In this two-part post, we learned how to build an API using Spring Boot. We ensured the API’s distributed data integrity using a pub/sub model with the Spring for Apache Kafka project. When a relevant piece of data was changed by one microservice, that change triggered a state change event that was shared with other microservices using Kafka topics.

We also learned how to deploy and run the API in a local development environment running on Kubernetes with Istio, using minikube. We added production-tested observability tools to provide operational visibility, including CMAK, Mongo Express, Kiali, Prometheus, and Grafana.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Eventual Consistency with Spring for Apache Kafka: Part 1 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Introduction

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.

Domain-driven Design

To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.

Below, the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.

Consistency Strategies

Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternatively, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages are persisted in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.

In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.

Storefront Example

In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with the Spring for Apache Kafka project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

View of the Storefront API from Kiali

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows: 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices

We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.
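As a reference only, and not necessarily the exact entries in each service’s build.gradle file, the corresponding Gradle dependency declarations might look similar to this, with versions assumed to be managed by the Spring Boot and Spring Cloud dependency-management plugins:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-rest'
    implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.cloud:spring-cloud-starter-sleuth'
    implementation 'io.springfox:springfox-boot-starter:3.0.0'
}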

Source Code

The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry the customer state information that the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer’s name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object represented as a BSON document in the fulfillment.requests database collection looks as follows:

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, we can again look at its JSON message payload in Kafka.

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.

State Change Event Messaging Flows

There are three state change event messaging flows illustrated in this post.

  1. Changes to a Customer trigger an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
  2. An Order being Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic and consumed by the Fulfillment service;
  3. Changes to the status of an Order trigger an event message produced by the Fulfillment service, which is published on the fulfillment.order.change Kafka topic and consumed by the Orders service.

Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;
public void send(String topic, CustomerChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}

The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}

The Orders service’s Receiver class is configured differently compared to the Fulfillment service’s. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into a different object type. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file in each service’s resources directory contains the Kafka configuration (lines 11–19).

server:
  port: 8080
spring:
  main:
    allow-bean-definition-overriding: true
  application:
    name: orders
  data:
    mongodb:
      uri: mongodb://mongo:27017/orders
  kafka:
    bootstrap-servers: kafka:9092
    topic:
      accounts-customer: accounts.customer.change
      orders-order: orders.order.fulfill
      fulfillment-order: fulfillment.order.change
    consumer:
      group-id: orders
      auto-offset-reset: earliest
  zipkin:
    sender:
      type: kafka
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: INFO
---
spring:
  config:
    activate:
      on-profile: local
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders
  kafka:
    bootstrap-servers: localhost:9092
server:
  port: 8090
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: DEBUG
---
spring:
  config:
    activate:
      on-profile: confluent
server:
  port: 8080
logging:
  level:
    root: INFO
---
server:
  port: 8080
spring:
  config:
    activate:
      on-profile: minikube
  data:
    mongodb:
      uri: mongodb://mongo.dev:27017/orders
  kafka:
    bootstrap-servers: kafka-cluster.dev:9092
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: DEBUG

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates receiving an order approval event, containing an approved order, from the Shopping Cart service through Kafka. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class, which simulates a change in order status. They allow an order’s status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service’s Sender class handles this.

Note in this example that these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both of these independent actions succeed or fail as a single transaction to ensure data consistency, using one of a handful of common architectural patterns, such as the transactional outbox pattern or change data capture (CDC).

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently from the Fulfillment service for receiving messages from Kafka, since it consumes messages from more than one topic. The Orders service’s ReceiverConfig class deserializes all messages using the StringDeserializer and relies on the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components in a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of the Storefront API from Kiali

This blog represents my own viewpoints and not those of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine

Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, such as MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-managed PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.

In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).

Background

In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.

Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders service. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.

The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.


Developing the services, not operationalizing the platform, was the primary objective of the previous post.

Featured Technologies

The following technologies are featured prominently in this post.

Confluent Cloud


In May 2018, Google announced a partnership with Confluent to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for its commercial, Kafka-based streaming platform for the Enterprise.

Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.

Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.

MongoDB Atlas


Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.

Kubernetes Engine

According to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.

A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.

GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.

Demonstration

In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.


Kubernetes and Istio 1.0 will replace Netflix’s Zuul and Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform than previously demonstrated on Docker Swarm.


For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor will we integrate a full lifecycle API management tool, like Google Apigee.

Source Code

The source code for this demonstration is contained in four separate GitHub repositories: storefront-kafka-docker, storefront-demo-accounts, storefront-demo-orders, and storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).

git clone --branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-kafka-docker.git
# optional repositories
git clone --branch gke --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch gke --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch gke --single-branch --depth 1 --no-tags \
https://github.com/garystafford/storefront-demo-fulfillment.git

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Setup Process

The setup of the Storefront API platform is divided into a few logical steps:

  1. Create the MongoDB Atlas cluster;
  2. Create the Confluent Cloud Kafka cluster;
  3. Create Kafka topics;
  4. Modify the Kubernetes resources;
  5. Modify the microservices to support Confluent Cloud configuration;
  6. Create the GKE cluster with Istio on GCP;
  7. Apply the Kubernetes resources to the GKE cluster;
  8. Test the Storefront API, Kafka, and MongoDB are functioning properly;

MongoDB Atlas Cluster

This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.

For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.


For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized, 3-data-node cluster, with shared RAM and 512 MB of storage, currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0 cluster on GCP. An M0-sized Atlas cluster may take 7–10 minutes to provision.


MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.


Once the cluster is ready, you can review details about the cluster and each individual cluster node.


In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.


Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open access to your specific IP address using whatsmyip.com, or, much less securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.


The Java Spring Boot storefront services use a Spring Profile, gke. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke Spring Profile’s configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap and Secret resources, shown later.

The first two Spring configuration values we will need are the MongoDB Atlas cluster’s connection string and the demo_user account password. Note both of these for later use.


Confluent Cloud Kafka Cluster

Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Account creation does require the use of a Credit Card.

The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and persistent history of cluster activities, which survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based SaaS management tool.


As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.


Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.

Once the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.


As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.


SASL and JAAS

Some users may not be familiar with the terms, SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).

There are numerous SASL mechanisms.  The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.

Cluster Authentication

Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.
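
The API Key and Secret end up in the Kafka client’s sasl.jaas.config property, alongside the SASL_SSL and PLAIN settings described above. The short Java sketch below shows the general shape of those client security properties; the class, method, and placeholder values are illustrative, and the actual wiring for the storefront services is done through the Kubernetes Secret and ConfigMap resources shown later in this post.

import java.util.HashMap;
import java.util.Map;

// Sketch of the Kafka client security properties built from a Confluent Cloud
// API Key and Secret; the class/method names and values here are illustrative.
public class ConfluentCloudClientProps {

    public static Map<String, Object> securityProps(String bootstrapServers,
                                                    String apiKey,
                                                    String apiSecret) {
        Map<String, Object> props = new HashMap<>();
        // e.g., <your_cluster_url>.us-central1.gcp.confluent.cloud:9092
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
        return props;
    }
}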


Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.


Kafka Topics

With the cluster created, create the storefront service’s three Kafka topics manually, using the Confluent Cloud’s ccloud CLI tool. First, configure the Confluent Cloud CLI using the ccloud init command, with your new cluster’s Bootstrap Servers address, API Key, and API Secret. The instructions are shown on the cluster’s Client Config tab of the Confluent Cloud web-based management interface.


Create the storefront service’s three Kafka topics using the ccloud topic create command. Use the list command to confirm they are created.

# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill
  
# list kafka topics
ccloud topic list
  
accounts.customer.change
fulfillment.order.change
orders.order.fulfill

Another useful ccloud command, topic describe, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12.


Adding the --verbose flag to the command, ccloud --verbose topic describe, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.


Kubernetes Resources

The deployment of the three storefront microservices to the dev Namespace will minimally require the following Kubernetes configuration resources.

  • (1) Kubernetes Namespace;
  • (3) Kubernetes Deployments;
  • (3) Kubernetes Services;
  • (1) Kubernetes ConfigMap;
  • (2) Kubernetes Secrets;
  • (1) Istio 1.0 Gateway;
  • (1) Istio 1.0 VirtualService;
  • (2) Istio 1.0 ServiceEntry;

The Istio networking.istio.io v1alpha3 API introduced the last three configuration resources in the list, to control traffic routing into, within, and out of the mesh. There are a total of four new networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.

Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.

To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.

Istio Gateway & VirtualService

Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com, along with the sub-domain, api.dev, to host the Storefront API. The domain’s primary A record (‘@’) and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. Removing the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.dev.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - api.dev.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local

Istio ServiceEntry

There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services, and both of their ServiceEntry location items are set to MESH_EXTERNAL. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: mongdb-atlas-external-mesh
spec:
  hosts:
  - <your_atlas_url.gcp.mongodb.net>
  ports:
  - name: mongo
    number: 27017
    protocol: MONGO
  location: MESH_EXTERNAL
  resolution: NONE

The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: confluent-cloud-external-mesh
spec:
  hosts:
  - <your_cluster_url.us-central1.gcp.confluent.cloud>
  ports:
  - name: kafka
    number: 9092
    protocol: TLS
  location: MESH_EXTERNAL
  resolution: NONE

Both need to have their host items replaced with the appropriate Atlas and Confluent URLs.

Inspecting Istio Resources

The easiest way to view Istio resources is from the command line using the istioctl and kubectl CLI tools.

istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry
  
kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry

Multiple Namespaces

In this demo, we are only deploying to a single Kubernetes Namespace, dev. However, Istio will also support routing traffic to multiple namespaces. For example, a typical non-prod Kubernetes cluster might support dev, test, and uat, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In the VirtualService, each service’s host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which we change for each VirtualService (gist).

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.dev.storefront-demo.com
    - api.test.storefront-demo.com
    - api.uat.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - api.dev.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-test
spec:
  hosts:
  - api.test.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.test.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.test.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.test.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-uat
spec:
  hosts:
  - api.uat.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.uat.svc.cluster.local

MongoDB Atlas Secret

There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.

The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection string, with the demo_user username and password, one for each of the storefront service’s databases (gist).

apiVersion: v1
kind: Secret
metadata:
  name: mongodb-atlas
  namespace: dev
type: Opaque
data:
  mongodb.uri.accounts: your_base64_encoded_value
  mongodb.uri.fulfillment: your_base64_encoded_value
  mongodb.uri.orders: your_base64_encoded_value

Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64 program. The base64 program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64 program using echo -n.

MONGODB_URI=mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true
echo -n $MONGODB_URI | base64

bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl

Repeat this process for the three MongoDB connection strings.
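
If you prefer not to use the shell, the same encoding can be done with a few lines of Java. The snippet below is simply a convenience, equivalent to the echo -n | base64 command above; the connection string is a placeholder.

import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Encodes a MongoDB connection string to Base64, equivalent to `echo -n ... | base64`.
public class EncodeSecret {

    public static void main(String[] args) {
        // Placeholder connection string; substitute your own values.
        String mongodbUri =
                "mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true";
        String encoded = Base64.getEncoder()
                .encodeToString(mongodbUri.getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded);
    }
}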


Confluent Cloud Secret

The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret’s data map, bootstrap.servers and sasl.jaas.config. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the base64 encoding process for these two data fields (gist).

apiVersion: v1
kind: Secret
metadata:
  name: confluent-cloud-kafka
  namespace: dev
type: Opaque
data:
  bootstrap.servers: your_base64_encoded_value
  sasl.jaas.config: your_base64_encoded_value

Confluent Cloud ConfigMap

The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive, and therefore, may be placed in a Kubernetes ConfigMap, confluent-cloud-kafka-configmap.yaml (gist).

apiVersion: v1
kind: ConfigMap
metadata:
  name: confluent-cloud-kafka
data:
  ssl.endpoint.identification.algorithm: "https"
  sasl.mechanism: "PLAIN"
  request.timeout.ms: "20000"
  retry.backoff.ms: "500"
  security.protocol: "SASL_SSL"

Accounts Deployment Resource

To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note that the environment variables section, on lines 44–90 of the gist, is a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).

apiVersion: v1
kind: Service
metadata:
  name: accounts
  labels:
    app: accounts
spec:
  ports:
  - name: http
    port: 8080
  selector:
    app: accounts
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: accounts
  labels:
    app: accounts
spec:
  replicas: 2
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: accounts
  template:
    metadata:
      labels:
        app: accounts
      annotations:
        sidecar.istio.io/inject: "true"
    spec:
      containers:
      - name: accounts
        image: garystafford/storefront-accounts:gke-2.2.0
        resources:
          requests:
            memory: "250M"
            cpu: "100m"
          limits:
            memory: "400M"
            cpu: "250m"
        env:
        - name: SPRING_PROFILES_ACTIVE
          value: "gke"
        - name: SERVER_SERVLET_CONTEXT-PATH
          value: "/accounts"
        - name: LOGGING_LEVEL_ROOT
          value: "INFO"
        - name: SPRING_DATA_MONGODB_URI
          valueFrom:
            secretKeyRef:
              name: mongodb-atlas
              key: mongodb.uri.accounts
        - name: SPRING_KAFKA_BOOTSTRAP-SERVERS
          valueFrom:
            secretKeyRef:
              name: confluent-cloud-kafka
              key: bootstrap.servers
        - name: SPRING_KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: ssl.endpoint.identification.algorithm
        - name: SPRING_KAFKA_PROPERTIES_SASL_MECHANISM
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: sasl.mechanism
        - name: SPRING_KAFKA_PROPERTIES_REQUEST_TIMEOUT_MS
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: request.timeout.ms
        - name: SPRING_KAFKA_PROPERTIES_RETRY_BACKOFF_MS
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: retry.backoff.ms
        - name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
          valueFrom:
            secretKeyRef:
              name: confluent-cloud-kafka
              key: sasl.jaas.config
        - name: SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: security.protocol
        ports:
        - containerPort: 8080
        imagePullPolicy: IfNotPresent


Modify Microservices for Confluent Cloud

As explained earlier, Confluent Cloud’s Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service’s code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note that the following changes have already been made to the source code in the gke git branch and do not need to be repeated for this demo.

The previous Kafka SenderConfig and ReceiverConfig Java classes have been converted to Java interfaces. There are four new classes, SenderConfigConfluent, SenderConfigNonConfluent, ReceiverConfigConfluent, and ReceiverConfigNonConfluent, each of which implements one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes is assigned the @Profile("gke") annotation, and the other set, the @Profile("!gke") annotation. When the services start, one of the two class implementations is loaded, depending on the Active Spring Profile, gke or not gke. To understand the changes better, examine the Account service’s SenderConfigConfluent.java file (gist).

Line 20: Designates this class as belonging to the gke Spring Profile.

Line 23: The class now implements an interface.

Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.

Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.

package com.storefront.config;
import com.storefront.kafka.Sender;
import com.storefront.model.CustomerChangeEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Profile("gke")
@Configuration
@EnableKafka
public class SenderConfigConfluent implements SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm}")
private String sslEndpointIdentificationAlgorithm;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.properties.request.timeout.ms}")
private String requestTimeoutMs;
@Value("${spring.kafka.properties.retry.backoff.ms}")
private String retryBackoffMs;
@Value("${spring.kafka.properties.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.properties.sasl.jaas.config}")
private String saslJaasConfig;
@Override
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);
return props;
}
@Override
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Override
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Override
@Bean
public Sender sender() {
return new Sender();
}
}
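
The new SenderConfig interface itself is not shown above. Based on the overridden bean methods in SenderConfigConfluent, it presumably looks something like the following sketch, which is not the exact source from the repository.

package com.storefront.config;

// Presumed shape of the SenderConfig interface implemented by both
// SenderConfigConfluent and SenderConfigNonConfluent; a sketch, not the exact source.
import com.storefront.kafka.Sender;
import com.storefront.model.CustomerChangeEvent;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.Map;

public interface SenderConfig {

    Map<String, Object> producerConfigs();

    ProducerFactory<String, CustomerChangeEvent> producerFactory();

    KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate();

    Sender sender();
}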

Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine).

Google Kubernetes Engine (GKE) with Istio

Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.

For the sake of brevity, we will manually create the cluster and deploy the resources, using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available release of Kubernetes on GKE at the time of this post was 1.11.5-gke.5, with Istio 1.0.3 (Istio on GKE was still considered beta). Note Kubernetes and Istio are evolving rapidly, thus the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create command format (gist).

#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE
# Constants – CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'
# Create GKE cluster (time in foreground)
time \
gcloud beta container \
--project $PROJECT clusters create $CLUSTER \
--zone $ZONE \
--username "admin" \
--cluster-version "1.11.5-gke.5" \
--machine-type "n1-standard-2" \
--image-type "COS" \
--disk-type "pd-standard" \
--disk-size "100" \
--scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
--num-nodes "2" \
--enable-stackdriver-kubernetes \
--enable-ip-alias \
--network "projects/$PROJECT/global/networks/default" \
--subnetwork "projects/$PROJECT/regions/$REGION/subnetworks/default" \
--default-max-pods-per-node "110" \
--addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
--istio-config auth=MTLS_PERMISSIVE \
--issue-client-certificate \
--metadata disable-legacy-endpoints=true \
--enable-autoupgrade \
--enable-autorepair
# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
--zone $ZONE --project $PROJECT
kubectl config current-context
# Create dev Namespace
kubectl apply -f ./resources/other/namespaces.yaml
# Enable Istio automatic sidecar injection in Dev Namespace
kubectl label namespace $NAMESPACE istio-injection=enabled

Executing these commands successfully will build the cluster and the dev Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.


We can also observe the new GKE cluster from the GKE Clusters Details tab.


Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the two GKE cluster’s VM instances, and one IP address associated with the frontend of the load balancer.


While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.


A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.


Below, we see the details of the load balancer’s target pool, containing the two GKE cluster’s VMs.


As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.


Apply Kubernetes Resources

Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev Namespace since their role is to control ingress and route traffic to the dev Namespace and the services within it (gist).

#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources
# Constants – CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'
kubectl apply -f ./resources/other/istio-gateway.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/mongodb-atlas-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/confluent-cloud-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/config/confluent-cloud-kafka-configmap.yaml
kubectl apply -f ./resources/config/mongodb-atlas-secret.yaml
kubectl apply -f ./resources/config/confluent-cloud-kafka-secret.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/accounts.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/fulfillment.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/orders.yaml

Once these commands complete successfully, on the Workloads tab, we should observe two Pods of each of the three storefront service Kubernetes Deployments deployed to the dev Namespace, all six Pods with a Status of ‘OK’. A Deployment controller provides declarative updates for Pods and ReplicaSets.


On the Services tab, we should observe the three storefront service’s Kubernetes Services. A Service in Kubernetes is a REST object.


On the Configuration tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Namespace.


Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.


Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.


Test the Storefront API

If you recall from part two of the previous post, there is a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET Requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.

  1. Sample Customer: accounts/customers/sample
  2. Sample Orders: orders/customers/sample/orders
  3. Sample Fulfillment Requests: orders/customers/sample/fulfill
  4. Sample Processed Order Event: fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Event: fulfillment/fulfillment/sample/receive

There are a wide variety of tools to interact with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.
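
Any HTTP client will work equally well. For example, a short Java program using the JDK’s built-in HttpClient (Java 11+) could exercise the same endpoints; the host below is a placeholder for your Istio Gateway’s external IP address or domain.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Minimal smoke test of the seven sample-data endpoints using the JDK HttpClient
// (Java 11+). Replace API_HOST with your Gateway's external IP address or domain.
public class StorefrontSmokeTest {

    static final String API_HOST = "http://api.dev.storefront-demo.com"; // placeholder

    static final String[] ENDPOINTS = {
            "/accounts/customers/sample",
            "/orders/customers/sample/orders",
            "/orders/customers/sample/fulfill",
            "/fulfillment/fulfillment/sample/process",
            "/fulfillment/fulfillment/sample/ship",
            "/fulfillment/fulfillment/sample/in-transit",
            "/fulfillment/fulfillment/sample/receive"
    };

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        for (String path : ENDPOINTS) {
            HttpRequest request = HttpRequest.newBuilder(URI.create(API_HOST + path)).GET().build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + path);
        }
    }
}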


Postman

Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers endpoint.


Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.


Google Stackdriver

If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener handler firing on an onAfterSave event, which sends a CustomerChangeEvent payload to the accounts.customer.change Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.
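
For context, the AfterSaveListener seen in the logs is based on Spring Data MongoDB’s AbstractMongoEventListener. A simplified sketch of such a listener is shown below; the field, type, and property names are illustrative of, not identical to, the Accounts service’s actual code.

import com.storefront.kafka.Sender;
import com.storefront.model.Customer;
import com.storefront.model.CustomerChangeEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.stereotype.Component;

// Simplified sketch of an after-save listener like the one seen in the logs; it
// publishes a CustomerChangeEvent after each Customer document is saved. The
// topic property name and the CustomerChangeEvent constructor are assumptions.
@Component
public class AfterSaveListenerSketch extends AbstractMongoEventListener<Customer> {

    @Value("${spring.kafka.topic.accounts-customer}") // assumed property name
    private String topic;

    @Autowired
    private Sender sender;

    @Override
    public void onAfterSave(AfterSaveEvent<Customer> event) {
        // Publish the saved Customer as a change event to accounts.customer.change
        Customer customer = event.getSource();
        sender.send(topic, new CustomerChangeEvent(customer)); // assumed constructor
    }
}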


MongoDB Atlas Collection View

Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connect to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests are completed successfully, you should now observe the three Storefront API databases, each with collections of documents.


MongoDB Compass

In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests collection schema.


Confluent Control Center

Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since the Control Center is provided at an additional fee, and I found it difficult to configure for Confluent Cloud clusters based on Confluent’s documentation, I chose not to cover it in detail in this post.


Tear Down Cluster

Delete your Confluent Cloud and MongoDB clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the cluster delete command. Also, double-check that the external IP addresses and load balancer, associated with the cluster, were also deleted as part of the cluster deletion (gist).

#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Tear down GKE cluster and associated resources
# Constants – CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'
# Delete GKE cluster (time in foreground)
time yes | gcloud beta container clusters delete $CLUSTER --zone $ZONE
# Confirm network resources are also deleted
gcloud compute forwarding-rules list
gcloud compute target-pools list
gcloud compute firewall-rules list
# In case target-pool associated with Cluster is not deleted
yes | gcloud compute target-pools delete \
$(gcloud compute target-pools list \
--filter="region:($REGION)" --project $PROJECT \
| awk 'NR==2 {print $1}')

Conclusion

In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.

In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0).

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 2

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, the ability to return relevant responses to user inquiries, also requires interfaces have access to rich informational datastores, and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a  Near Real-time (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Architecture

If you recall from part one of this post, the high-level architecture of our search engine-enhanced Action for Google Assistant resembles the following. Most of the components are running on Google Cloud.


Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

In part two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:

  • Create the new Actions for Google Assistant project using the Actions on Google console;
  • Develop the Action’s Intents and Entities using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP;

Let’s explore each step in more detail.

New ‘Actions on Google’ Project

With Elasticsearch running and the Spring Boot Service deployed to our GKE cluster, we can start building our Actions for Google Assistant. Using the Actions on Google web console, we first create a new Actions project.


The Directory Information tab is where we define metadata about the project. This information determines how it will look in the Actions directory and is required to publish your project. The Actions directory is where users discover published Actions on the web and mobile devices.


The Directory Information tab also includes sample invocations, which may be used to invoke our Actions.


Actions and Intents

Our project will contain a series of related Actions. According to Google, an Action is ‘an interaction you build for the Assistant that supports a specific intent and has a corresponding fulfillment that processes the intent.’ To build our Actions, we first want to create our Intents. To do so, we will want to switch from the Actions on Google console to the Dialogflow console. Actions on Google provides a link for switching to Dialogflow in the Actions tab.


We will build our Action’s Intents in Dialogflow. The term Intent, used by Dialogflow, is standard terminology across other voice-assistant platforms, such as Amazon’s Alexa and Microsoft’s Azure Bot Service and LUIS. In Dialogflow, we will be building Intents, such as the Find Multiple Posts Intent, Find Post Intent, Find By ID Intent, and so forth.


Below, we see the Find Post Intent. The Find Post Intent is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker.’ The Intent shown below contains a fair number, but indeed not an exhaustive list, of training phrases. These represent possible ways a user might express intent when invoking the Action.


Below, we see the Find Multiple Posts Intent. The Find Multiple Posts Intent is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker.’ Similar to the Find Post Intent above, the Find Multiple Posts Intent contains a list of training phrases.


Dialog Model Training

According to Google, the greater the number of natural language examples in the Training Phrases section of Intents, the better the classification accuracy. Every time a user interacts with our Action, the user’s utterances are logged. Using the Training tab in the Dialogflow console, we can train our model by reviewing and approving or correcting how the Action handled the user’s utterances.

Below we see the user’s utterances, part of an interaction with the Action. We have the option to review and approve the Intent that was called to handle the utterance, re-assign it, or delete it. This helps improve the accuracy of our dialog model.


Dialogflow Entities

Each of the highlighted words in the training phrases maps to the facts parameter, which maps to a collection of @topic Entities. Entities represent a list of intents the Action is trained to understand.  According to Google, there are three types of entities: ‘system’ (defined by Dialogflow), ‘developer’ (defined by a developer), and ‘user’ (built for each individual end-user in every request) objects. We will be creating ‘developer’ type entities for our Action’s Intents.

wp-search-037.png

Automated Expansion

We do not have to define every possible topic a user might search for as an entity. By enabling the Allow Automated Expansion option, an Agent will recognize values that have not been explicitly listed in the entity list. Google describes Agents as NLU (Natural Language Understanding) modules.

wp-search-042.png

Entity Synonyms

An entity may contain synonyms. Multiple synonyms are mapped to a single reference value. The reference value is the value passed to the Cloud Function by the Action. For example, take the reference value of ‘GCP.’ The user might ask Google about ‘GCP.’ However, the user might also substitute the words ‘Google Cloud’ or ‘Google Cloud Platform.’ Using synonyms, if the user utters any of these three synonymous words or phrases in their intent, the reference value, ‘GCP’, is passed in the request.

But, what if the post contains the phrase, ‘Google Cloud Platform’ more frequently than, or instead of, ‘GCP’? If the acronym, ‘GCP’, is defined as the entity reference value, then it is the value passed to the function, even if you ask for ‘Google Cloud Platform’. In the use case of searching blog posts by topic, entity synonyms are not an effective search strategy.

Elasticsearch Synonyms

A better way to solve for synonyms is by using the synonyms feature of Elasticsearch. Take, for example, the topic of ‘Istio’; Istio is also considered a Service Mesh. If I ask for posts about ‘Service Mesh’, I would like to get back posts that contain the phrase ‘Service Mesh’, but also the word ‘Istio’. To accomplish this, you would define an association between ‘Istio’ and ‘Service Mesh’, as part of the Elasticsearch WordPress posts index.
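
As a rough sketch of that association, the index settings might include a custom synonym token filter and analyzer similar to the following. The index, filter, and analyzer names, as well as the host, are hypothetical placeholders; the actual index used in this post was created by the ElasticPress plugin.

# Create an index with a synonym filter mapping 'istio' and 'service mesh' (names and host are placeholders)
curl -X PUT "http://<elk_host>:9200/<index_name>" \
  -H 'Content-Type: application/json' -d '
{
  "settings": {
    "analysis": {
      "filter": {
        "topic_synonyms": {
          "type": "synonym",
          "synonyms": ["istio, service mesh"]
        }
      },
      "analyzer": {
        "posts_synonym_analyzer": {
          "tokenizer": "standard",
          "filter": ["lowercase", "topic_synonyms"]
        }
      }
    }
  }
}'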

wp-search-041d

Searches for ‘Istio’ against that index would return results that contain ‘Istio’ and/or contain ‘Service Mesh’; the reverse is also true. Having created and applied a custom synonyms filter to the index, we see how Elasticsearch responds to an analysis of the natural language style phrase, ‘What is a Service Mesh?’. As shown by the tokens output in Kibana’s Dev Tools Console, Elasticsearch understands that ‘service mesh’ is synonymous with ‘istio’.
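
That analysis can also be reproduced outside of Kibana’s Dev Tools Console with the _analyze API. A hedged example, reusing the hypothetical index and analyzer names from the sketch above:

# Analyze a natural-language phrase with the custom synonym analyzer
curl -X POST "http://<elk_host>:9200/<index_name>/_analyze" \
  -H 'Content-Type: application/json' -d '
{
  "analyzer": "posts_synonym_analyzer",
  "text": "What is a Service Mesh?"
}'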

wp-search-041g

If we query the same five fields as our Action, for the topic of ‘service mesh’, we get four hits for posts (indexed documents) that contain ‘service mesh’ and/or ‘istio’.

wp-search-041c

Actions on Google Integration

Another configuration item in Dialogflow that needs to be completed is Dialogflow’s Actions on Google integration. This will integrate our Action with Google Assistant. Google currently provides more than fifteen different integrations, including Google Assistant, Slack, Facebook Messenger, Twitter, and Twilio, as shown below.

wp-search-028

To configure the Google Assistant integration, choose the Welcome Intent as our Action’s Explicit Invocation intent. Then we designate our other Intents as Implicit Invocation intents. According to Google, this Google Assistant Integration allows our Action to reach users on every device where the Google Assistant is available.

wp-search-029

Action Fulfillment

When a user’s intent is received, it is fulfilled by the Action. In the Dialogflow Fulfillment console, we see the Action has two fulfillment options, a Webhook or an inline-editable Cloud Function. A Webhook allows us to pass information from a matched intent into a web service and get a result back from the service. Our Action’s Webhook will call our Cloud Function on GCP, using the Cloud Function’s URL endpoint (we’ll get this URL in the next section).

wp-search-030

Google Cloud Functions

Our Cloud Function, called by our Action, is written in Node.js. Our function, index.js, is divided into four sections, which are: constants and environment variables, intent handlers, helper functions, and the function’s entry point. The helper functions are part of the Helper module, contained in the helper.js file.

Constants and Environment Variables

This section, in both index.js and helper.js, defines the global constants and environment variables used within the function. Values that reference environment variables, such as SEARCH_API_HOSTNAME, are defined in the .env.yaml file. All environment variables in the .env.yaml file will be set during the Cloud Function’s deployment, described later in this post. Environment variables were recently released and are still considered beta functionality (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
'use strict';
/* CONSTANTS AND GLOBAL VARIABLES */
const Helper = require('./helper');
let helper = new Helper();
const {
  dialogflow,
  Button,
  Suggestions,
  BasicCard,
  SimpleResponse,
  List
} = require('actions-on-google');
const functions = require('firebase-functions');
const app = dialogflow({debug: true});
app.middleware(conv => {
  conv.hasScreen =
    conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
  conv.hasAudioPlayback =
    conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});
const SUGGESTION_1 = 'tell me about Docker';
const SUGGESTION_2 = 'help';
const SUGGESTION_3 = 'cancel';
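
For reference, the .env.yaml file itself is just a YAML map of names to values. A minimal sketch might look like the following; the variable names match those read by helper.js, while the values here are placeholders for your own service’s hostname, port, and base path.

# .env.yaml - placeholder values; substitute your own Spring Boot service details
SEARCH_API_HOSTNAME: "api.example.com"
SEARCH_API_PORT: "80"
SEARCH_API_ENDPOINT: "blog/api/v1/elastic"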

The npm module dependencies declared in this section are defined in the dependencies section of the package.json file. Function dependencies include Actions on Google, Firebase Functions, Winston, and Request (gist).

{
  "name": "functionBlogSearchAction",
  "description": "Programmatic Ponderings Search Action for Google Assistant",
  "version": "1.0.0",
  "private": true,
  "license": "MIT License",
  "author": "Gary A. Stafford",
  "engines": {
    "node": ">=8"
  },
  "scripts": {
    "deploy": "sh ./deploy-cloud-function.sh"
  },
  "dependencies": {
    "@google-cloud/logging-winston": "^0.9.0",
    "actions-on-google": "^2.2.0",
    "dialogflow": "^0.6.0",
    "dialogflow-fulfillment": "^0.5.0",
    "firebase-admin": "^6.0.0",
    "firebase-functions": "^2.0.2",
    "request": "^2.88.0",
    "request-promise-native": "^1.0.5",
    "winston": "2.4.4"
  }
}

Intent Handlers

The intent handlers in this section correspond to the Intents in the Dialogflow console. Each handler responds with either SimpleResponse, BasicCard, and Suggestion Chip response types, or SimpleResponse, List, and Suggestion Chip response types. These response types were covered in Part One of this post (gist).

/* INTENT HANDLERS */
app.intent('Welcome Intent', conv => {
  const WELCOME_TEXT_SHORT = 'What topic are you interested in reading about?';
  const WELCOME_TEXT_LONG = `You can say things like: \n` +
    ` _'Find a post about GCP'_ \n` +
    ` _'I'd like to read about Kubernetes'_ \n` +
    ` _'I'm interested in Docker'_`;
  conv.ask(new SimpleResponse({
    speech: WELCOME_TEXT_SHORT,
    text: WELCOME_TEXT_SHORT,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      text: WELCOME_TEXT_LONG,
      title: 'Programmatic Ponderings Search',
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Fallback Intent', conv => {
  const FACTS_LIST = "Kubernetes, Docker, Cloud, DevOps, AWS, Spring, Azure, Messaging, and GCP";
  const HELP_TEXT_SHORT = 'Need a little help?';
  const HELP_TEXT_LONG = `Some popular topics include: ${FACTS_LIST}.`;
  conv.ask(new SimpleResponse({
    speech: HELP_TEXT_LONG,
    text: HELP_TEXT_SHORT,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      text: HELP_TEXT_LONG,
      title: 'Programmatic Ponderings Search Help',
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Find Post Intent', async (conv, {topic}) => {
  let postTopic = topic.toString();
  let posts = await helper.getPostsByTopic(postTopic, 1);
  if (posts !== undefined && posts.length < 1) {
    helper.topicNotFound(conv, postTopic);
    return;
  }
  let post = posts[0];
  let formattedDate = helper.convertDate(post.post_date);
  const POST_SPOKEN = `The top result for '${postTopic}' is the post, '${post.post_title}', published ${formattedDate}, with a relevance score of ${post._score.toFixed(2)}`;
  const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate} \nScore: ${post._score.toFixed(2)}`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
    text: post.title,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      title: post.post_title,
      text: POST_TEXT,
      buttons: new Button({
        title: `Read Post`,
        url: post.guid,
      }),
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Find Multiple Posts Intent', async (conv, {topic}) => {
  let postTopic = topic.toString();
  let postCount = 6;
  let posts = await helper.getPostsByTopic(postTopic, postCount);
  if (posts !== undefined && posts.length < 1) {
    helper.topicNotFound(conv, postTopic);
    return;
  }
  const POST_SPOKEN = `Here's a list of the top ${posts.length} posts about '${postTopic}'`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
  }));
  let itemsArray = {};
  posts.forEach(function (post) {
    itemsArray[post.ID] = {
      title: `Post ID ${post.ID}`,
      description: `${post.post_title.substring(0,80)}... \nScore: ${post._score.toFixed(2)}`,
    };
  });
  if (conv.hasScreen) {
    conv.ask(new List({
      title: 'Top Results',
      items: itemsArray
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Find By ID Intent', async (conv, {topic}) => {
  let postId = topic.toString();
  let post = await helper.getPostById(postId);
  if (post === undefined) {
    helper.postIdNotFound(conv, postId);
    return;
  }
  let formattedDate = helper.convertDate(post.post_date);
  const POST_SPOKEN = `Okay, I found that post`;
  const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
    text: post.title,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      title: post.post_title,
      text: POST_TEXT,
      buttons: new Button({
        title: `Read Post`,
        url: post.guid,
      }),
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Option Intent', async (conv, params, option) => {
  let postId = option.toString();
  let post = await helper.getPostById(postId);
  if (post === undefined) {
    helper.postIdNotFound(conv, postId);
    return;
  }
  let formattedDate = helper.convertDate(post.post_date);
  const POST_SPOKEN = `Sure, here's that post`;
  const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
    text: post.title,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      title: post.post_title,
      text: POST_TEXT,
      buttons: new Button({
        title: `Read Post`,
        url: post.guid,
      }),
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});

The Welcome Intent handler handles explicit invocations of our Action. The Fallback Intent handler handles both help requests, as well as cases when Dialogflow is unable to handle the user’s request.

As described above in the Dialogflow section, the Find Post Intent handler is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker’. To fulfill the user request, the Find Post Intent handler calls the Helper module’s getPostsByTopic function, passing the requested topic and specifying a result set size of one post, returning the result with the highest relevance score, provided that score is above an arbitrary minimum of 1.0.

Similarly, the Find Multiple Posts Intent handler is responsible for handling our user’s requests for a list of posts about a topic; for example, ‘I’m interested in Docker’. To fulfill the user request, the Find Multiple Posts Intent handler calls the Helper module’s getPostsByTopic function, passing the requested topic and specifying a result set size of a maximum of six posts, each with a relevance score greater than 1.0.

The Find By ID Intent handler is responsible for handling our user’s requests for a specific, unique post ID; for example, ‘Post ID 22141’. To fulfill the user request, the Find By ID Intent handler calls the Helper module’s getPostById function, passing the unique Post ID.
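
For reference, the underlying REST calls these three handlers ultimately trigger, by way of the Helper module, look roughly like the following httpie examples. The hostname shown is the demo API host used elsewhere in this series; the query strings mirror the resource paths built in helper.js.

# Single post on a topic (Find Post Intent): size=1, minimum relevance score of 1
http "http://api.chatbotzlabs.com/blog/api/v1/elastic/dismax-search?value=docker&start=0&size=1&minScore=1"

# Up to six posts on a topic (Find Multiple Posts Intent): size=6
http "http://api.chatbotzlabs.com/blog/api/v1/elastic/dismax-search?value=docker&start=0&size=6&minScore=1"

# A single post by its unique ID (Find By ID and Option Intents)
http "http://api.chatbotzlabs.com/blog/api/v1/elastic/22141"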

Entry Point

The entry point creates a way to handle the communication with Dialogflow’s fulfillment API (gist).

/* ENTRY POINT */
exports.functionBlogSearchAction = functions.https.onRequest(app);

Helper Functions

The helper functions are part of the Helper module, contained in the helper.js file. In addition to typical utility functions, like formatting dates, there are two functions that interface with Elasticsearch via our Spring Boot API: getPostsByTopic and getPostById. As described above, the intent handlers call one of these functions to obtain search results from Elasticsearch.

The getPostsByTopic function serves both the Find Post Intent handler and the Find Multiple Posts Intent handler, described above. The only difference in the two calls is the size of the response set, either one result or a maximum of six results (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
'use strict';
/* CONSTANTS AND GLOBAL VARIABLES */
const {
  dialogflow,
  BasicCard,
  SimpleResponse,
} = require('actions-on-google');
const app = dialogflow({debug: true});
app.middleware(conv => {
  conv.hasScreen =
    conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
  conv.hasAudioPlayback =
    conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});
const SEARCH_API_HOSTNAME = process.env.SEARCH_API_HOSTNAME;
const SEARCH_API_PORT = process.env.SEARCH_API_PORT;
const SEARCH_API_ENDPOINT = process.env.SEARCH_API_ENDPOINT;
const rpn = require('request-promise-native');
const winston = require('winston');
const Logger = winston.Logger;
const Console = winston.transports.Console;
const {LoggingWinston} = require('@google-cloud/logging-winston');
const loggingWinston = new LoggingWinston();
const logger = new Logger({
  level: 'info', // log at 'info' and above
  transports: [
    new Console(),
    loggingWinston,
  ],
});
/* HELPER FUNCTIONS */
module.exports = class Helper {
  /**
   * Returns a collection of ElasticsearchPosts objects based on a topic
   * @param postTopic topic to search for
   * @param responseSize
   * @returns {Promise<any>}
   */
  getPostsByTopic(postTopic, responseSize = 1) {
    return new Promise((resolve, reject) => {
      const SEARCH_API_RESOURCE = `dismax-search?value=${postTopic}&start=0&size=${responseSize}&minScore=1`;
      const SEARCH_API_URL = `http://${SEARCH_API_HOSTNAME}:${SEARCH_API_PORT}/${SEARCH_API_ENDPOINT}/${SEARCH_API_RESOURCE}`;
      logger.info(`getPostsByTopic API URL: ${SEARCH_API_URL}`);
      let options = {
        uri: SEARCH_API_URL,
        json: true
      };
      rpn(options)
        .then(function (posts) {
          posts = posts.ElasticsearchPosts;
          logger.info(`getPostsByTopic Posts: ${JSON.stringify(posts)}`);
          resolve(posts);
        })
        .catch(function (err) {
          logger.error(`Error: ${err}`);
          reject(err)
        });
    });
  }
  // truncated for brevity
};

Both functions use the request and request-promise-native npm modules to call the Spring Boot service’s RESTful API over HTTP. However, instead of returning a callback, the request-promise-native module allows us to return a native ES6 Promise. By returning a promise, we can use async/await with our Intent handlers. Using async/await with Promises is a newer way of handling asynchronous operations in Node.js. The asynchronous programming model, using promises, is described in greater detail in my previous post, Building Serverless Actions for Google Assistant with Google Cloud Functions, Cloud Datastore, and Cloud Storage.

The getPostById function serves both the Find By ID Intent handler and the Option Intent handler, described above. This function is similar to the getPostsByTopic function, calling the Spring Boot service’s RESTful API endpoint and passing the Post ID (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// truncated for brevity
module.exports = class Helper {
  /**
   * Returns a single result based on the Post ID
   * @param postId ID of the Post to search for
   * @returns {Promise<any>}
   */
  getPostById(postId) {
    return new Promise((resolve, reject) => {
      const SEARCH_API_RESOURCE = `${postId}`;
      const SEARCH_API_URL = `http://${SEARCH_API_HOSTNAME}:${SEARCH_API_PORT}/${SEARCH_API_ENDPOINT}/${SEARCH_API_RESOURCE}`;
      logger.info(`getPostById API URL: ${SEARCH_API_URL}`);
      let options = {
        uri: SEARCH_API_URL,
        json: true
      };
      rpn(options)
        .then(function (post) {
          post = post.ElasticsearchPosts;
          logger.info(`getPostById Post: ${JSON.stringify(post)}`);
          resolve(post);
        })
        .catch(function (err) {
          logger.error(`Error: ${err}`);
          reject(err)
        });
    });
  }
  // truncated for brevity
};

Cloud Function Deployment

To deploy the Cloud Function to GCP, use the gcloud CLI with the beta version of the functions deploy command. According to Google, gcloud is a part of the Google Cloud SDK. You must download and install the SDK on your system and initialize it before you can use gcloud. Currently, Cloud Functions are only available in four regions. I have included a shell script, deploy-cloud-function.sh, to make this step easier. It is called using the npm run deploy command (gist).

#!/usr/bin/env sh
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
set -ex
# Set constants
REGION="us-east1"
FUNCTION_NAME="<your_function_name>"
# Deploy the Google Cloud Function
gcloud beta functions deploy ${FUNCTION_NAME} \
  --runtime nodejs8 \
  --region ${REGION} \
  --trigger-http \
  --memory 256MB \
  --env-vars-file .env.yaml

The creation or update of the Cloud Function can take up to two minutes. Note the output indicates the environment variables, contained in the .env.yaml file, have been deployed. The URL endpoint of the function and the function’s entry point are also both output.

wp-search-031.png

If you recall, the URL endpoint of the Cloud Function is required in the Dialogflow Fulfillment tab. The URL can be retrieved from the deployment output (shown above). The Cloud Function is now deployed and will be called by the Action when a user invokes the Action.
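
As a usage note, the script is invoked through npm, and the function’s HTTPS endpoint can also be retrieved after the fact with gcloud. A rough sketch, assuming the function name and region set in the deploy script above:

# Deploy (or re-deploy) the function using the package.json 'deploy' script
npm run deploy

# Retrieve the deployed function's HTTPS trigger URL
gcloud functions describe <your_function_name> \
  --region us-east1 \
  --format "value(httpsTrigger.url)"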

What is Deployed

The .gcloudignore file is created the first time you deploy a new function. Using the .gcloudignore file, you limit the files deployed to GCP. For this post, of all the files in the project, only four files, index.js, helper.js, package.json, and the PNG file used in the Action’s responses, need to be deployed. All other project files are earmarked in the .gcloudignore file to avoid being deployed.
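
As a rough, hypothetical sketch, a .gcloudignore for this type of project might contain entries similar to the following; the exact patterns depend on your project’s contents.

# .gcloudignore - files excluded from deployment (hypothetical example)
.gcloudignore
.git
.gitignore
node_modules/
deploy-cloud-function.sh
*.md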

wp-search-038.png

Simulation Testing and Debugging

With our Action and all its dependencies deployed and configured, we can test the Action using the Simulation console on Actions on Google. According to Google, the Action Simulation console allows us to manually test our Action by simulating a variety of Google-enabled hardware devices and their settings.

Below, in the Simulation console, we see the successful display of our Programmatic Ponderings Search Action for Google Assistant containing the expected Simple Response, List, and Suggestion Chips response types, triggered by a user’s invocation of the Action.

wp-search-035

The simulated response indicates that the Google Cloud Function was called, and it responded successfully. That also indicates the Dialogflow-based Action successfully communicated with the Cloud Function, the Cloud Function successfully communicated with the Spring Boot service instances running on Google Kubernetes Engine, and finally, the Spring Boot services successfully communicated with Elasticsearch running on Google Compute Engine.

If we had issues with the testing, the Action Simulation console also provides tabs with the request and response objects sent to and from the Cloud Function, the audio response, a debug console, any errors, and access to the logs.

Stackdriver Logging

In the log output below, from our Cloud Function, we see our Cloud Function’s activities. These activities include informational log entries, which we explicitly defined in our Cloud Function using the winston and @google-cloud/logging-winston npm modules. According to Google, the module’s author, Stackdriver Logging for Winston provides an easy-to-use, higher-level layer (transport) for working with Stackdriver Logging, compatible with Winston. Developing an effective logging strategy is essential to maintaining and troubleshooting your code in Development, as well as Production.

wp-search-036

Conclusion

In this two-part post, we observed how the capabilities of a voice and text-based conversational interface, such as an Action for Google Assistant, may be enhanced through integration with a search and analytics engine, such as Elasticsearch. This post barely scraped the surface of what could be achieved with such an integration. Elasticsearch, as well as other leading Lucene-based search and analytics engines, such as Apache Solr, have tremendous capabilities, which are easily integrated with machine learning-based conversational interfaces, resulting in a more powerful and more intuitive end-user experience.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, or Google.



Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 1

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, the ability to return relevant responses to user inquiries also requires that interfaces have access to rich informational datastores, and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a Near Realtime (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Google Technologies

The high-level architecture of our search engine-enhanced Action for Google Assistant will look as follows.

Google Search Assistant Diagram GCP

Here is a brief overview of the key technologies we will incorporate into our architecture.

Actions on Google

According to Google, Actions on Google is the platform for developers to extend the Google Assistant. Actions on Google is a web-based platform that provides a streamlined user-experience to create, manage, and deploy Actions. We will use the Actions on Google platform to develop our Action in this post.

Dialogflow

According to Google, Dialogflow is an enterprise-grade NLU platform that makes it easy for developers to design and integrate conversational user interfaces into mobile apps, web applications, devices, and bots. Dialogflow is powered by Google’s machine learning for Natural Language Processing (NLP).

Google Cloud Functions

Google Cloud Functions are part of Google’s event-driven, serverless compute platform, part of the Google Cloud Platform (GCP). Google Cloud Functions are analogous to Amazon’s AWS Lambda and Azure Functions. Features include automatic scaling, high availability, fault tolerance, no servers to provision, manage, patch or update, and a payment model based on the function’s execution time.

Google Kubernetes Engine

Kubernetes Engine is a managed, production-ready environment, available on GCP, for deploying containerized applications. According to Google, Kubernetes Engine is a reliable, efficient, and secure way to run Kubernetes clusters in the Cloud.

Elasticsearch

Elasticsearch is a leading, distributed, RESTful search and analytics engine. Elasticsearch is a product of Elastic, the company behind the Elastic Stack, which includes Elasticsearch, Kibana, Beats, Logstash, X-Pack, and Elastic Cloud. Elasticsearch provides a distributed, multitenant-capable, full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is similar to Apache Solr in terms of features and functionality. Both Solr and Elasticsearch are based on Apache Lucene.

Other Technologies

In addition to the major technologies highlighted above, the project also relies on the following:

  • Google Container Registry – As an alternative to Docker Hub, we will store the Spring Boot API service’s Docker Image in Google Container Registry, making deployment to GKE a breeze.
  • Google Cloud Deployment Manager – Google Cloud Deployment Manager allows users to specify all the resources needed for an application in a declarative format using YAML. The Elastic Stack will be deployed with Deployment Manager.
  • Google Compute Engine – Google Compute Engine delivers scalable, high-performance virtual machines (VMs) running in Google’s data centers, on their worldwide fiber network.
  • Google Stackdriver – Stackdriver aggregates metrics, logs, and events from our Cloud-based project infrastructure, for troubleshooting. We are also integrating Stackdriver Logging for Winston into our Cloud Function for fast application feedback.
  • Google Cloud DNS – Hosts the primary project domain and subdomains for the search engine and API. Google Cloud DNS is a scalable, reliable and managed authoritative Domain Name System (DNS) service running on the same infrastructure as Google.
  • Google VPC Network Firewall – Firewall rules provide fine-grained, secure access controls to our API and search engine. We will need several firewall port openings to talk to the Elastic Stack.
  • Spring Boot – Pivotal’s Spring Boot project makes it easy to create stand-alone, production-grade Spring-based Java applications, such as our Spring Boot service.
  • Spring Data Elasticsearch – Pivotal Software’s Spring Data Elasticsearch project provides easy integration to Elasticsearch from our Java-based Spring Boot service.

Demonstration

To demonstrate an Action for Google Assistant with search engine integration, we need an index of content to search. In this post, we will build an informational Action, the Programmatic Ponderings Search Action, that responds to a user’s interests in certain technical topics, by returning post suggestions from the Programmatic Ponderings blog. For this demonstration, I have indexed the last two years worth of blog posts into Elasticsearch, using the ElasticPress WordPress plugin.

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

This post will focus on the development and integration of the Action for Google Assistant with Elasticsearch, via a Google Cloud Function, Kubernetes Engine, and the Spring Boot API service. The post is not intended to be a general how-to on developing for Actions for Google Assistant, Google Cloud Platform, Elasticsearch, or WordPress.

Building and integrating the Action will involve the following steps:

  • Design the Action’s conversation model;
  • Provision the Elastic Stack on Google Compute Engine using Deployment Manager;
  • Create an Elasticsearch index of blog posts;
  • Provision the Kubernetes cluster on GCP with GKE;
  • Develop and deploy the Spring Boot API service to Kubernetes;

Covered in Part Two of the Post:

  • Create a new Actions project using the Actions on Google console;
  • Develop the Action’s Intents using the Dialogflow console;
  • Develop, deploy, and test the Cloud Function to GCP;

Let’s explore each step in more detail.

Conversational Model

The conversational model of the Programmatic Ponderings Search Action for Google Assistant gives the user the option to invoke the Action in two ways, with or without intent. Below on the left, we see an example of an invocation of the Action – ‘Talk to Programmatic Ponderings’. Google Assistant then prompts the user for more information (intent) – ‘What topic are you interested in reading about?’.

sample-dialog-1.png

Below on the left, we see an invocation of the Action, which includes the intent – ‘Ask Programmatic Ponderings to find a post about Kubernetes’. Google Assistant will respond directly, both verbally and visually with the most relevant post.

sample-dialog-2

When a user requests a single result, for example, ‘Find a post about Docker’, Google Assistant will include Simple Response, Basic Card, and Suggestion Chip response types for devices with a display. This is shown in the center, above. The user may continue to ask for additional posts or choose to cancel the Action at any time.

When a user requests multiple results, for example, ‘I’m interested in Docker’, Google Assistant will include Simple Response, List, and Suggestion Chip response types for devices with a display. An example of a List Response is shown in the center of the previous set of screengrabs, above. The user will receive up to six results in the list, each with a relevance score of 1.0 or greater. The user may choose to click on any of the post results in the list, which will initiate a new search using the post’s unique ID, as shown on the right, in the first set of screengrabs, above.

The conversational model also understands a request for help and to cancel the interaction.

GCP Account and Project

The following steps assume you have an existing GCP account and you have created a project on GCP to house the Cloud Function, GKE Cluster, and Elastic Stack on Google Compute Engine. The post also assumes that you have the latest Google Cloud SDK installed on your development machine, and have authenticated your identity from the command line (gist).

# Authenticate with the Google Cloud SDK
export PROJECT_ID="<your_project_id>"
gcloud beta auth login
gcloud config set project ${PROJECT_ID}
# Update components or new runtime nodejs8 may be unknown
gcloud components update

Elasticsearch on GCP

There are a number of options available to host Elasticsearch. Elastic, the company behind Elasticsearch, offers the Elasticsearch Service, a fully managed, scalable, and reliable service on AWS and GCP. AWS also offers their own managed Elasticsearch Service. I found some limitations with AWS’ Elasticsearch Service, which made integration with Spring Data Elasticsearch difficult. According to AWS, the service supports HTTP but does not support TCP transport.

For this post, we will stand up the Elastic Stack on GCP using an offering from the Google Cloud Platform Marketplace. A well-known provider of packaged applications for multiple Cloud platforms, Bitnami, offers the ELK Stack (the previous name for the Elastic Stack), running on Google Compute Engine.

wp-search-004.png

GCP Marketplace Solutions are deployed using the Google Cloud Deployment Manager.  The Bitnami ELK solution is a complete stack with all the necessary software and software-defined Cloud infrastructure to securely run Elasticsearch. You select the instance’s zone(s), machine type, boot disk size, and security and networking configurations. Using that configuration, the Deployment Manager will deploy the solution and provide you with information and credentials for accessing the Elastic Stack. For this demo, we will configure a minimally-sized, single VM instance to run the Elastic Stack.

wp-search-005.png

Below we see the Bitnami ELK stack’s components being created on GCP, by the Deployment Manager.

wp-search-006.png

Indexed Content

With the Elastic Stack fully provisioned, I then configured WordPress to index the last two years of Programmatic Ponderings blog posts to Elasticsearch on GCP. If you want to follow along with this post and need content to index, there is plenty of open source and public domain indexable content available on the Internet – books, movie lists, government and weather data, online catalogs of products, and so forth. Anything in a document database is directly indexable in Elasticsearch. Elastic even provides a set of index samples, available on their GitHub site.
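
Once indexing completes, a quick sanity check directly against Elasticsearch confirms the documents are present. A hedged example, using a placeholder host and assuming port 9200 has been opened to your address, as described in the next section:

# List indexes and their document counts (replace <elk_host> with your Elastic Stack VM's address)
curl "http://<elk_host>:9200/_cat/indices?v"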

wp-search-009

Firewall Ports for Elasticsearch

The Deployment Manager opens up firewall ports 80 and 443. To index the WordPress posts, I also had to open port 9200. According to Elastic, Elasticsearch uses port 9200 for communicating with its RESTful API, using JSON over HTTP. For security, I locked down this firewall opening to my WordPress server’s address as the source (gist).

SOURCE_IP=<wordpress_ip_address>
PORT=9200
gcloud compute \
  --project=wp-search-bot \
  firewall-rules create elk-1-tcp-${PORT} \
  --description=elk-1-tcp-${PORT} \
  --direction=INGRESS \
  --priority=1000 \
  --network=default \
  --action=ALLOW \
  --rules=tcp:${PORT} \
  --source-ranges=${SOURCE_IP} \
  --target-tags=elk-1-tcp-${PORT}

The two existing firewall rules, for ports 80 and 443, should also be locked down to your own IP address as the source. Common Elasticsearch ports are constantly scanned by hackers, who will quickly hijack your Elasticsearch contents and hold them for ransom, in addition to deleting your indexes. Similar tactics are used on well-known and unprotected ports for many platforms, including Redis, MySQL, PostgreSQL, MongoDB, and Microsoft SQL Server.
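
As a rough sketch of how that lock-down might be applied with gcloud, using hypothetical rule names (gcloud compute firewall-rules list will show the actual rules created by the Deployment Manager):

# Restrict an existing rule to a single source address (rule name and IP are placeholders)
gcloud compute firewall-rules update <existing-http-rule-name> \
  --source-ranges=<your_ip_address>/32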

Kibana

Once the posts are indexed, the best way to view the resulting Elasticsearch documents is through Kibana, which is included as part of the Bitnami solution. Below we see approximately thirty posts, spread out across two years.

wp-search-010.png

Each Elasticsearch document, representing an indexed WordPress blog post, contains over 125 fields of information. Fields include a unique post ID, post title, content, publish date, excerpt, author, URL, and so forth. All these fields are exposed through Elasticsearch’s API and, as we will see, will be available for our Spring Boot service to query.
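
Before involving the Spring Boot service, an individual document can also be pulled straight from Elasticsearch’s document API. A hedged example, using placeholder host and index names and the ‘post’ mapping type referenced later in the ElasticsearchPost class:

# Retrieve a single indexed WordPress post by its ID directly from Elasticsearch
curl "http://<elk_host>:9200/<elasticsearch_index_name>/post/22141"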

wp-search-011.png

Spring Boot Service

To ensure decoupling between the Action for Google Assistant and Elasticsearch, we will expose a RESTful search API, written in Java using Spring Boot and Spring Data Elasticsearch. The API will expose a tailored set of flexible endpoints to the Action. Google’s machine learning services will ensure our conversational model is trained to understand user intent. The API’s query algorithm and Elasticsearch’s rich Lucene-based search features will ensure the most relevant results are returned. We will host the Spring Boot service on Google Kubernetes Engine (GKE).

We will use a Spring REST Controller to expose our RESTful web service’s resources to our Action’s Cloud Function. The current Spring Boot service contains five /elastic resource endpoints, exposed by the ElasticsearchPostController class. Of those five, two endpoints will be called by our Action in this demo, the /{id} and the /dismax-search endpoints. The endpoints can be seen using the Swagger UI. Our Spring Boot service implements SpringFox, which has the option to expose the Swagger interactive API UI.
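
As a point of reference, a request to the /dismax-search endpoint takes the search term plus paging and minimum-score parameters. A hedged httpie example, using the same demo API host that appears in the /{id} example below:

# Search posts for 'docker', returning up to six results with a relevance score of at least 1
http "http://api.chatbotzlabs.com/blog/api/v1/elastic/dismax-search?value=docker&start=0&size=6&minScore=1"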

wp-search-017.png

The /{id} endpoint accepts a unique post ID as a path variable in the API call and returns a single ElasticsearchPost object, wrapped in a Map object and serialized to a JSON payload (gist).

@RequestMapping(value = "/{id}")
@ApiOperation(value = "Returns a post by id")
public Map<String, Optional<ElasticsearchPost>> findById(@PathVariable("id") long id) {
  Optional<ElasticsearchPost> elasticsearchPost = elasticsearchPostRepository.findById(id);
  Map<String, Optional<ElasticsearchPost>> elasticsearchPostMap = new HashMap<>();
  elasticsearchPostMap.put("ElasticsearchPosts", elasticsearchPost);
  return elasticsearchPostMap;
}

Below we see an example response from the Spring Boot service to an API call to the /{id} endpoint, for post ID 22141. Since we are returning a single post, based on ID, the relevance score will always be 0.0 (gist).

# http http://api.chatbotzlabs.com/blog/api/v1/elastic/22141
HTTP/1.1 200
Content-Type: application/json;charset=UTF-8
Date: Mon, 17 Sep 2018 23:15:01 GMT
Transfer-Encoding: chunked

{
  "ElasticsearchPosts": {
    "ID": 22141,
    "_score": 0.0,
    "guid": "https://programmaticponderings.com/?p=22141",
    "post_date": "2018-04-13 12:45:19",
    "post_excerpt": "Learn to manage distributed applications, spanning multiple Kubernetes environments, using Istio on GKE.",
    "post_title": "Managing Applications Across Multiple Kubernetes Environments with Istio: Part 1"
  }
}

This controller’s /{id} endpoint relies on a method exposed by the ElasticsearchPostRepository interface. The ElasticsearchPostRepository is a Spring Data Repository, which extends ElasticsearchRepository. The repository exposes the findById() method, which returns a single instance of the type, ElasticsearchPost, from Elasticsearch (gist).

package com.example.elasticsearch.repository;
import com.example.elasticsearch.model.ElasticsearchPost;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface ElasticsearchPostRepository extends ElasticsearchRepository<ElasticsearchPost, Long> {
}

The ElasticsearchPost class is annotated as an Elasticsearch Document, similar to other Spring Data Document annotations, such as Spring Data MongoDB. The ElasticsearchPost class is instantiated to hold the deserialized JSON documents that Elasticsearch stores as indexed data (gist).

package com.example.elasticsearch.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import java.io.Serializable;
@JsonIgnoreProperties(ignoreUnknown = true)
@Document(indexName = "<elasticsearch_index_name>", type = "post")
public class ElasticsearchPost implements Serializable {
@Id
@JsonProperty("ID")
private long id;
@JsonProperty("_score")
private float score;
@JsonProperty("post_title")
private String title;
@JsonProperty("post_date")
private String publishDate;
@JsonProperty("post_excerpt")
private String excerpt;
@JsonProperty("guid")
private String url;