Archive for category Java Development

Eventual Consistency with Spring for Apache Kafka: Part 2 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

As discussed in Part One of this post, given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components in a local development environment running on Kubernetes with Istio, using minikube. For simplicity’s sake, we will only run a single instance of each service. Additionally, we are not implementing custom domain names, TLS/HTTPS, authentication and authorization, API keys, or restricting access to any sensitive operational API endpoints or ports, all of which we would certainly do in an actual production environment.

To provide operational visibility, we will add Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo ExpressKialiPrometheus, and Grafana to our system.

View of Storefront API traffic from Kiali

Prerequisites

This post will assume a basic level of knowledge of Kubernetes, minikube, Docker, and Istio. Furthermore, the post assumes you have already installed recent versions of minikube, kubectl, Docker, and Istio. Meaning, that the kubectl, istioctl, docker, and minikube commands are all available from the terminal.

Currently installed version of the required applications

For this post demonstration, I am using an Apple MacBook Pro running macOS as my development machine. I have the latest versions of Docker Desktop, minikube, kubectl, and Istio installed as of May 2021.

Source Code

The source code for this post is open-source and is publicly available on GitHub. Clone the GitHub project using the following command:

clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Minikube

Part of the Kubernetes project, minikube is local Kubernetes, focusing on making it easy to learn and develop for Kubernetes. Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows. Given the number of Kubernetes resources we will be deploying to minikube, I would recommend at least 3 CPUs and 4–5 GBs of memory. If you choose to deploy multiple observability tools, you may want to increase both of these resources if you can afford it. I maxed out both CPUs and memory several times while setting up this demonstration, causing temporary lock-ups of minikube.

minikube --cpus 3 --memory 5g --driver=docker start start

The Docker driver allows you to install Kubernetes into an existing Docker install. If you are using Docker, please be aware that you must have at least an equivalent amount of resources allocated to Docker to apportion to minikube.

Before continuing, confirm minikube is up and running and confirm the current context of kubectl is minikube.

minikube status
kubectl config current-context

The statuses should look similar to the following:

Use the eval below command to point your shell to minikube’s docker-daemon. You can confirm this by using the docker image ls and docker container ls command to view running Kubernetes containers on minikube.

eval $(minikube -p minikube docker-env)
docker image ls
docker container ls

The output should look similar to the following:

You can also check the status of minikube from Docker Desktop. Minikube is running as a container, instantiated from a Docker image, gcr.io/k8s-minikube/kicbase. View the container’s Stats, as shown below.

Istio

Assuming you have downloaded and configured Istio, install it onto minikube. I currently have Istio 1.10.0 installed and have theISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also set Istio’s bin/ subdirectory in my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME                                                                
> /Applications/Istio/istio-1.10.0
where istioctl
> /Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

> client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and of the sidecars for the Istio data plane.

istioctl profile list
> Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, we will use the default profile, which installs istiod and an istio-ingressgateway. We will not require the use of an istio-egressgateway, since all components will be installed locally on minikube.

istioctl install --set profile=default -y
> ✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Installation complete

Minikube Tunnel

kubectl get svc istio-ingressgateway -n istio-system

To associate an IP address, run the minikube tunnel command in a separate terminal tab. Since it requires opening privileged ports 80 and 443 to be exposed, this command will prompt you for your sudo password.

Services of the type LoadBalancer can be exposed by using the minikube tunnel command. It must be run in a separate terminal window to keep the LoadBalancer running. We previously created the istio-ingressgateway. Run the following command and note that the status of EXTERNAL-IP is <pending>. There is currently no external IP address associated with our LoadBalancer.

minikube tunnel

Rerun the previous command. There should now be an external IP address associated with the LoadBalancer. In my case, 127.0.0.1.

kubectl get svc istio-ingressgateway -n istio-system

The external IP address shown is the address we will use to access the resources we chose to expose externally on minikube.

Minikube Dashboard

Once again, in a separate terminal tab, open the Minikube Dashboard (aka Kubernetes Dashboard).

minikube dashboard

The dashboard will give you a visual overview of all your installed Kubernetes components.

Minikube Dashboard showing the istio-system namespace

Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. For this demonstration, we will use four namespaces to organize our deployed resources: dev, mongo, kafka, and storefront-kafka-project. The dev namespace is where we will deploy our Storefront API’s microservices: accounts, orders, and fulfillment. We will deploy MongoDB and Mongo Express to the mongo namespace. Lastly, we will use the kafka and storefront-kafka-project namespaces to deploy Apache Kafka to minikube using Strimzi, a Cloud Native Computing Foundation sandbox project, and CMAK.

kubectl apply -f ./minikube/resources/namespaces.yaml

Automatic Sidecar Injection

In order to take advantage of all of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. When you set the istio-injection=enabled label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have a sidecar added to them. Labeling the dev namespace for automatic sidecar injection ensures that our Storefront API’s microservices — accounts, orders, and fulfillment— will have Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled

MongoDB

Next, deploy MongoDB and Mongo Express to the mongo namespace on minikube. To ensure a successful connection to MongoDB from Mongo Express, I suggest giving MongoDB a chance to start up fully before deploying Mongo Express.

kubectl apply -f ./minikube/resources/mongodb.yaml -n mongo
sleep 60
kubectl apply -f ./minikube/resources/mongo-express.yaml -n mongo

To confirm the success of the deployments, use the following command:

kubectl get services -n mongo

Or use the Kubernetes Dashboard to confirm deployments.

Mongo Express UI Access

For parts of your application (for example, frontends) you may want to expose a Service onto an external IP address outside of your cluster. Kubernetes ServiceTypes allows you to specify what kind of Service you want; the default is ClusterIP.

Note that while MongoDB uses the ClusterIP, Mongo Express uses NodePort. With NodePort, the Service is exposed on each Node’s IP at a static port (the NodePort). You can contact the NodePort Service, from outside the cluster, by requesting <NodeIP>:<NodePort>.

In a separate terminal tab, open Mongo Express using the following command:

minikube service --url mongo-express -n mongo

You should see output similar to the following:

Click on the link to open Mongo Express. There should already be three MongoDB operational databases shown in the UI. The three Storefront databases and collections will be created automatically, later in the post: accounts, orders, and fulfillment.

Apache Kafka using Strimzi

Next, we will install Apache Kafka and Apache Zookeeper into the kafka and storefront-kafka-project namespaces on minikube, using Strimzi. Since Strimzi has a great, easy-to-use Quick Start guide, I will not detail the complete install complete process in this post. I suggest using their guide to understand the process and what each command does. Then, use the slightly modified Strimzi commands I have included below to install Kafka and Zookeeper.

# assuming 0.23.0 is latest version available
curl -L -O https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.23.0/strimzi-0.23.0.zip
unzip strimzi-0.23.0.zip
cd strimzi-0.23.0
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
# manually change STRIMZI_NAMESPACE value to storefront-kafka-project
nano install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
kubectl create -f install/cluster-operator/ -n kafka
kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-cluster.yaml -n storefront-kafka-project
kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-topics.yaml -n storefront-kafka-project

Zoo Entrance

We want to install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. However, CMAK required access to Zookeeper. You can not access Strimzi’s Zookeeper directly from CMAK; this is intentional to avoid performance and security issues. See this GitHub issue for a better explanation of why. We will use the appropriately named Zoo Entrance as a proxy for CMAK to Zookeeper to overcome this challenge.

To install Zoo Entrance, review the GitHub project’s install guide, then use the following commands:

git clone https://github.com/scholzj/zoo-entrance.git
cd zoo-entrance
# optional: change my-cluster to kafka-cluster
sed -i '' 's/my-cluster/kafka-cluster/' deploy.yaml
kubectl apply -f deploy.yaml -n storefront-kafka-project

Cluster Manager for Apache Kafka

Next, install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. Run the following command to deploy CMAK into the storefront-kafka-project namespace.

kubectl apply -f ./minikube/resources/cmak.yaml -n storefront-kafka-project

Similar to Mongo Express, we can access CMAK’s UI using its NodePort. In a separate terminal tab, run the following command:

minikube service --url cmak -n storefront-kafka-project

You should see output similar to Mongo Express. Click on the link provided to access CMAK. Choose ‘Add Cluster’ in CMAK to add our existing Kafka cluster to CMAK’s management interface. Use Zoo Enterence’s service address for the Cluster Zookeeper Hosts value.

zoo-entrance.storefront-kafka-project.svc:2181

Once complete, you should see the three Kafka topics we created previously with Strimzi: accounts.customer.change, fulfillment.order.change, and orders.order.change. Each topic will have three partitions, one replica, and one broker. You should also see the _consumer_offsets topic that Kafka uses to store information about committed offsets for each topic:partition per group of consumers (groupID).

Storefront API Microservices

We are finally ready to install our Storefront API’s microservices into the dev namespace. Each service is preconfigured to access Kafka and MongoDB in their respective namespaces.

kubectl apply -f ./minikube/resources/accounts.yaml -n dev
kubectl apply -f ./minikube/resources/orders.yaml -n dev
kubectl apply -f ./minikube/resources/fulfillment.yaml -n dev

Spring Boot services usually take about two minutes to fully start. The time required to download the Docker Images from docker.com and the start-up time means it could take 3–4 minutes for each of the three services to be ready to accept API traffic.

Istio Components

We want to be able to access our Storefront API’s microservices through our Kubernetes LoadBalancer, while also leveraging all the capabilities of Istio as a service mesh. To do so, we need to deploy an Istio Gateway and a VirtualService. We will also need to deploy DestinationRule resources. A Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. A VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, a DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred.

kubectl apply -f ./minikube/resources/destination_rules.yaml -n dev
kubectl apply -f ./minikube/resources/istio-gateway.yaml -n dev

Testing the System and Creating Sample Data

I have provided a Python 3 script that runs a series of seven HTTP GET requests, in a specific order, against the Storefront API. These calls will validate the deployments, confirm the API’s services can access Kafka and MongoDB, generate some initial data, and automatically create the MongoDB database collections from the initial Insert statements.

python3 -m pip install -r ./utility_scripts/requirements.txt -U
python3 ./utility_scripts/refresh.py

The script’s output should be as follows:

If we now look at Mongo Express, we should note three new databases: accounts, orders, and fulfillment.

Observability Tools

Istio makes it easy to integrate with a number of common tools, including cert-managerPrometheusGrafanaKialiZipkin, and Jaeger. In order to better observe our Storefront API, we will install three well-known observability tools: Kiali, Prometheus, and Grafana. Luckily, these tools are all included with Istio. You can install any or all of these to minikube. I suggest installing the tools one at a time as not to overwhelm minikube’s CPU and memory resources.

kubectl apply -f ./minikube/resources/prometheus.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/grafana.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/kiali.yaml

Once deployment is complete, to access any of the UI’s for these tools, use the istioctl dashboard command from a new terminal window:

istioctl dashboard kiali

istioctl dashboard prometheus

istioctl dashboard grafana

Kiali

Below we see a view of Kiali with API traffic flowing to Kafka and MongoDB.

View of Storefront API traffic from Kiali

Prometheus

Each of the three Storefront API microservices has a dependency on Micrometer; specifically, a dependency on micrometer-registry-prometheus. As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics with a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends. Given the Micrometer Prometheus dependency, each microservice exposes a /prometheus endpoint (e.g., http://127.0.0.1/accounts/actuator/prometheus) as shown below in Postman.

The /prometheus endpoint exposes dozens of useful metrics and is configured to be scraped by Prometheus. These metrics can be displayed in Prometheus and indirectly in Grafana dashboards via Prometheus. I have customized Istio’s version of Prometheus and included it in the project (prometheus.yaml), which now scrapes the Storefront API’s metrics.

scrape_configs:
- job_name: 'spring_micrometer'
metrics_path: '/actuator/prometheus'
scrape_interval: 5s
static_configs:
- targets: ['accounts.dev:8080','orders.dev:8080','fulfillment.dev:8080']

Here we see an example graph of a Spring Kafka Listener metric, spring_kafka_listener_seconds_sum, in Prometheus. There are dozens of metrics exposed to Prometheus from our system that we can observe and alert on.

Grafana

Lastly, here is an example Spring Boot Dashboard in Grafana. More dashboards are available on Grafana’s community dashboard page. The Grafana dashboard uses Prometheus as the source of its metrics data.

Storefront API Endpoints

The three storefront services are fully functional Spring Boot, Spring Data REST, Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. To better understand the Storefront API, each Spring Boot microservice uses SpringFox, which produces automated JSON API documentation for APIs built with Spring. The service builds also include the springfox-swagger-ui web jar, which ships with Swagger UI. Swagger takes the manual work out of API documentation, with a range of solutions for generating, visualizing, and maintaining API docs.

From a web browser, you can use the /swagger-ui/ subdirectory/subpath with any of the three microservices to access the fully-featured Swagger UI (e.g., http://127.0.0.1/accounts/swagger-ui/).

Accounts service Customer entity endpoints

Each service’s data model (POJOs) is also exposed through the Swagger UI.

Accounts service data model

Spring Boot Actuator

Additionally, each service includes Spring Boot Actuator. The Actuator exposes additional operational endpoints, allowing us to observe the running services. With Actuator, you get many features, including access to available operational-oriented endpoints, using the /actuator/ subdirectory/subpath (e.g., http://127.0.0.1/accounts/actuator/). For this demonstration, I have not restricted access to any available Actuator endpoints.

Partial list of Spring Boot Actuator endpoints as seen using Swagger
Partial list of Spring Boot Actuator endpoints as seen using Postman

Conclusion

In this two-part post, we learned how to build an API using Spring Boot. We ensured the API’s distributed data integrity using a pub/sub model with Spring for Apache Kafka Project. When a relevant piece of data was changed by one microservice, that state change triggered a state change event that was shared with other microservices using Kafka topics.

We also learned how to deploy and run the API in a local development environment running on Kubernetes with Istio, using minikube. We have added production-tested observability tools to provide operational visibility, including CMAK, Mongo Express, Kiali, Prometheus, and Grafana.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

1 Comment

Eventual Consistency with Spring for Apache Kafka: Part 1 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Introduction

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.

Domain-driven Design

To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.

Below are the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.

Consistency Strategies

Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.

In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.

Storefront Example

In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

View of the Storefront API from Kiali

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices

We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.

Source Code

The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}
view raw customer.orders.bson hosted with ❤ by GitHub

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}
view raw customer.orders.bson hosted with ❤ by GitHub

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer’s name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object represented as a BSON document in the fulfillment.requests database collection looks as follows:

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, again, we can again look at its JSON message payload in Kafka.

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.

State Change Event Messaging Flows

There are three state change event messaging flows illustrated in this post.

  1. Changes to a Customer triggers an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
  2. Order Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic, and is consumed by the Fulfillment service;
  3. Changes to the status of an Order triggers an event message produced by the Fulfillment Service, which is published on the fulfillment.order.change Kafka topic, and is consumed by the Orders service;

Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;
public void send(String topic, CustomerChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Orders service’s Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file in each service’s resources directory contains the Kafka configuration (lines 11–19).

server:
port: 8080
spring:
main:
allow-bean-definition-overriding: true
application:
name: orders
data:
mongodb:
uri: mongodb://mongo:27017/orders
kafka:
bootstrap-servers: kafka:9092
topic:
accounts-customer: accounts.customer.change
orders-order: orders.order.fulfill
fulfillment-order: fulfillment.order.change
consumer:
group-id: orders
auto-offset-reset: earliest
zipkin:
sender:
type: kafka
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: INFO
spring:
config:
activate:
on-profile: local
data:
mongodb:
uri: mongodb://localhost:27017/orders
kafka:
bootstrap-servers: localhost:9092
server:
port: 8090
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
spring:
config:
activate:
on-profile: confluent
server:
port: 8080
logging:
level:
root: INFO
server:
port: 8080
spring:
config:
activate:
on-profile: minikube
data:
mongodb:
uri: mongodb://mongo.dev:27017/orders
kafka:
bootstrap-servers: kafka-cluster.dev:9092
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
view raw application.yaml hosted with ❤ by GitHub

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages required it.

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class, which simulates a change in order status. They allow an order’s status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service’s Sender class handles this.

Note in this example that these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these independent actions succeed or fail as a single transaction to ensure data consistency, using any of a handful of common architectural patterns.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
view raw SenderConfig.java hosted with ❤ by GitHub

The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service receives messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components into a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of the Storefront API from Kiali

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC

Introduction

Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database via a Microsoft .NET application and vice-versa, accessing Microsoft SQL Server from a Java-based application is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures from a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

View of the post’s Java project from JetBrains’ IntelliJ IDE

The objectives of this post include:

  • Demonstrate the differences between using static SQL statements and stored procedures to return data.
  • Demonstrate three types of JDBC statements to return data: Statement, PreparedStatement, and CallableStatement.
  • Demonstrate how to call stored procedures with input and output parameters.
  • Demonstrate how to return single values and a result set from a database using stored procedures.

Why Stored Procedures?

To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:

  • Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
  • Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
  • Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.

AdventureWorks 2017 Database

For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

The HumanResources schema, one of five schemas within the AdventureWorks database

For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installed on your local workstation.

There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.

DROP DATABASE IF EXISTS AdventureWorks;
GO

EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;

-- get task_id from output (e.g. 1)

EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;

Install Stored Procedures

For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

View of the new stored procedures from JetBrains’ IntelliJ IDE Database tab

Data Sources, Connections, and Properties

Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource, and database connection, java.sql.Connection. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java, which in turn instantiates the java.sql.Connection and com.microsoft.sqlserver.jdbc.SQLServerDataSource objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java. This properties class instantiates the java.util.Properties class, which reads values from a configuration properties file, config.properties. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.

Examples

For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.

To run the JUnit unit tests, using Gradle, which the project is based on, use the ./gradlew cleanTest test --warning-mode none command.

A successful run of the JUnit tests

To build and run the application, using Gradle, which the project is based on, use the ./gradlew run --warning-mode none command.

The output of the Java console application

Example 1: SQL Statement

Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST, uses the java.sql.Statement class. According to Oracle’s JDBC documentation, the Statement object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.

/**
* Statement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightST() {
double averageWeight = 0;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.getConnection().createStatement();
String sql = "WITH Weights_CTE(AverageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2)" +
"FROM [Weights_CTE];";
rs = stmt.executeQuery(sql);
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 2: Prepared Statement

Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.

Also, instead of using the java.sql.Statement class, we use the java.sql.PreparedStatement class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement object. This object can then be used to execute this statement multiple times efficiently.

/**
* PreparedStatement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightPS() {
double averageWeight = 0;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
String sql = "WITH Weights_CTE(averageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
"FROM [Weights_CTE];";
pstmt = connection.getConnection().prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
averageWeight = rs.getDouble("averageWeight");
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 3: Callable Statement

In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement class. According to Oracle’s documentation, the CallableStatement extends PreparedStatement. It is the interface used to execute SQL stored procedures. The CallableStatement accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double numeric value.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
ResultSet rs = null;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeight]}");
cstmt.execute();
rs = cstmt.getResultSet();
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 4: Calling a Stored Procedure with an Output Parameter

In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the column has a name, averageWeight. We can now call that column by name when retrieving the value.

The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter, and one not, both return the same value(s). You can use the CallableStatement to for either type.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT]@averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) output parameter, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightOutCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeightOUT](?)}");
cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
cstmt.execute();
averageWeight = cstmt.getDouble("averageWeight");
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 5: Calling a Stored Procedure with an Input Parameter

In this example, the procedure returns a result set, java.sql.ResultSet, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith, to the stored procedure using the CallableStatement.

The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String> object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.

CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) input parameter, returns ResultSet
*
*
@param lastNameStartsWith
*
@return List of employee names
*/
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<String> employeeFullName = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [HumanResources].[uspGetEmployeesByLastName](?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("lastNameStartsWith", lastNameStartsWith);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
employeeFullName.add(
rs.getString("LastName") + ", "
+ rs.getString("FirstName") + " "
+ rs.getString("MiddleName"));
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return employeeFullName;
}

Example 6: Converting a Result Set to Ordered Collection of Objects

In this last example, we pass two input parameters, productColor and productSize, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, List<Product> object. The Product objects in the list are instances of the Product.java POJO class. The method converts each results set’s row’s field value into a Product property (e.g., Product.Size, Product.Model). Using a collection is a common method for persisting data from a result set in an application.

CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (2) input parameters, returns ResultSet
*
*
@param color
*
@param size
*
@return List of Product objects
*/
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<Product> productList = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("productColor", color);
cstmt.setString("productSize", size);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
Product product = new Product(
rs.getString("Product"),
rs.getString("ProductNumber"),
rs.getString("Color"),
rs.getString("Size"),
rs.getString("Model"));
productList.add(product);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return productList;
}

Proper T-SQL: Schema Reference and Brackets

You will notice in all T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo).

You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]). The square brackets are to indicate that the name represents an object and not a reserved word (e.g, CURRENT or NATIONAL). By default, SQL Server adds these to make sure the scripts it generates run correctly.

Running the Examples

The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.

package com.article.examples;
import java.util.List;
/**
* Main class that calls all example methods
*
* @author Gary A. Stafford
*/
public class RunExamples {
private static final Examples examples = new Examples();
private static final ProcessTimer timer = new ProcessTimer();
/**
* @param args the command line arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("SQL SERVER STATEMENT EXAMPLES");
System.out.println("======================================");
// Statement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
double averageWeight = examples.getAverageProductWeightST();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightST");
System.out.println("Description: Statement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// PreparedStatement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightPS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightPS");
System.out.println("Description: PreparedStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// CallableStatement, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightCS");
System.out.println("Description: CallableStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.println("");
// CallableStatement example, (1) output parameter, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightOutCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightOutCS");
System.out.println("Description: CallableStatement, (1) output parameter, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("");
// CallableStatement example, (1) input parameter, returns ResultSet
timer.setStartTime(System.nanoTime());
String lastNameStartsWith = "Sa";
List<String> employeeFullName =
examples.getEmployeesByLastNameCS(lastNameStartsWith);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetEmployeesByLastNameCS");
System.out.println("Description: CallableStatement, (1) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Last names starting with '%s': %d%n", lastNameStartsWith, employeeFullName.size());
if (employeeFullName.size() > 0) {
System.out.printf(" Last employee found: %s%n", employeeFullName.get(employeeFullName.size() 1));
} else {
System.out.printf("No employees found with last name starting with '%s'%n", lastNameStartsWith);
}
System.out.println("");
// CallableStatement example, (2) input parameters, returns ResultSet
timer.setStartTime(System.nanoTime());
String color = "Red";
String size = "44";
List<Product> productList =
examples.getProductsByColorAndSizeCS(color, size);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetProductsByColorAndSizeCS");
System.out.println("Description: CallableStatement, (2) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
if (productList.size() > 0) {
System.out.printf("Results: Products found (color: '%s', size: '%s'): %d%n", color, size, productList.size());
System.out.printf(" First product: %s (%s)%n", productList.get(0).getProduct(), productList.get(0).getProductNumber());
} else {
System.out.printf("No products found with color '%s' and size '%s'%n", color, size);
}
System.out.println("");
examples.closeConnection();
}
}
view raw RunExamples.java hosted with ❤ by GitHub

Below, we see the results.

SQL Statement Performance

This post is certainly not about SQL performance, demonstrated by the fact I am only using Amazon RDS for SQL Server 2017 Express Edition on a single, very underpowered db.t2.micro Amazon RDS instance types. However, I have added a timer feature, ProcessTimer.java class, to capture the duration of time each example takes to return data, measured in milliseconds. The ProcessTimer.java class is part of the project code. Using the timer, you should observe significant differences between the first run and proceeding runs of the application for several of the called methods. The time difference is a result of several factors, primarily pre-compilation of the SQL statements and SQL Server plan caching.

The effects of these two factors are easily demonstrated by clearing the SQL Server plan cache (see SQL script below) using DBCC (Database Console Commands) statements. and then running the application twice in a row. The second time, pre-compilation and plan caching should result in significantly faster times for the prepared statements and callable statements, in Examples 2–6. In the two random runs shown below, we see up to a 497% improvement in query time.

USE AdventureWorks;
DBCC FREESYSTEMCACHE('SQL Plans');
GO
CHECKPOINT;
GO
-- Impossible to run with Amazon RDS for Microsoft SQL Server on AWS
-- DBCC DROPCLEANBUFFERS;
-- GO

The first run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 122
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 146
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 72
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 623
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 830
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 427
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

The second run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 116
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 89
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 80
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 340
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 139
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 208
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

Conclusion

This post has demonstrated several methods for querying and calling stored procedures from a SQL Server 2017 database using JDBC with the Microsoft JDBC Driver 8.4 for SQL Server. Although the examples are quite simple, the same patterns can be used with more complex stored procedures, with multiple input and output parameters, which not only select, but insert, update, and delete data.

There are some limitations of the Microsoft JDBC Driver for SQL Server you should be aware of by reading the documentation. However, for most tasks that require database interaction, the Driver provides adequate functionality with SQL Server.


This blog represents my own viewpoints and not of my employer, Amazon Web Services.

, , , ,

Leave a comment

Automating Multi-Environment Kubernetes Virtual Clusters with Google Cloud DNS, Auth0, and Istio 1.0

Kubernetes supports multiple virtual clusters within the same physical cluster. These virtual clusters are called Namespaces. Namespaces are a way to divide cluster resources between multiple users. Many enterprises use Namespaces to divide the same physical Kubernetes cluster into different virtual software development environments as part of their overall Software Development Lifecycle (SDLC). This practice is commonly used in ‘lower environments’ or ‘non-prod’ (not Production) environments. These environments commonly include Continous Integration and Delivery (CI/CD), Development, Integration, Testing/Quality Assurance (QA), User Acceptance Testing (UAT), Staging, Demo, and Hotfix. Namespaces provide a basic form of what is referred to as soft multi-tenancy.

Generally, the security boundaries and performance requirements between non-prod environments, within the same enterprise, are less restrictive than Production or Disaster Recovery (DR) environments. This allows for multi-tenant environments, while Production and DR are normally single-tenant environments. In order to approximate the performance characteristics of Production, the Performance Testing environment is also often isolated to a single-tenant. A typical enterprise would minimally have a non-prod, performance, production, and DR environment.

Using Namespaces to create virtual separation on the same physical Kubernetes cluster provides enterprises with more efficient use of virtual compute resources, reduces Cloud costs, eases the management burden, and often expedites and simplifies the release process.

Demonstration

In this post, we will re-examine the topic of virtual clusters, similar to the recent post, Managing Applications Across Multiple Kubernetes Environments with Istio: Part 1 and Part 2. We will focus specifically on automating the creation of the virtual clusters on GKE with Istio 1.0, managing the Google Cloud DNS records associated with the cluster’s environments, and enabling both HTTPS and token-based OAuth access to each environment. We will use the Storefront API for our demonstration, featured in the previous three posts, including Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine.

gke-routing.png

Source Code

The source code for this post may be found on the gke branch of the storefront-kafka-docker GitHub repository.

git clone --branch gke --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-kafka-docker.git

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers, such as LinkedIn.

This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources.

Screen Shot 2019-01-19 at 11.49.31 AM.png

To follow along, you will need to register your own domain, arrange for an Auth0, or alternative, authentication and authorization service, and obtain an SSL/TLS certificate.

SSL/TLS Wildcard Certificate

In the recent post, Securing Your Istio Ingress Gateway with HTTPS, we examined how to create and apply an SSL/TLS certificate to our GKE cluster, to secure communications. Although we are only creating a non-prod cluster, it is more and more common to use SSL/TLS everywhere, especially in the Cloud. For this post, I have registered a single wildcard certificate, *.api.storefront-demo.com. This certificate will cover the three second-level subdomains associated with the virtual clusters: dev.api.storefront-demo.com, test.api.storefront-demo.com, and uat.api.storefront-demo.com. Setting the environment name, such as dev.*, as the second-level subdomain of my storefront-demo domain, following the first level api.* subdomain, makes the use of a wildcard certificate much easier.

screen_shot_2019-01-13_at_10.04.23_pm

As shown below, my wildcard certificate contains the Subject Name and Subject Alternative Name (SAN) of *.api.storefront-demo.com. For Production, api.storefront-demo.com, I prefer to use a separate certificate.

screen_shot_2019-01-13_at_10.36.33_pm_detail

Create GKE Cluster

With your certificate in hand, create the non-prod Kubernetes cluster. Below, the script creates a minimally-sized, three-node, multi-zone GKE cluster, running on GCP, with Kubernetes Engine cluster version 1.11.5-gke.5 and Istio on GKE version 1.0.3-gke.0. I have enabled the master authorized networks option to secure my GKE cluster master endpoint. For the demo, you can add your own IP address CIDR on line 9 (i.e. 1.2.3.4/32), or remove lines 30 – 31 to remove the restriction (gist).

  • Lines 16–39: Create a 3-node, multi-zone GKE cluster with Istio;
  • Line 48: Creates three non-prod Namespaces: dev, test, and uat;
  • Lines 51–53: Enable Istio automatic sidecar injection within each Namespace;
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE
# Constants – CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api-non-prod'
readonly REGION='us-central1'
readonly MASTER_AUTH_NETS='<your_ip_cidr>'
readonly NAMESPACES=( 'dev' 'test' 'uat' )
# Build a 3-node, single-region, multi-zone GKE cluster
time gcloud beta container \
–project $PROJECT clusters create $CLUSTER \
–region $REGION \
–no-enable-basic-auth \
–no-issue-client-certificate \
–cluster-version "1.11.5-gke.5" \
–machine-type "n1-standard-2" \
–image-type "COS" \
–disk-type "pd-standard" \
–disk-size "100" \
–scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
–num-nodes "1" \
–enable-stackdriver-kubernetes \
–enable-ip-alias \
–enable-master-authorized-networks \
–master-authorized-networks $MASTER_AUTH_NETS \
–network "projects/${PROJECT}/global/networks/default" \
–subnetwork "projects/${PROJECT}/regions/${REGION}/subnetworks/default" \
–default-max-pods-per-node "110" \
–addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
–istio-config auth=MTLS_STRICT \
–metadata disable-legacy-endpoints=true \
–enable-autoupgrade \
–enable-autorepair
# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
–region $REGION –project $PROJECT
kubectl config current-context
# Create Namespaces
kubectl apply -f ./resources/other/namespaces.yaml
# Enable automatic Istio sidecar injection
for namespace in ${NAMESPACES[@]}; do
kubectl label namespace $namespace istio-injection=enabled
done

If successful, the results should look similar to the output, below.

screen_shot_2019-01-15_at_11.51.08_pm

The cluster will contain a pool of three minimally-sized VMs, the Kubernetes nodes.

screen_shot_2019-01-16_at_12.06.03_am

Deploying Resources

The Istio Gateway and three ServiceEntry resources are the primary resources responsible for routing the traffic from the ingress router to the Services, within the multiple Namespaces. Both of these resource types are new to Istio 1.0 (gist).

  • Lines 9–16: Port config that only accepts HTTPS traffic on port 443 using TLS;
  • Lines 18–20: The three subdomains being routed to the non-prod GKE cluster;
  • Lines 28, 63, 98: The three subdomains being routed to the non-prod GKE cluster;
  • Lines 39, 47, 65, 74, 82, 90, 109, 117, 125: Routing to FQDN of Storefront API Services within the three Namespaces;
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: storefront-gateway
spec:
selector:
istio: ingressgateway
servers:
port:
number: 443
name: https
protocol: HTTPS
tls:
mode: SIMPLE
serverCertificate: /etc/istio/ingressgateway-certs/tls.crt
privateKey: /etc/istio/ingressgateway-certs/tls.key
hosts:
dev.api.storefront-demo.com
test.api.storefront-demo.com
uat.api.storefront-demo.com
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-dev
spec:
hosts:
dev.api.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.dev.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.dev.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.dev.svc.cluster.local
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-test
spec:
hosts:
test.api.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.test.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.test.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.test.svc.cluster.local
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-uat
spec:
hosts:
uat.api.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.uat.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.uat.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.uat.svc.cluster.local

view raw
istio-gateway.yaml
hosted with ❤ by GitHub

Next, deploy the Istio and Kubernetes resources to the new GKE cluster. For the sake of brevity, we will deploy the same number of instances and the same version of each the three Storefront API services (Accounts, Orders, Fulfillment) to each of the three non-prod environments (dev, test, uat). In reality, you would have varying numbers of instances of each service, and each environment would contain progressive versions of each service, as part of the SDLC of each microservice (gist).

  • Lines 13–14: Deploy the SSL/TLS certificate and the private key;
  • Line 17: Deploy the Istio Gateway and three ServiceEntry resources;
  • Lines 20–22: Deploy the Istio Authentication Policy resources each Namespace;
  • Lines 26–37: Deploy the same set of resources to the dev, test, and uat Namespaces;
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources
# Constants – CHANGE ME!
readonly CERT_PATH=~/Documents/Articles/gke-kafka/sslforfree_non_prod
readonly NAMESPACES=( 'dev' 'test' 'uat' )
# Kubernetes Secret to hold the server’s certificate and private key
kubectl create -n istio-system secret tls istio-ingressgateway-certs \
–key $CERT_PATH/private.key –cert $CERT_PATH/certificate.crt
# Istio Gateway and three ServiceEntry resources
kubectl apply -f ./resources/other/istio-gateway.yaml
# End-user auth applied per environment
kubectl apply -f ./resources/other/auth-policy-dev.yaml
kubectl apply -f ./resources/other/auth-policy-test.yaml
kubectl apply -f ./resources/other/auth-policy-uat.yaml
# Loop through each non-prod Namespace (environment)
# Re-use same resources (incld. credentials) for all environments, just for the demo
for namespace in ${NAMESPACES[@]}; do
kubectl apply -n $namespace -f ./resources/config/confluent-cloud-kafka-configmap.yaml
kubectl apply -n $namespace -f ./resources/config/mongodb-atlas-secret.yaml
kubectl apply -n $namespace -f ./resources/config/confluent-cloud-kafka-secret.yaml
kubectl apply -n $namespace -f ./resources/other/mongodb-atlas-external-mesh.yaml
kubectl apply -n $namespace -f ./resources/other/confluent-cloud-external-mesh.yaml
kubectl apply -n $namespace -f ./resources/services/accounts.yaml
kubectl apply -n $namespace -f ./resources/services/fulfillment.yaml
kubectl apply -n $namespace -f ./resources/services/orders.yaml
done

The deployed Storefront API Services should look as follows.

screen_shot_2019-01-13_at_7.16.03_pm

Google Cloud DNS

Next, we need to enable DNS access to the GKE cluster using Google Cloud DNS. According to Google, Cloud DNS is a scalable, reliable and managed authoritative Domain Name System (DNS) service running on the same infrastructure as Google. It has low latency, high availability, and is a cost-effective way to make your applications and services available to your users.

Whenever a new GKE cluster is created, a new Network Load Balancer is also created. By default, the load balancer’s front-end is an external IP address.

screen_shot_2019-01-15_at_11.56.01_pm.png

Using a forwarding rule, traffic directed at the external IP address is redirected to the load balancer’s back-end. The load balancer’s back-end is comprised of three VM instances, which are the three Kubernete nodes in the GKE cluster.

screen_shot_2019-01-15_at_11.56.19_pm

If you are following along with this post’s demonstration, we will assume you have a domain registered and configured with Google Cloud DNS. I am using the storefront-demo.com domain, which I have used in the last three posts to demonstrate Istio and GKE.

Google Cloud DNS has a fully functional web console, part of the Google Cloud Console. However, using the Cloud DNS web console is impractical in a DevOps CI/CD workflow, where Kubernetes clusters, Namespaces, and Workloads are ephemeral. Therefore we will use the following script. Within the script, we reset the IP address associated with the A records for each non-prod subdomains associated with storefront-demo.com domain (gist).

  • Lines 23–25: Find the previous load balancer’s front-end IP address;
  • Lines 27–29: Find the new load balancer’s front-end IP address;
  • Line 35: Start the Cloud DNS transaction;
  • Lines 37–47: Add the DNS record changes to the transaction;
  • Line 49: Execute the Cloud DNS transaction;
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Update Cloud DNS A Records
# Constants – CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly DOMAIN='storefront-demo.com'
readonly ZONE='storefront-demo-com-zone'
readonly REGION='us-central1'
readonly TTL=300
readonly RECORDS=('dev' 'test' 'uat')
# Make sure any old load balancers were removed
if [ $(gcloud compute forwarding-rules list –filter "region:($REGION)" | wc -l | awk '{$1=$1};1') -gt 2 ]; then
echo "More than one load balancer detected, exiting script."
exit 1
fi
# Get load balancer IP address from first record
readonly OLD_IP=$(gcloud dns record-sets list \
–filter "name=${RECORDS[0]}.api.${DOMAIN}." –zone $ZONE \
| awk 'NR==2 {print $4}')
readonly NEW_IP=$(gcloud compute forwarding-rules list \
–filter "region:($REGION)" \
| awk 'NR==2 {print $3}')
echo "Old LB IP Address: ${OLD_IP}"
echo "New LB IP Address: ${NEW_IP}"
# Update DNS records
gcloud dns record-sets transaction start –zone $ZONE
for record in ${RECORDS[@]}; do
echo "${record}.api.${DOMAIN}."
gcloud dns record-sets transaction remove \
–name "${record}.api.${DOMAIN}." –ttl $TTL \
–type A –zone $ZONE "${OLD_IP}"
gcloud dns record-sets transaction add \
–name "${record}.api.${DOMAIN}." –ttl $TTL \
–type A –zone $ZONE "${NEW_IP}"
done
gcloud dns record-sets transaction execute –zone $ZONE

The outcome of the script is shown below. Note how changes are executed as part of a transaction, by automatically creating a transaction.yaml file. The file contains the six DNS changes, three additions and three deletions. The command executes the transaction and then deletes the transaction.yaml file.

> sh ./part3_set_cloud_dns.sh
Old LB IP Address: 35.193.208.115
New LB IP Address: 35.238.196.231

Transaction started [transaction.yaml].

dev.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].

test.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].

uat.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].

Executed transaction [transaction.yaml] for managed-zone [storefront-demo-com-zone].
Created [https://www.googleapis.com/dns/v1/projects/gke-confluent-atlas/managedZones/storefront-demo-com-zone/changes/53].

ID  START_TIME                STATUS
55  2019-01-16T04:54:14.984Z  pending

Based on my own domain and cluster details, the transaction.yaml file looks as follows. Again, note the six DNS changes, three additions, followed by three deletions (gist).

additions:
kind: dns#resourceRecordSet
name: storefront-demo.com.
rrdatas:
ns-cloud-a1.googledomains.com. cloud-dns-hostmaster.google.com. 25 21600 3600
259200 300
ttl: 21600
type: SOA
kind: dns#resourceRecordSet
name: dev.api.storefront-demo.com.
rrdatas:
35.238.196.231
ttl: 300
type: A
kind: dns#resourceRecordSet
name: test.api.storefront-demo.com.
rrdatas:
35.238.196.231
ttl: 300
type: A
kind: dns#resourceRecordSet
name: uat.api.storefront-demo.com.
rrdatas:
35.238.196.231
ttl: 300
type: A
deletions:
kind: dns#resourceRecordSet
name: storefront-demo.com.
rrdatas:
ns-cloud-a1.googledomains.com. cloud-dns-hostmaster.google.com. 24 21600 3600
259200 300
ttl: 21600
type: SOA
kind: dns#resourceRecordSet
name: dev.api.storefront-demo.com.
rrdatas:
35.193.208.115
ttl: 300
type: A
kind: dns#resourceRecordSet
name: test.api.storefront-demo.com.
rrdatas:
35.193.208.115
ttl: 300
type: A
kind: dns#resourceRecordSet
name: uat.api.storefront-demo.com.
rrdatas:
35.193.208.115
ttl: 300
type: A

view raw
transactions.yaml
hosted with ❤ by GitHub

Confirm DNS Changes

Use the dig command to confirm the DNS records are now correct and that DNS propagation has occurred. The IP address returned by dig should be the external IP address assigned to the front-end of the Google Cloud Load Balancer.

> dig dev.api.storefront-demo.com +short
35.238.196.231

Or, all the three records.

echo \
  "dev.api.storefront-demo.com\n" \
  "test.api.storefront-demo.com\n" \
  "uat.api.storefront-demo.com" \
  > records.txt | dig -f records.txt +short

35.238.196.231
35.238.196.231
35.238.196.231

Optionally, more verbosely by removing the +short option.

> dig +nocmd dev.api.storefront-demo.com

;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 30763
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 512
;; QUESTION SECTION:
;dev.api.storefront-demo.com.   IN  A

;; ANSWER SECTION:
dev.api.storefront-demo.com. 299 IN A   35.238.196.231

;; Query time: 27 msec
;; SERVER: 8.8.8.8#53(8.8.8.8)
;; WHEN: Wed Jan 16 18:00:49 EST 2019
;; MSG SIZE  rcvd: 72

The resulting records in the Google Cloud DNS management console should look as follows.

screen_shot_2019-01-15_at_11.57.12_pm

JWT-based Authentication

As discussed in the previous post, Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0, it is typical to limit restrict access to the Kubernetes cluster, Namespaces within the cluster, or Services running within Namespaces to end-users, whether they are humans or other applications. In that previous post, we saw an example of applying a machine-to-machine (M2M) Istio Authentication Policy to only the uat Namespace. This scenario is common when you want to control access to resources in non-production environments, such as UAT, to outside test teams, accessing the uat Namespace through an external application. To simulate this scenario, we will apply the following Istio Authentication Policy to the uat Namespace. (gist).

apiVersion: authentication.istio.io/v1alpha1
kind: Policy
metadata:
name: default
namespace: uat
spec:
peers:
mtls: {}
origins:
jwt:
audiences:
"storefront-api-uat"
issuer: "https://storefront-demo.auth0.com/"
jwksUri: "https://storefront-demo.auth0.com/.well-known/jwks.json"
principalBinding: USE_ORIGIN

view raw
auth-policy-uat.yaml
hosted with ❤ by GitHub

For the dev and test Namespaces, we will apply an additional, different Istio Authentication Policy. This policy will protect against the possibility of dev and test M2M API consumers interfering with uat M2M API consumers and vice-versa. Below is the dev and test version of the Policy (gist).

apiVersion: authentication.istio.io/v1alpha1
kind: Policy
metadata:
name: default
namespace: dev
spec:
peers:
mtls: {}
origins:
jwt:
audiences:
"storefront-api-dev-test"
issuer: "https://storefront-demo.auth0.com/"
jwksUri: "https://storefront-demo.auth0.com/.well-known/jwks.json"
principalBinding: USE_ORIGIN

view raw
auth-policy-dev.yaml
hosted with ❤ by GitHub

Testing Authentication

Using Postman, with the ‘Bearer Token’ type authentication method, as detailed in the previous post, a call a Storefront API resource in the uat Namespace should succeed. This also confirms DNS and HTTPS are working properly.

screen_shot_2019-01-15_at_11.58.41_pm

The dev and test Namespaces require different authentication. Trying to use no Authentication, or authenticating as a UAT API consumer, will result in a 401 Unauthorized HTTP status, along with the Origin authentication failed. error message.

screen_shot_2019-01-16_at_12.00.55_am

Conclusion

In this brief post, we demonstrated how to create a GKE cluster with Istio 1.0.x, containing three virtual clusters, or Namespaces. Each Namespace represents an environment, which is part of an application’s SDLC. We enforced HTTP over TLS (HTTPS) using a wildcard SSL/TLS certificate. We also enforced end-user authentication using JWT-based OAuth 2.0 with Auth0. Lastly, we provided user-friendly DNS routing to each environment, using Google Cloud DNS. Short of a fully managed API Gateway, like Apigee, and automating the execution of the scripts with Jenkins or Spinnaker, this cluster is ready to provide a functional path to Production for developing our Storefront API.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , , , ,

2 Comments

Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine

Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, including as MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-manage PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.

In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).

Background

In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.

Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders service. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.

The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.

Kafka-Eventual-Cons-Swarm.png

Developing the services, not operationalizing the platform, was the primary objective of the previous post.

Featured Technologies

The following technologies are featured prominently in this post.

Confluent Cloud

confluent_cloud_apache-300x228

In May 2018, Google announced a partnership with Confluence to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for their commercial, Kafka-based streaming platform for the Enterprise.

Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.

Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.

MongoDB Atlas

mongodb

Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.

Kubernetes Engine

gkeAccording to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.

A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.

GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.

Demonstration

In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.

ConfluentCloud-v3a.png

Kubernetes and Istio 1.0 will replace Netflix’s Zuul and  Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform, than previously demonstrated on Docker Swarm.

ConfluentCloudRouting.png

For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor, will we integrate a full lifecycle API management tool, like Google Apigee.

Source Code

The source code for this demonstration is contained in four separate GitHub repositories, storefront-kafka-dockerstorefront-demo-accounts, storefront-demo-orders, and, storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).

git clone –branch master –single-branch –depth 1 –no-tags \
https://github.com/garystafford/storefront-kafka-docker.git
# optional repositories
git clone –branch gke –single-branch –depth 1 –no-tags \
https://github.com/garystafford/storefront-demo-accounts.git
git clone –branch gke –single-branch –depth 1 –no-tags \
https://github.com/garystafford/storefront-demo-orders.git
git clone –branch gke –single-branch –depth 1 –no-tags \
https://github.com/garystafford/storefront-demo-fulfillment.git

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Setup Process

The setup of the Storefront API platform is divided into a few logical steps:

  1. Create the MongoDB Atlas cluster;
  2. Create the Confluent Cloud Kafka cluster;
  3. Create Kafka topics;
  4. Modify the Kubernetes resources;
  5. Modify the microservices to support Confluent Cloud configuration;
  6. Create the GKE cluster with Istio on GCP;
  7. Apply the Kubernetes resources to the GKE cluster;
  8. Test the Storefront API, Kafka, and MongoDB are functioning properly;

MongoDB Atlas Cluster

This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.

For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.

screen_shot_2018-12-23_at_6.48.12_pm

For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized 3-data node cluster, with shared RAM and 512 MB of storage, and currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0-cluster on GCP. An M0-sized Atlas cluster may take between 7-10 minutes to provision.

screen_shot_2018-12-23_at_6.49.24_pm

MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.

screen_shot_2018-12-23_at_6.51.41_pm

Once the cluster is ready, you can review details about the cluster and each individual cluster node.

screen_shot_2018-12-23_at_6.51.54_pm

In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.

screen_shot_2018-12-23_at_6.52.18_pm

Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open the access to your specific IP address using whatsmyip.com, or much less-securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.

screen_shot_2018-12-23_at_6.52.36_pm

The Java Spring Boot storefront services use a Spring Profile, gke. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke Spring Profile’s configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap and Secret resources, shown later.

The first two Spring configuration values will need are the MongoDB Atlas cluster’s connection string and the demo_user account password. Note these both for later use.

screen_shot_2018-12-23_at_6.53.00_pm

Confluent Cloud Kafka Cluster

Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Atlas account creation does require the use of a Credit Card.

The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and persistent history of cluster activities, which survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based Saas management tool.

screen_shot_2018-12-23_at_6.34.18_pm

As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.

screen_shot_2018-12-23_at_6.34.56_pm

Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.

screen_shot_2018-12-23_at_6.39.52_pmOnce the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.

screen_shot_2018-12-23_at_6.35.56_pm

As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.

screen_shot_2018-12-23_at_6.36.12_pm

SASL and JAAS

Some users may not be familiar with the terms, SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).

There are numerous SASL mechanisms.  The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses the JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.

Cluster Authentication

Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.

screen_shot_2018-12-23_at_6.38.09_pm

Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.

screen_shot_2018-12-23_at_6.38.21_pm

Kafka Topics

With the cluster created, create the storefront service’s three Kafka topics manually, using the Confluent Cloud’s ccloud CLI tool. First, configure the Confluent Cloud CLI using the ccloud init command, using your new cluster’s Bootstrap Servers address, API Key, and API Secret. The instructions are shown above Clusters Client Config tab of the Confluent Cloud web-based management interface.

screen_shot_2018-12-26_at_2.05.09_pm

Create the storefront service’s three Kafka topics using the ccloud topic create command. Use the list command to confirm they are created.

# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill
  
# list kafka topics
ccloud topic list
  
accounts.customer.change
fulfillment.order.change
orders.order.fulfill

Another useful ccloud command, topic describe, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12.

screen_shot_2018-12-26_at_5.03.11_pm

Adding the --verbose flag to the command, ccloud --verbose topic describe, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.

screen_shot_2018-12-26_at_5.07.20_pm

Kubernetes Resources

The deployment of the three storefront microservices to the dev Namespace will minimally require the following Kubernetes configuration resources.

  • (1) Kubernetes Namespace;
  • (3) Kubernetes Deployments;
  • (3) Kubernetes Services;
  • (1) Kubernetes ConfigMap;
  • (2) Kubernetes Secrets;
  • (1) Istio 1.0 Gateway;
  • (1) Istio 1.0 VirtualService;
  • (2) Istio 1.0 ServiceEntry;

The Istio networking.istio.io v1alpha3 API introduced the last three configuration resources in the list, to control traffic routing into, within, and out of the mesh. There are a total of four new io networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.

Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.

To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.

Istio Gateway & VirtualService

Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com, along with the sub-domain, api.dev, to host the Storefront API. The domain’s primary A record (‘@’) and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. Removing the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: storefront-gateway
spec:
selector:
istio: ingressgateway
servers:
port:
number: 80
name: http
protocol: HTTP
hosts:
api.dev.storefront-demo.com
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-dev
spec:
hosts:
api.dev.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.dev.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.dev.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.dev.svc.cluster.local

view raw
istio-gateway.yaml
hosted with ❤ by GitHub

Istio ServiceEntry

There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services, both of their ServiceEntry Location items are set to MESH_INTERNAL. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: mongdb-atlas-external-mesh
spec:
hosts:
<your_atlas_url.gcp.mongodb.net>
ports:
name: mongo
number: 27017
protocol: MONGO
location: MESH_EXTERNAL
resolution: NONE

The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: confluent-cloud-external-mesh
spec:
hosts:
<your_cluster_url.us-central1.gcp.confluent.cloud>
ports:
name: kafka
number: 9092
protocol: TLS
location: MESH_EXTERNAL
resolution: NONE

Both need to have their host items replaced with the appropriate Atlas and Confluent URLs.

Inspecting Istio Resources

The easiest way to view Istio resources is from the command line using the istioctl and kubectl CLI tools.

istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry
  
kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry

Multiple Namespaces

In this demo, we are only deploying to a single Kubernetes Namespace, dev. However, Istio will also support routing traffic to multiple namespaces. For example, a typical non-prod Kubernetes cluster might support devtest, and uat, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In the VirtualService, each service’s host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which we change for each for each VirtualService (gist).

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: storefront-gateway
spec:
selector:
istio: ingressgateway
servers:
port:
number: 80
name: http
protocol: HTTP
hosts:
api.dev.storefront-demo.com
api.test.storefront-demo.com
api.uat.storefront-demo.com
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-dev
spec:
hosts:
api.dev.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.dev.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.dev.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.dev.svc.cluster.local
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-test
spec:
hosts:
api.test.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.test.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.test.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.test.svc.cluster.local
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: storefront-uat
spec:
hosts:
api.uat.storefront-demo.com
gateways:
storefront-gateway
http:
match:
uri:
prefix: /accounts
route:
destination:
port:
number: 8080
host: accounts.uat.svc.cluster.local
match:
uri:
prefix: /fulfillment
route:
destination:
port:
number: 8080
host: fulfillment.uat.svc.cluster.local
match:
uri:
prefix: /orders
route:
destination:
port:
number: 8080
host: orders.uat.svc.cluster.local

MongoDB Atlas Secret

There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.

The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection string, with the demo_user username and password, one for each of the storefront service’s databases (gist).

apiVersion: v1
kind: Secret
metadata:
name: mongodb-atlas
namespace: dev
type: Opaque
data:
mongodb.uri.accounts: your_base64_encoded_value
mongodb.uri.fulfillment: your_base64_encoded_value
mongodb.uri.orders: your_base64_encoded_value

Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64 program. The base64 program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64 program using echo -n.

MONGODB_URI=mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true
echo -n $MONGODB_URI | base64

bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl

Repeat this process for the three MongoDB connection strings.

screen_shot_2018-12-26_at_2.15.21_pm

Confluent Cloud Secret

The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret’s data map, bootstrap.servers and sasl.jaas.config. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the base64 encoding process for these two data fields (gist).

apiVersion: v1
kind: Secret
metadata:
name: confluent-cloud-kafka
namespace: dev
type: Opaque
data:
bootstrap.servers: your_base64_encoded_value
sasl.jaas.config: your_base64_encoded_value

Confluent Cloud ConfigMap

The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive, and therefore, may be placed in a Kubernetes ConfigMapconfluent-cloud-kafka-configmap.yaml (gist).

apiVersion: v1
kind: ConfigMap
metadata:
name: confluent-cloud-kafka
data:
ssl.endpoint.identification.algorithm: "https"
sasl.mechanism: "PLAIN"
request.timeout.ms: "20000"
retry.backoff.ms: "500"
security.protocol: "SASL_SSL"

Accounts Deployment Resource

To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note the environment variables section, on lines 44–90, are a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).

apiVersion: v1
kind: Service
metadata:
name: accounts
labels:
app: accounts
spec:
ports:
name: http
port: 8080
selector:
app: accounts
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: accounts
labels:
app: accounts
spec:
replicas: 2
strategy:
type: Recreate
selector:
matchLabels:
app: accounts
template:
metadata:
labels:
app: accounts
annotations:
sidecar.istio.io/inject: "true"
spec:
containers:
name: accounts
image: garystafford/storefront-accounts:gke-2.2.0
resources:
requests:
memory: "250M"
cpu: "100m"
limits:
memory: "400M"
cpu: "250m"
env:
name: SPRING_PROFILES_ACTIVE
value: "gke"
name: SERVER_SERVLET_CONTEXT-PATH
value: "/accounts"
name: LOGGING_LEVEL_ROOT
value: "INFO"
name: SPRING_DATA_MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb-atlas
key: mongodb.uri.accounts
name: SPRING_KAFKA_BOOTSTRAP-SERVERS
valueFrom:
secretKeyRef:
name: confluent-cloud-kafka
key: bootstrap.servers
name: SPRING_KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
valueFrom:
configMapKeyRef:
name: confluent-cloud-kafka
key: ssl.endpoint.identification.algorithm
name: SPRING_KAFKA_PROPERTIES_SASL_MECHANISM
valueFrom:
configMapKeyRef:
name: confluent-cloud-kafka
key: sasl.mechanism
name: SPRING_KAFKA_PROPERTIES_REQUEST_TIMEOUT_MS
valueFrom:
configMapKeyRef:
name: confluent-cloud-kafka
key: request.timeout.ms
name: SPRING_KAFKA_PROPERTIES_RETRY_BACKOFF_MS
valueFrom:
configMapKeyRef:
name: confluent-cloud-kafka
key: retry.backoff.ms
name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
valueFrom:
secretKeyRef:
name: confluent-cloud-kafka
key: sasl.jaas.config
name: SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
valueFrom:
configMapKeyRef:
name: confluent-cloud-kafka
key: security.protocol
ports:
containerPort: 8080
imagePullPolicy: IfNotPresent

view raw
accounts.yaml
hosted with ❤ by GitHub

Modify Microservices for Confluent Cloud

As explained earlier, Confluent Cloud’s Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service’s code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note the following changes are already made to the source code in the gke git branch, and not necessary for this demo.

The previous Kafka SenderConfig and ReceiverConfig Java classes have been converted to Java interfaces. There are four new SenderConfigConfluent, SenderConfigNonConfluent, ReceiverConfigConfluent, and ReceiverConfigNonConfluent classes, which implement one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes are assigned the @Profile("gke") annotation, and the others, the @Profile("!gke") annotation. When the services start, one of the two class implementations are is loaded, depending on the Active Spring Profile, gke or not gke. To understand the changes better, examine the Account service’s SenderConfigConfluent.java file (gist).

Line 20: Designates this class as belonging to the gke Spring Profile.

Line 23: The class now implements an interface.

Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.

Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.

package com.storefront.config;
import com.storefront.kafka.Sender;
import com.storefront.model.CustomerChangeEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Profile("gke")
@Configuration
@EnableKafka
public class SenderConfigConfluent implements SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm}")
private String sslEndpointIdentificationAlgorithm;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.properties.request.timeout.ms}")
private String requestTimeoutMs;
@Value("${spring.kafka.properties.retry.backoff.ms}")
private String retryBackoffMs;
@Value("${spring.kafka.properties.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.properties.sasl.jaas.config}")
private String saslJaasConfig;
@Override
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
props.put("sasl.mechanism", saslMechanism);
props.put("request.timeout.ms", requestTimeoutMs);
props.put("retry.backoff.ms", retryBackoffMs);
props.put("security.protocol", securityProtocol);
props.put("sasl.jaas.config", saslJaasConfig);
return props;
}
@Override
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Override
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Override
@Bean
public Sender sender() {
return new Sender();
}
}

Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine).

Google Kubernetes Engine (GKE) with Istio

Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.

For the sake of brevity, we will manually create the cluster and deploy the resources, using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available release of Kubernetes on GKE at the time of this post was 1.11.5-gke.5 and Istio 1.03 (Istio on GKE still considered beta). Note Kubernetes and Istio are evolving rapidly, thus the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create command format (gist).

#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE
# Constants – CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'
# Create GKE cluster (time in foreground)
time \
gcloud beta container \
–project $PROJECT clusters create $CLUSTER \
–zone $ZONE \
–username "admin" \
–cluster-version "1.11.5-gke.5" \
–machine-type "n1-standard-2" \
–image-type "COS" \
–disk-type "pd-standard" \
–disk-size "100" \
–scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
–num-nodes "2" \
–enable-stackdriver-kubernetes \
–enable-ip-alias \
–network "projects/$PROJECT/global/networks/default" \
–subnetwork "projects/$PROJECT/regions/$REGION/subnetworks/default" \
–default-max-pods-per-node "110" \
–addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
–istio-config auth=MTLS_PERMISSIVE \
–issue-client-certificate \
–metadata disable-legacy-endpoints=true \
–enable-autoupgrade \
–enable-autorepair
# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
–zone $ZONE –project $PROJECT
kubectl config current-context
# Create dev Namespace
kubectl apply -f ./resources/other/namespaces.yaml
# Enable Istio automatic sidecar injection in Dev Namespace
kubectl label namespace $NAMESPACE istio-injection=enabled

Executing these commands successfully will build the cluster and the dev Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.

screen_shot_2018-12-26_at_2.00.56_pm

We can also observe the new GKE cluster from the GKE Clusters Details tab.

screen_shot_2018-12-26_at_2.18.32_pm

Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the two GKE cluster’s VM instances, and one IP address associated with the frontend of the load balancer.

screen_shot_2018-12-26_at_2.59.38_pm

While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.

screen_shot_2018-12-26_at_2.58.42_pm

A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.

screen_shot_2018-12-26_at_2.59.59_pm

Below, we see the details of the load balancer’s target pool, containing the two GKE cluster’s VMs.

screen_shot_2018-12-26_at_3.57.03_pm.png

As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.

ConfluentCloudRouting

Apply Kubernetes Resources

Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev Namespace since their role is to control ingress and route traffic to the dev Namespace and the services within it (gist).

#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources
# Constants – CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'
kubectl apply -f ./resources/other/istio-gateway.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/mongodb-atlas-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/confluent-cloud-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/config/confluent-cloud-kafka-configmap.yaml
kubectl apply -f ./resources/config/mongodb-atlas-secret.yaml
kubectl apply -f ./resources/config/confluent-cloud-kafka-secret.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/accounts.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/fulfillment.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/orders.yaml

Once these commands complete successfully, on the Workloads tab, we should observe two Pods of each of the three storefront service Kubernetes Deployments deployed to the dev Namespace, all six Pods with a Status of ‘OK’. A Deployment controller provides declarative updates for Pods and ReplicaSets.

screen_shot_2018-12-26_at_2.51.01_pm

On the Services tab, we should observe the three storefront service’s Kubernetes Services. A Service in Kubernetes is a REST object.

screen_shot_2018-12-26_at_2.51.16_pm

On the Configuration Tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Environment.

screen_shot_2018-12-26_at_2.51.36_pm

Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.54.51_pm

Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.55.17_pm

Test the Storefront API

If you recall from part two of the previous post, there are a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET Requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.

  1. Sample Customer: accounts/customers/sample
  2. Sample Orders: orders/customers/sample/orders
  3. Sample Fulfillment Requests: orders/customers/sample/fulfill
  4. Sample Processed Order Event: fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Event: fulfillment/fulfillment/sample/receive

Thee are a wide variety of tools to interact with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.

screen_shot_2018-12-31_at_12.19.50_pm.png

Postman

Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of the HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers endpoint.

screen_shot_2018-12-26_at_5.48.34_pm

Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.

screen_shot_2018-12-26_at_5.47.57_pm

Google Stackdriver

If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener handler firing on an onAfterSave event, which sends a CustomerChangeEvent payload to the accounts.customer.change Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.

screen_shot_2018-12-26_at_8.05.50_pm.png

MongoDB Atlas Collection View

Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connects to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests are completed successfully, you should now observe the three Storefront API databases, each with collections of documents.

screen_shot_2018-12-26_at_4.56.25_pm

MongoDB Compass

In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests collection schema.

Screen Shot 2019-01-20 at 10.21.54 AM.png

Confluent Control Center

Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since the Control Center is provided at an additional fee, and I found difficult to configure for Confluent Cloud clusters based on Confluent’s documentation, I chose not to cover it in detail, for this post.

screen_shot_2018-12-23_at_10.21.41_pm

screen_shot_2018-12-23_at_10.48.49_pm

Tear Down Cluster

Delete your Confluent Cloud and MongoDB clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the cluster delete command. Also, double-check that the external IP addresses and load balancer, associated with the cluster, were also deleted as part of the cluster deletion (gist).

#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Tear down GKE cluster and associated resources
# Constants – CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'
# Delete GKE cluster (time in foreground)
time yes | gcloud beta container clusters delete $CLUSTER –zone $ZONE
# Confirm network resources are also deleted
gcloud compute forwarding-rules list
gcloud compute target-pools list
gcloud compute firewall-rules list
# In case target-pool associated with Cluster is not deleted
yes | gcloud compute target-pools delete \
$(gcloud compute target-pools list \
–filter="region:($REGION)" –project $PROJECT \
| awk 'NR==2 {print $1}')

view raw
part4_tear_down.sh
hosted with ❤ by GitHub

Conclusion

In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.

In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0)

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , , , , ,

4 Comments

Using the Google Cloud Dataproc WorkflowTemplates API to Automate Spark and Hadoop Workloads on GCP

In the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we explored Google Cloud Dataproc using the Google Cloud Console as well as the Google Cloud SDK and Cloud Dataproc API. We created clusters, then uploaded and ran Spark and PySpark jobs, then deleted clusters, each as discrete tasks. Although each task could be done via the Dataproc API and therefore automatable, they were independent tasks, without awareness of the previous task’s state.

Screen Shot 2018-12-15 at 11.39.26 PM.png

In this brief follow-up post, we will examine the Cloud Dataproc WorkflowTemplates API to more efficiently and effectively automate Spark and Hadoop workloads. According to Google, the Cloud Dataproc WorkflowTemplates API provides a flexible and easy-to-use mechanism for managing and executing Dataproc workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster. Shown below, we see one of the Workflows that will be demonstrated in this post, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.07.29-am.png

Here we see a four-stage DAG of one of the three jobs in the workflow, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.18.45-am

Workflows are ideal for automating large batches of dynamic Spark and Hadoop jobs, and for long-running and unattended job execution, such as overnight.

Demonstration

Using the Python and Java projects from the previous post, we will first create workflow templates using the just the WorkflowTemplates API. We will create the template, set a managed cluster, add jobs to the template, and instantiate the workflow. Next, we will further optimize and simplify our workflow by using a YAML-based workflow template file. The YAML-based template file eliminates the need to make API calls to set the template’s cluster and add the jobs to the template. Finally, to further enhance the workflow and promote re-use of the template, we will incorporate parameterization. Parameters will allow us to pass parameters (key/value) pairs from the command line to workflow template, and on to the Python script as input arguments.

It is not necessary to use the Google Cloud Console for this post. All steps will be done using Google Cloud SDK shell commands. This means all steps may be automated using CI/CD DevOps tools, like Jenkins and Spinnaker on GKE.

Source Code

All open-sourced code for this post can be found on GitHub within three repositories: dataproc-java-demodataproc-python-demo, and dataproc-workflow-templates. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

WorkflowTemplates API

Always start by ensuring you have the latest Google Cloud SDK updates and are working within the correct Google Cloud project.

gcloud components update

export PROJECT_ID=your-project-id 
gcloud config set project $PROJECT

Set the following variables based on your Google environment. The variables will be reused throughout the post for multiple commands.

export REGION=your-region
export ZONE=your-zone
export BUCKET_NAME=your-bucket

The post assumes you still have the Cloud Storage bucket we created in the previous post. In the bucket, you will need the two Kaggle IBRD CSV files, available on Kaggle, the compiled Java JAR file from the dataproc-java-demo project, and a new Python script, international_loans_dataproc.py, from the dataproc-python-demo project.

screen-shot-2018-12-16-at-12.03.51-pm

Use gsutil with the copy (cp) command to upload the four files to your Storage bucket.

gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc.py $BUCKET_NAME

Following Google’s suggested process, we create a workflow template using the workflow-templates create command.

export TEMPLATE_ID=template-demo-1
  
gcloud dataproc workflow-templates create \
  $TEMPLATE_ID --region $REGION

Adding a Cluster

Next, we need to set a cluster for the workflow to use, in order to run the jobs. Cloud Dataproc will create and use a Managed Cluster for your workflow or use an existing cluster. If the workflow uses a managed cluster, it creates the cluster, runs the jobs, and then deletes the cluster when the jobs are finished. This means, for many use cases, there is no need to maintain long-lived clusters, they become just an ephemeral part of the workflow.

We set a managed cluster for our Workflow using the workflow-templates set-managed-cluster command. We will re-use the same cluster specifications we used in the previous post, the Standard, 1 master node and 2 worker nodes, cluster type.

gcloud dataproc workflow-templates set-managed-cluster \
  $TEMPLATE_ID \
  --region $REGION \
  --zone $ZONE \
  --cluster-name three-node-cluster \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --worker-boot-disk-size 500 \
  --num-workers 2 \
  --image-version 1.3-deb9

Alternatively, if we already had an existing cluster, we would use the workflow-templates set-cluster-selector command, to associate that cluster with the workflow template.

gcloud dataproc workflow-templates set-cluster-selector \
  $TEMPLATE_ID \
  --region $REGION \
  --cluster-labels goog-dataproc-cluster-uuid=$CLUSTER_UUID

To get the existing cluster’s UUID label value, you could use a command similar to the following.

CLUSTER_UUID=$(gcloud dataproc clusters describe $CLUSTER_2 \
  --region $REGION \
  | grep 'goog-dataproc-cluster-uuid:' \
  | sed 's/.* //')

echo $CLUSTER_UUID

1c27efd2-f296-466e-b14e-c4263d0d7e19

Adding Jobs

Next, we add the jobs we want to run to the template. Each job is considered a step in the template, each step requires a unique step id. We will add three jobs to the template, two Java-based Spark jobs from the previous post, and a new Python-based PySpark job.

First, we add the two Java-based Spark jobs, using the workflow-templates add-job spark command. This command’s flags are nearly identical to the dataproc jobs submit spark command, used in the previous post.

export STEP_ID=ibrd-small-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocSmall \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

export STEP_ID=ibrd-large-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

Next, we add the Python-based PySpark job, international_loans_dataproc.py, as the second job in the template. This Python script requires three input arguments, on lines 15–17, which are the bucket where the data is located and the and results are placed, the name of the data file, and the directory in the bucket where the results will be placed (gist).

#!/usr/bin/python
# Author: Gary A. Stafford
# License: MIT
# Arguments Example:
# gs://dataproc-demo-bucket
# ibrd-statement-of-loans-historical-data.csv
# ibrd-summary-large-python
from pyspark.sql import SparkSession
import sys
def main(argv):
storage_bucket = argv[0]
data_file = argv[1]
results_directory = argv[2]
print "Number of arguments: {0} arguments.".format(len(sys.argv))
print "Argument List: {0}".format(str(sys.argv))
spark = SparkSession \
.builder \
.master("yarn") \
.appName('dataproc-python-demo') \
.getOrCreate()
# Defaults to INFO
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Loads CSV file from Google Storage Bucket
df_loans = spark \
.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load(storage_bucket + "/" + data_file)
# Creates temporary view using DataFrame
df_loans.withColumnRenamed("Country", "country") \
.withColumnRenamed("Country Code", "country_code") \
.withColumnRenamed("Disbursed Amount", "disbursed") \
.withColumnRenamed("Borrower's Obligation", "obligation") \
.withColumnRenamed("Interest Rate", "interest_rate") \
.createOrReplaceTempView("loans")
# Performs basic analysis of dataset
df_disbursement = spark.sql("""
SELECT country, country_code,
format_number(total_disbursement, 0) AS total_disbursement,
format_number(ABS(total_obligation), 0) AS total_obligation,
format_number(avg_interest_rate, 2) AS avg_interest_rate
FROM (
SELECT country, country_code,
SUM(disbursed) AS total_disbursement,
SUM(obligation) AS total_obligation,
AVG(interest_rate) AS avg_interest_rate
FROM loans
GROUP BY country, country_code
ORDER BY total_disbursement DESC
LIMIT 25)
""").cache()
print "Results:"
df_disbursement.show(25, True)
# Saves results to single CSV file in Google Storage Bucket
df_disbursement.write \
.mode("overwrite") \
.format("parquet") \
.save(storage_bucket + "/" + results_directory)
spark.stop()
if __name__ == "__main__":
main(sys.argv[1:])

We pass the arguments to the Python script as part of the PySpark job, using the workflow-templates add-job pyspark command.

export STEP_ID=ibrd-large-pyspark
  
gcloud dataproc workflow-templates add-job pyspark \
  $BUCKET_NAME/international_loans_dataproc.py \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --region $REGION \
  -- $BUCKET_NAME \
     ibrd-statement-of-loans-historical-data.csv \
     ibrd-summary-large-python

That’s it, we have created our first Cloud Dataproc Workflow Template using the Dataproc WorkflowTemplate API. To view our template we can use the following two commands. First, use the workflow-templates list command to display a list of available templates. The list command output displays the version of the workflow template and how many jobs are in the template.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Then, we use the workflow-templates describe command to show the details of a specific template.

gcloud dataproc workflow-templates describe \
  $TEMPLATE_ID --region $REGION

Using the workflow-templates describe command, we should see output similar to the following (gist).

createTime: '2018-12-15T16:31:21.779Z'
id: template-demo-1
jobs:
sparkJob:
jarFileUris:
gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
mainClass: org.example.dataproc.InternationalLoansAppDataprocSmall
stepId: ibrd-small-spark
sparkJob:
jarFileUris:
gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
mainClass: org.example.dataproc.InternationalLoansAppDataprocLarge
stepId: ibrd-large-spark
pysparkJob:
args:
gs://dataproc-demo-bucket
ibrd-statement-of-loans-historical-data.csv
ibrd-summary-large-python
mainPythonFileUri: gs://dataproc-demo-bucket/international_loans_dataproc.py
stepId: ibrd-large-pyspark
name: projects/dataproc-demo-224523/regions/us-east1/workflowTemplates/template-demo-1
placement:
managedCluster:
clusterName: three-node-cluster
config:
gceClusterConfig:
zoneUri: us-east1-b
masterConfig:
diskConfig:
bootDiskSizeGb: 500
machineTypeUri: n1-standard-4
softwareConfig:
imageVersion: 1.3-deb9
workerConfig:
diskConfig:
bootDiskSizeGb: 500
machineTypeUri: n1-standard-4
numInstances: 2
updateTime: '2018-12-15T16:32:06.508Z'
version: 5

In the template description, notice the template’s id, the managed cluster in the placement section, and the three jobs, all which we added using the above series of workflow-templates commands. Also, notice the creation and update timestamps and version number, which were automatically generated by Dataproc. Lastly, notice the name, which refers to the GCP project and region where this copy of the template is located. Had we used an existing cluster with our workflow, as opposed to a managed cluster, the placement section would have looked as follows.

placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-uuid: your_clusters_uuid_label_value

To instantiate the workflow, we use the workflow-templates instantiate command. This command will create the managed cluster, run all the steps (jobs), then delete the cluster. I have added the time command to see how fast the workflow will take to complete.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION #--async

We can observe the progress from the Google Cloud Dataproc Console, or from the command line by omitting the --async flag. Below we see the three jobs completed successfully on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/e720bb96-9c87-330e-b1cd-efa4612b3c57].
WorkflowTemplate [template-demo-1] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/e1fe53de-92f2-4f8c-8b3a-fda5e13829b6].
Created cluster: three-node-cluster-ugdo4ygpl52bo.
Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING
Job ID ibrd-small-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/f2a40c33-3cdf-47f5-92d6-345463fbd404].
WorkflowTemplate [template-demo-1] DONE
Deleted cluster: three-node-cluster-ugdo4ygpl52bo.

1.02s user 0.35s system 0% cpu 5:03.55 total

In the output, you see the creation of the cluster, the three jobs running and completing successfully, and finally the cluster deletion. The entire workflow took approximately 5 minutes to complete. Below is the view of the workflow’s results from the Dataproc Clusters Console Jobs tab.

screen_shot_2018-12-15_at_11.42.44_am

Below we see the output from the PySpark job, run as part of the workflow template, shown in the Dataproc Clusters Console Output tab. Notice the three input arguments we passed to the Python script from the workflow template, listed in the output.

screen_shot_2018-12-15_at_11.43.56_am

We see the arguments passed to the job, from the Jobs Configuration tab.

screen_shot_2018-12-15_at_1.11.11_pm.png

Examining the Google Cloud Dataproc Jobs Console, we will observe that the WorkflowTemplate API automatically adds a unique alphanumeric extension to both the name of the managed clusters we create, as well as to the name of each job that is run. The extension on the cluster name matches the extension on the jobs ran on that cluster.

screen_shot_2018-12-15_at_1.05.41_pm

YAML-based Workflow Template

Although, the above WorkflowTemplates API-based workflow was certainly more convenient than using the individual Cloud Dataproc API commands. At a minimum, we don’t have to remember to delete our cluster when the jobs are complete, as I often do. To further optimize the workflow, we will introduce YAML-based Workflow Template. According to Google, you can define a workflow template in a YAML file, then instantiate the template to run the workflow. You can also import and export a workflow template YAML file to create and update a Cloud Dataproc workflow template resource.

We can export our first workflow template to create our YAML-based template file.

gcloud dataproc workflow-templates export template-demo-1 \
  --destination template-demo-2.yaml \
  --region $REGION

Below is our first YAML-based template, template-demo-2.yaml. You will need to replace the values in the template with your own values, based on your environment (gist).

jobs:
sparkJob:
jarFileUris:
gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
mainClass: org.example.dataproc.InternationalLoansAppDataprocSmall
stepId: ibrd-small-spark
sparkJob:
jarFileUris:
gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
mainClass: org.example.dataproc.InternationalLoansAppDataprocLarge
stepId: ibrd-large-spark
pysparkJob:
args:
gs://dataproc-demo-bucket
ibrd-statement-of-loans-historical-data.csv
ibrd-summary-large-python
mainPythonFileUri: gs://dataproc-demo-bucket/international_loans_dataproc.py
stepId: ibrd-large-pyspark
placement:
managedCluster:
clusterName: three-node-cluster
config:
gceClusterConfig:
zoneUri: us-east1-b
masterConfig:
diskConfig:
bootDiskSizeGb: 500
machineTypeUri: n1-standard-4
softwareConfig:
imageVersion: 1.3-deb9
workerConfig:
diskConfig:
bootDiskSizeGb: 500
machineTypeUri: n1-standard-4
numInstances: 2

view raw
template-demo-2.yaml
hosted with ❤ by GitHub

Note the template looks almost similar to the template we just created previously using the WorkflowTemplates API. The YAML-based template requires the placement and jobs fields. All the available fields are detailed, here.

To run the template we use the workflow-templates instantiate-from-file command. Again, I will use the time command to measure performance.

time gcloud dataproc workflow-templates instantiate-from-file \
  --file template-demo-2.yaml \
  --region $REGION

Running the workflow-templates instantiate-from-file command will run a workflow, nearly identical to the workflow we ran in the previous example, with a similar timing. Below we see the three jobs completed successfully on the managed cluster, in approximately the same time as the previous workflow.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/7ba3c28e-ebfa-32e7-9dd6-d938a1cfe23b].
WorkflowTemplate RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/8d05199f-ed36-4787-8a28-ae784c5bc8ae].
Created cluster: three-node-cluster-5k3bdmmvnna2y.
Job ID ibrd-small-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-pyspark-5k3bdmmvnna2y RUNNING
Job ID ibrd-small-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-pyspark-5k3bdmmvnna2y COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/a436ae82-f171-4b0a-9b36-5e16406c75d5].
WorkflowTemplate DONE
Deleted cluster: three-node-cluster-5k3bdmmvnna2y.

1.16s user 0.44s system 0% cpu 4:48.84 total

Parameterization of Templates

To further optimize the workflow template process for re-use, we have the option of passing parameters to our template. Imagine you now receive new loan snapshot data files every night. Imagine you need to run the same data analysis on the financial transactions of thousands of your customers, nightly. Parameterizing templates makes it more flexible and reusable. By removing hard-codes values, such as Storage bucket paths and data file names, a single template may be re-used for multiple variations of the same job. Parameterization allows you to automate hundreds or thousands of Spark and Hadoop jobs in a workflow or workflows, each with different parameters, programmatically.

To demonstrate the parameterization of a workflow template, we create another YAML-based template with just the Python/PySpark job, template-demo-3.yaml. If you recall from our first example, the Python script, international_loans_dataproc.py, requires three input arguments: the bucket where the data is located and the and results are placed, the name of the data file, and the directory in the bucket, where the results will be placed.

We will replace four of the values in the template with parameters. We will inject those parameter’s values when we instantiate the workflow. Below is the new parameterized template. The template now has a parameters section from lines 26–46. They define parameters that will be used to replace the four values on lines 3–7 (gist).

jobs:
pysparkJob:
args:
storage_bucket_parameter
data_file_parameter
results_directory_parameter
mainPythonFileUri: main_python_file_parameter
stepId: ibrd-pyspark
placement:
managedCluster:
clusterName: three-node-cluster
config:
gceClusterConfig:
zoneUri: us-east1-b
masterConfig:
diskConfig:
bootDiskSizeGb: 500
machineTypeUri: n1-standard-4
softwareConfig:
imageVersion: 1.3-deb9
workerConfig:
diskConfig:
bootDiskSizeGb: 500
machineTypeUri: n1-standard-4
numInstances: 2
parameters:
description: Python script to run
fields:
jobs['ibrd-pyspark'].pysparkJob.mainPythonFileUri
name: MAIN_PYTHON_FILE
description: Storage bucket location of data file and results
fields:
jobs['ibrd-pyspark'].pysparkJob.args[0]
name: STORAGE_BUCKET
validation:
regex:
regexes:
gs://.*
description: IBRD data file
fields:
jobs['ibrd-pyspark'].pysparkJob.args[1]
name: IBRD_DATA_FILE
description: Result directory
fields:
jobs['ibrd-pyspark'].pysparkJob.args[2]
name: RESULTS_DIRECTORY

view raw
template-demo-3.yaml
hosted with ❤ by GitHub

Note the PySpark job’s three arguments and the location of the Python script have been parameterized. Parameters may include validation. As an example of validation, the template uses regex to validate the format of the Storage bucket path. The regex follows Google’s RE2 regular expression library syntax. If you need help with regex, the Regex Tester – Golang website is a convenient way to test your parameter’s regex validations.

First, we import the new parameterized YAML-based workflow template, using the workflow-templates import command. Then, we instantiate the template using the workflow-templates instantiate command. The workflow-templates instantiate command will run the single PySpark job, analyzing the smaller IBRD data file, and placing the resulting Parquet-format file in a directory within the Storage bucket. We pass the Python script location, bucket link, smaller IBRD data file name, and output directory, as parameters to the template, and therefore indirectly, three of these, as input arguments to the Python script.

export TEMPLATE_ID=template-demo-3

gcloud dataproc workflow-templates import $TEMPLATE_ID \
   --region $REGION --source template-demo-3.yaml
  
gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION --async \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-latest-available-snapshot.csv",RESULTS_DIRECTORY="ibrd-summary-small-python"

Next, we will analyze the larger historic data file, using the same parameterized YAML-based workflow template, but changing two of the four parameters we are passing to the template with the workflow-templates instantiate command. This will run a single PySpark job on the larger IBRD data file and place the resulting Parquet-format file in a different directory within the Storage bucket.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-historical-data.csv",RESULTS_DIRECTORY="ibrd-summary-large-python"

This is the power of parameterization—one workflow template and one job script, but two different datasets and two different results.

Below we see the single PySpark job ran on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/b3c5063f-e3cf-3833-b613-83db12b82f32].
WorkflowTemplate [template-demo-3] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105].
Created cluster: three-node-cluster-j6q2al2mkkqck.
Job ID ibrd-pyspark-j6q2al2mkkqck RUNNING
Job ID ibrd-pyspark-j6q2al2mkkqck COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/fe4a263e-7c6d-466e-a6e2-52292cbbdc9b].
WorkflowTemplate [template-demo-3] DONE
Deleted cluster: three-node-cluster-j6q2al2mkkqck.

0.98s user 0.40s system 0% cpu 4:19.42 total

Using the workflow-templates list command again, should display a list of two workflow templates.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-3  1     2018-12-15T17:04:39.064Z  2
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Looking within the Google Cloud Storage bucket, we should now see four different folders, the results of the workflows.

screen-shot-2018-12-16-at-11.58.32-am.png

Job Results and Testing

To check on the status of a job, we use the dataproc jobs wait command. This returns the standard output (stdout) and standard error (stderr) for that specific job.

export SET_ID=ibrd-large-dataset-pyspark-cxzzhr2ro3i54
  
gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION

The dataproc jobs wait command is frequently used for automated testing of jobs, often within a CI/CD pipeline. Assume we have expected part of the job output that indicates success, such as a string, boolean, or numeric value. We could any number of test frameworks or other methods to confirm the existence of that expected value or values. Below is a simple example of using the grep command to check for the existence of the expected line ‘ state: FINISHED’ in the standard output of the dataproc jobs wait command.

command=$(gcloud dataproc jobs wait $SET_ID \
--project $PROJECT_ID \
--region $REGION) &>/dev/null

if grep -Fqx "  state: FINISHED" <<< $command &>/dev/null; then
  echo "Job Success!"
else
  echo "Job Failure?"
fi

# single line alternative
if grep -Fqx "  state: FINISHED" <<< $command &>/dev/null;then echo "Job Success!";else echo "Job Failure?";fi

Job Success!

Individual Operations

To view individual workflow operations, use the operations list and operations describe commands. The operations list command will list all operations.

Notice the three distinct series of operations within each workflow, shown with the operations list command: WORKFLOW, CREATE, and DELETE. In the example below, I’ve separated the operations by workflow, for better clarity.

gcloud dataproc operations list --region $REGION

NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
---
be0e5293-275f-46ad-b1f4-696ba44c222e  2018-12-15T17:07:26.305Z  DELETE    DONE
6784078c-cbe3-4c1e-a56e-217149f555a4  2018-12-15T17:04:40.613Z  CREATE    DONE
fcd8039e-a260-3ab3-ad31-01abc1a524b4  2018-12-15T17:04:40.007Z  WORKFLOW  DONE
---
b4b23ca6-9442-4ffb-8aaf-460bac144dd8  2018-12-15T17:02:16.744Z  DELETE    DONE
89ef9c7c-f3c9-4d01-9091-61ed9e1f085d  2018-12-15T17:01:45.514Z  CREATE    DONE
243fa7c1-502d-3d7a-aaee-b372fe317570  2018-12-15T17:01:44.895Z  WORKFLOW  DONE

We use the results of the operations list command to execute the operations describe command to describe a specific operation.

gcloud dataproc operations describe \
  projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105

Each type of operation contains different details. Note the fine-grain of detail we get from Dataproc using the operations describe command for a CREATE operation (gist).

projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105
done: true
metadata:
'@type': type.googleapis.com/google.cloud.dataproc.v1beta2.ClusterOperationMetadata
clusterName: three-node-cluster-j6q2al2mkkqck
clusterUuid: 10656c6e-ef49-4264-805b-463e1e819626
description: Create cluster with 2 workers
operationType: CREATE
status:
innerState: DONE
state: DONE
stateStartTime: '2018-12-15T17:10:12.722Z'
statusHistory:
state: PENDING
stateStartTime: '2018-12-15T17:08:38.322Z'
state: RUNNING
stateStartTime: '2018-12-15T17:08:38.380Z'
name: projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105
response:
'@type': type.googleapis.com/google.cloud.dataproc.v1beta2.Cluster
clusterName: three-node-cluster-j6q2al2mkkqck
clusterUuid: 10656c6e-ef49-4264-805b-463e1e819626
config:
configBucket: dataproc-5214e13c-d3ea-400b-9c70-11ee08fac5ab-us-east1
gceClusterConfig:
networkUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/global/networks/default
serviceAccountScopes:
https://www.googleapis.com/auth/bigquery
https://www.googleapis.com/auth/bigtable.admin.table
https://www.googleapis.com/auth/bigtable.data
https://www.googleapis.com/auth/cloud.useraccounts.readonly
https://www.googleapis.com/auth/devstorage.full_control
https://www.googleapis.com/auth/devstorage.read_write
https://www.googleapis.com/auth/logging.write
zoneUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/zones/us-east1-b
masterConfig:
diskConfig:
bootDiskSizeGb: 500
bootDiskType: pd-standard
imageUri: https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-1-3-deb9-20181206-000000-rc01
machineTypeUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/zones/us-east1-b/machineTypes/n1-standard-4
minCpuPlatform: AUTOMATIC
numInstances: 1
softwareConfig:
imageVersion: 1.3.19-deb9
properties:
capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy: fair
core:fs.gs.block.size: '134217728'
core:fs.gs.metadata.cache.enable: 'false'
distcp:mapreduce.map.java.opts: -Xmx768m
distcp:mapreduce.map.memory.mb: '1024'
distcp:mapreduce.reduce.java.opts: -Xmx768m
distcp:mapreduce.reduce.memory.mb: '1024'
hdfs:dfs.datanode.address: 0.0.0.0:9866
hdfs:dfs.datanode.http.address: 0.0.0.0:9864
hdfs:dfs.datanode.https.address: 0.0.0.0:9865
hdfs:dfs.datanode.ipc.address: 0.0.0.0:9867
hdfs:dfs.namenode.handler.count: '20'
hdfs:dfs.namenode.http-address: 0.0.0.0:9870
hdfs:dfs.namenode.https-address: 0.0.0.0:9871
hdfs:dfs.namenode.lifeline.rpc-address: three-node-cluster-j6q2al2mkkqck-m:8050
hdfs:dfs.namenode.secondary.http-address: 0.0.0.0:9868
hdfs:dfs.namenode.secondary.https-address: 0.0.0.0:9869
hdfs:dfs.namenode.service.handler.count: '10'
hdfs:dfs.namenode.servicerpc-address: three-node-cluster-j6q2al2mkkqck-m:8051
mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE: '3840'
mapred:mapreduce.job.maps: '21'
mapred:mapreduce.job.reduce.slowstart.completedmaps: '0.95'
mapred:mapreduce.job.reduces: '7'