Posts Tagged Java

Eventual Consistency with Spring for Apache Kafka: Part 2 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

As discussed in Part One of this post, given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components in a local development environment running on Kubernetes with Istio, using minikube. For simplicity’s sake, we will only run a single instance of each service. Additionally, we are not implementing custom domain names, TLS/HTTPS, authentication and authorization, API keys, or restricting access to any sensitive operational API endpoints or ports, all of which we would certainly do in an actual production environment.

To provide operational visibility, we will add Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo ExpressKialiPrometheus, and Grafana to our system.

View of Storefront API traffic from Kiali

Prerequisites

This post will assume a basic level of knowledge of Kubernetes, minikube, Docker, and Istio. Furthermore, the post assumes you have already installed recent versions of minikube, kubectl, Docker, and Istio. Meaning, that the kubectl, istioctl, docker, and minikube commands are all available from the terminal.

Currently installed version of the required applications

For this post demonstration, I am using an Apple MacBook Pro running macOS as my development machine. I have the latest versions of Docker Desktop, minikube, kubectl, and Istio installed as of May 2021.

Source Code

The source code for this post is open-source and is publicly available on GitHub. Clone the GitHub project using the following command:

clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Minikube

Part of the Kubernetes project, minikube is local Kubernetes, focusing on making it easy to learn and develop for Kubernetes. Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows. Given the number of Kubernetes resources we will be deploying to minikube, I would recommend at least 3 CPUs and 4–5 GBs of memory. If you choose to deploy multiple observability tools, you may want to increase both of these resources if you can afford it. I maxed out both CPUs and memory several times while setting up this demonstration, causing temporary lock-ups of minikube.

minikube --cpus 3 --memory 5g --driver=docker start start

The Docker driver allows you to install Kubernetes into an existing Docker install. If you are using Docker, please be aware that you must have at least an equivalent amount of resources allocated to Docker to apportion to minikube.

Before continuing, confirm minikube is up and running and confirm the current context of kubectl is minikube.

minikube status
kubectl config current-context

The statuses should look similar to the following:

Use the eval below command to point your shell to minikube’s docker-daemon. You can confirm this by using the docker image ls and docker container ls command to view running Kubernetes containers on minikube.

eval $(minikube -p minikube docker-env)
docker image ls
docker container ls

The output should look similar to the following:

You can also check the status of minikube from Docker Desktop. Minikube is running as a container, instantiated from a Docker image, gcr.io/k8s-minikube/kicbase. View the container’s Stats, as shown below.

Istio

Assuming you have downloaded and configured Istio, install it onto minikube. I currently have Istio 1.10.0 installed and have theISTIO_HOME environment variable set in my Oh My Zsh .zshrc file. I have also set Istio’s bin/ subdirectory in my PATH environment variable. The bin/ subdirectory contains the istioctl executable.

echo $ISTIO_HOME                                                                
> /Applications/Istio/istio-1.10.0
where istioctl
> /Applications/Istio/istio-1.10.0/bin/istioctl
istioctl version

> client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)

Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and of the sidecars for the Istio data plane.

istioctl profile list
> Istio configuration profiles:
default
demo
empty
external
minimal
openshift
preview
remote

For this demonstration, we will use the default profile, which installs istiod and an istio-ingressgateway. We will not require the use of an istio-egressgateway, since all components will be installed locally on minikube.

istioctl install --set profile=default -y
> ✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Installation complete

Minikube Tunnel

kubectl get svc istio-ingressgateway -n istio-system

To associate an IP address, run the minikube tunnel command in a separate terminal tab. Since it requires opening privileged ports 80 and 443 to be exposed, this command will prompt you for your sudo password.

Services of the type LoadBalancer can be exposed by using the minikube tunnel command. It must be run in a separate terminal window to keep the LoadBalancer running. We previously created the istio-ingressgateway. Run the following command and note that the status of EXTERNAL-IP is <pending>. There is currently no external IP address associated with our LoadBalancer.

minikube tunnel

Rerun the previous command. There should now be an external IP address associated with the LoadBalancer. In my case, 127.0.0.1.

kubectl get svc istio-ingressgateway -n istio-system

The external IP address shown is the address we will use to access the resources we chose to expose externally on minikube.

Minikube Dashboard

Once again, in a separate terminal tab, open the Minikube Dashboard (aka Kubernetes Dashboard).

minikube dashboard

The dashboard will give you a visual overview of all your installed Kubernetes components.

Minikube Dashboard showing the istio-system namespace

Namespaces

Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. For this demonstration, we will use four namespaces to organize our deployed resources: dev, mongo, kafka, and storefront-kafka-project. The dev namespace is where we will deploy our Storefront API’s microservices: accounts, orders, and fulfillment. We will deploy MongoDB and Mongo Express to the mongo namespace. Lastly, we will use the kafka and storefront-kafka-project namespaces to deploy Apache Kafka to minikube using Strimzi, a Cloud Native Computing Foundation sandbox project, and CMAK.

kubectl apply -f ./minikube/resources/namespaces.yaml

Automatic Sidecar Injection

In order to take advantage of all of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. When you set the istio-injection=enabled label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have a sidecar added to them. Labeling the dev namespace for automatic sidecar injection ensures that our Storefront API’s microservices — accounts, orders, and fulfillment— will have Istio sidecar proxy automatically injected into their pods.

kubectl label namespace dev istio-injection=enabled

MongoDB

Next, deploy MongoDB and Mongo Express to the mongo namespace on minikube. To ensure a successful connection to MongoDB from Mongo Express, I suggest giving MongoDB a chance to start up fully before deploying Mongo Express.

kubectl apply -f ./minikube/resources/mongodb.yaml -n mongo
sleep 60
kubectl apply -f ./minikube/resources/mongo-express.yaml -n mongo

To confirm the success of the deployments, use the following command:

kubectl get services -n mongo

Or use the Kubernetes Dashboard to confirm deployments.

Mongo Express UI Access

For parts of your application (for example, frontends) you may want to expose a Service onto an external IP address outside of your cluster. Kubernetes ServiceTypes allows you to specify what kind of Service you want; the default is ClusterIP.

Note that while MongoDB uses the ClusterIP, Mongo Express uses NodePort. With NodePort, the Service is exposed on each Node’s IP at a static port (the NodePort). You can contact the NodePort Service, from outside the cluster, by requesting <NodeIP>:<NodePort>.

In a separate terminal tab, open Mongo Express using the following command:

minikube service --url mongo-express -n mongo

You should see output similar to the following:

Click on the link to open Mongo Express. There should already be three MongoDB operational databases shown in the UI. The three Storefront databases and collections will be created automatically, later in the post: accounts, orders, and fulfillment.

Apache Kafka using Strimzi

Next, we will install Apache Kafka and Apache Zookeeper into the kafka and storefront-kafka-project namespaces on minikube, using Strimzi. Since Strimzi has a great, easy-to-use Quick Start guide, I will not detail the complete install complete process in this post. I suggest using their guide to understand the process and what each command does. Then, use the slightly modified Strimzi commands I have included below to install Kafka and Zookeeper.

# assuming 0.23.0 is latest version available
curl -L -O https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.23.0/strimzi-0.23.0.zip
unzip strimzi-0.23.0.zip
cd strimzi-0.23.0
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
# manually change STRIMZI_NAMESPACE value to storefront-kafka-project
nano install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
kubectl create -f install/cluster-operator/ -n kafka
kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-cluster.yaml -n storefront-kafka-project
kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-topics.yaml -n storefront-kafka-project

Zoo Entrance

We want to install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. However, CMAK required access to Zookeeper. You can not access Strimzi’s Zookeeper directly from CMAK; this is intentional to avoid performance and security issues. See this GitHub issue for a better explanation of why. We will use the appropriately named Zoo Entrance as a proxy for CMAK to Zookeeper to overcome this challenge.

To install Zoo Entrance, review the GitHub project’s install guide, then use the following commands:

git clone https://github.com/scholzj/zoo-entrance.git
cd zoo-entrance
# optional: change my-cluster to kafka-cluster
sed -i '' 's/my-cluster/kafka-cluster/' deploy.yaml
kubectl apply -f deploy.yaml -n storefront-kafka-project

Cluster Manager for Apache Kafka

Next, install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. Run the following command to deploy CMAK into the storefront-kafka-project namespace.

kubectl apply -f ./minikube/resources/cmak.yaml -n storefront-kafka-project

Similar to Mongo Express, we can access CMAK’s UI using its NodePort. In a separate terminal tab, run the following command:

minikube service --url cmak -n storefront-kafka-project

You should see output similar to Mongo Express. Click on the link provided to access CMAK. Choose ‘Add Cluster’ in CMAK to add our existing Kafka cluster to CMAK’s management interface. Use Zoo Enterence’s service address for the Cluster Zookeeper Hosts value.

zoo-entrance.storefront-kafka-project.svc:2181

Once complete, you should see the three Kafka topics we created previously with Strimzi: accounts.customer.change, fulfillment.order.change, and orders.order.change. Each topic will have three partitions, one replica, and one broker. You should also see the _consumer_offsets topic that Kafka uses to store information about committed offsets for each topic:partition per group of consumers (groupID).

Storefront API Microservices

We are finally ready to install our Storefront API’s microservices into the dev namespace. Each service is preconfigured to access Kafka and MongoDB in their respective namespaces.

kubectl apply -f ./minikube/resources/accounts.yaml -n dev
kubectl apply -f ./minikube/resources/orders.yaml -n dev
kubectl apply -f ./minikube/resources/fulfillment.yaml -n dev

Spring Boot services usually take about two minutes to fully start. The time required to download the Docker Images from docker.com and the start-up time means it could take 3–4 minutes for each of the three services to be ready to accept API traffic.

Istio Components

We want to be able to access our Storefront API’s microservices through our Kubernetes LoadBalancer, while also leveraging all the capabilities of Istio as a service mesh. To do so, we need to deploy an Istio Gateway and a VirtualService. We will also need to deploy DestinationRule resources. A Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. A VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, a DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred.

kubectl apply -f ./minikube/resources/destination_rules.yaml -n dev
kubectl apply -f ./minikube/resources/istio-gateway.yaml -n dev

Testing the System and Creating Sample Data

I have provided a Python 3 script that runs a series of seven HTTP GET requests, in a specific order, against the Storefront API. These calls will validate the deployments, confirm the API’s services can access Kafka and MongoDB, generate some initial data, and automatically create the MongoDB database collections from the initial Insert statements.

python3 -m pip install -r ./utility_scripts/requirements.txt -U
python3 ./utility_scripts/refresh.py

The script’s output should be as follows:

If we now look at Mongo Express, we should note three new databases: accounts, orders, and fulfillment.

Observability Tools

Istio makes it easy to integrate with a number of common tools, including cert-managerPrometheusGrafanaKialiZipkin, and Jaeger. In order to better observe our Storefront API, we will install three well-known observability tools: Kiali, Prometheus, and Grafana. Luckily, these tools are all included with Istio. You can install any or all of these to minikube. I suggest installing the tools one at a time as not to overwhelm minikube’s CPU and memory resources.

kubectl apply -f ./minikube/resources/prometheus.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/grafana.yaml

kubectl apply -f $ISTIO_HOME/samples/addons/kiali.yaml

Once deployment is complete, to access any of the UI’s for these tools, use the istioctl dashboard command from a new terminal window:

istioctl dashboard kiali

istioctl dashboard prometheus

istioctl dashboard grafana

Kiali

Below we see a view of Kiali with API traffic flowing to Kafka and MongoDB.

View of Storefront API traffic from Kiali

Prometheus

Each of the three Storefront API microservices has a dependency on Micrometer; specifically, a dependency on micrometer-registry-prometheus. As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics with a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends. Given the Micrometer Prometheus dependency, each microservice exposes a /prometheus endpoint (e.g., http://127.0.0.1/accounts/actuator/prometheus) as shown below in Postman.

The /prometheus endpoint exposes dozens of useful metrics and is configured to be scraped by Prometheus. These metrics can be displayed in Prometheus and indirectly in Grafana dashboards via Prometheus. I have customized Istio’s version of Prometheus and included it in the project (prometheus.yaml), which now scrapes the Storefront API’s metrics.

scrape_configs:
- job_name: 'spring_micrometer'
metrics_path: '/actuator/prometheus'
scrape_interval: 5s
static_configs:
- targets: ['accounts.dev:8080','orders.dev:8080','fulfillment.dev:8080']

Here we see an example graph of a Spring Kafka Listener metric, spring_kafka_listener_seconds_sum, in Prometheus. There are dozens of metrics exposed to Prometheus from our system that we can observe and alert on.

Grafana

Lastly, here is an example Spring Boot Dashboard in Grafana. More dashboards are available on Grafana’s community dashboard page. The Grafana dashboard uses Prometheus as the source of its metrics data.

Storefront API Endpoints

The three storefront services are fully functional Spring Boot, Spring Data REST, Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. To better understand the Storefront API, each Spring Boot microservice uses SpringFox, which produces automated JSON API documentation for APIs built with Spring. The service builds also include the springfox-swagger-ui web jar, which ships with Swagger UI. Swagger takes the manual work out of API documentation, with a range of solutions for generating, visualizing, and maintaining API docs.

From a web browser, you can use the /swagger-ui/ subdirectory/subpath with any of the three microservices to access the fully-featured Swagger UI (e.g., http://127.0.0.1/accounts/swagger-ui/).

Accounts service Customer entity endpoints

Each service’s data model (POJOs) is also exposed through the Swagger UI.

Accounts service data model

Spring Boot Actuator

Additionally, each service includes Spring Boot Actuator. The Actuator exposes additional operational endpoints, allowing us to observe the running services. With Actuator, you get many features, including access to available operational-oriented endpoints, using the /actuator/ subdirectory/subpath (e.g., http://127.0.0.1/accounts/actuator/). For this demonstration, I have not restricted access to any available Actuator endpoints.

Partial list of Spring Boot Actuator endpoints as seen using Swagger
Partial list of Spring Boot Actuator endpoints as seen using Postman

Conclusion

In this two-part post, we learned how to build an API using Spring Boot. We ensured the API’s distributed data integrity using a pub/sub model with Spring for Apache Kafka Project. When a relevant piece of data was changed by one microservice, that state change triggered a state change event that was shared with other microservices using Kafka topics.

We also learned how to deploy and run the API in a local development environment running on Kubernetes with Istio, using minikube. We have added production-tested observability tools to provide operational visibility, including CMAK, Mongo Express, Kiali, Prometheus, and Grafana.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , , , , ,

1 Comment

Eventual Consistency with Spring for Apache Kafka: Part 1 of 2

Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices

Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.

Introduction

Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.

Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.

Domain-driven Design

To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.

Below are the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.

Consistency Strategies

Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.

In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.

Storefront Example

In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.

View of the Storefront API from Kiali

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.

Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.

Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.

This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.

Storefront Microservices

We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.

Source Code

The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:

git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-orders.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo-fulfillment.git
git clone --branch 2021-istio \
--single-branch --depth 1 \
https://github.com/garystafford/storefront-demo.git

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a Customer, as a BSON document in the customer.accounts MongoDB database collection.

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

Since the CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload.

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub. Use the latest 2021-istio branch of the project.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows:

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single Order, along with the customer’s name, contact information, and shipping address.

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka.

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub. Use the latest 2021-istio branch of the project.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object represented as a BSON document in the fulfillment.requests database collection looks as follows:

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, again, we can again look at its JSON message payload in Kafka.

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub. Use the latest 2021-istio branch of the project.

State Change Event Messaging Flows

There are three state change event messaging flows illustrated in this post.

  1. Changes to a Customer triggers an event message produced by the Accounts service, which is published on the accounts.customer.change Kafka topic and consumed by the Orders service;
  2. Order Approved triggers an event message produced by the Orders service, which is published on the orders.order.fulfill Kafka topic, and is consumed by the Fulfillment service;
  3. Changes to the status of an Order triggers an event message produced by the Fulfillment Service, which is published on the fulfillment.order.change Kafka topic, and is consumed by the Orders service;

Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.

Let us examine each state change event messaging flow and the code behind it.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.

There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;
public void send(String topic, CustomerChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

The Orders service’s Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Orders service’s Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file in each service’s resources directory contains the Kafka configuration (lines 11–19).

server:
port: 8080
spring:
main:
allow-bean-definition-overriding: true
application:
name: orders
data:
mongodb:
uri: mongodb://mongo:27017/orders
kafka:
bootstrap-servers: kafka:9092
topic:
accounts-customer: accounts.customer.change
orders-order: orders.order.fulfill
fulfillment-order: fulfillment.order.change
consumer:
group-id: orders
auto-offset-reset: earliest
zipkin:
sender:
type: kafka
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: INFO
spring:
config:
activate:
on-profile: local
data:
mongodb:
uri: mongodb://localhost:27017/orders
kafka:
bootstrap-servers: localhost:9092
server:
port: 8090
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
spring:
config:
activate:
on-profile: confluent
server:
port: 8080
logging:
level:
root: INFO
server:
port: 8080
spring:
config:
activate:
on-profile: minikube
data:
mongodb:
uri: mongodb://mongo.dev:27017/orders
kafka:
bootstrap-servers: kafka-cluster.dev:9092
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG

Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages required it.

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. The Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the Order status in a Fulfillment entity is changed to anything other than Approved, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial Created status to the final Received status.

The Fulfillment service exposes several endpoints via the FulfillmentController class, which simulates a change in order status. They allow an order’s status to be changed from Approved to Processing, to Shipped, to In Transit, and finally to Received. This change applies to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. The Fulfillment service’s Sender class handles this.

Note in this example that these two events are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure that both these independent actions succeed or fail as a single transaction to ensure data consistency, using any of a handful of common architectural patterns.

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services.

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages can be sent to a topic with multiple partitions if the volume of messages requires it.

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message produced by the Fulfillment service.

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service receives messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching.

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, we will review how to deploy and run the storefront API components into a local development environment running on Kubernetes with Istio, using Minikube. To provide operational visibility, we will add observability tools, like Yahoo’s CMAK (Cluster Manager for Apache Kafka), Mongo Express, Kiali, Prometheus, and Grafana to our system.

View of the Storefront API from Kiali

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.

, , , ,

1 Comment

Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC

Introduction

Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database via a Microsoft .NET application and vice-versa, accessing Microsoft SQL Server from a Java-based application is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures from a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

View of the post’s Java project from JetBrains’ IntelliJ IDE

The objectives of this post include:

  • Demonstrate the differences between using static SQL statements and stored procedures to return data.
  • Demonstrate three types of JDBC statements to return data: Statement, PreparedStatement, and CallableStatement.
  • Demonstrate how to call stored procedures with input and output parameters.
  • Demonstrate how to return single values and a result set from a database using stored procedures.

Why Stored Procedures?

To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:

  • Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
  • Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
  • Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.

AdventureWorks 2017 Database

For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

The HumanResources schema, one of five schemas within the AdventureWorks database

For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installed on your local workstation.

There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.

DROP DATABASE IF EXISTS AdventureWorks;
GO

EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;

-- get task_id from output (e.g. 1)

EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;

Install Stored Procedures

For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

View of the new stored procedures from JetBrains’ IntelliJ IDE Database tab

Data Sources, Connections, and Properties

Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource, and database connection, java.sql.Connection. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java, which in turn instantiates the java.sql.Connection and com.microsoft.sqlserver.jdbc.SQLServerDataSource objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java. This properties class instantiates the java.util.Properties class, which reads values from a configuration properties file, config.properties. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.

Examples

For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.

To run the JUnit unit tests, using Gradle, which the project is based on, use the ./gradlew cleanTest test --warning-mode none command.

A successful run of the JUnit tests

To build and run the application, using Gradle, which the project is based on, use the ./gradlew run --warning-mode none command.

The output of the Java console application

Example 1: SQL Statement

Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST, uses the java.sql.Statement class. According to Oracle’s JDBC documentation, the Statement object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.

/**
* Statement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightST() {
double averageWeight = 0;
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.getConnection().createStatement();
String sql = "WITH Weights_CTE(AverageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2)" +
"FROM [Weights_CTE];";
rs = stmt.executeQuery(sql);
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 2: Prepared Statement

Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.

Also, instead of using the java.sql.Statement class, we use the java.sql.PreparedStatement class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement object. This object can then be used to execute this statement multiple times efficiently.

/**
* PreparedStatement example, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightPS() {
double averageWeight = 0;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
String sql = "WITH Weights_CTE(averageWeight) AS" +
"(" +
" SELECT [Weight] AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'LB'" +
" UNION" +
" SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
" FROM [Production].[Product]" +
" WHERE [Weight] > 0" +
" AND [WeightUnitMeasureCode] = 'G')" +
"SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
"FROM [Weights_CTE];";
pstmt = connection.getConnection().prepareStatement(sql);
rs = pstmt.executeQuery();
if (rs.next()) {
averageWeight = rs.getDouble("averageWeight");
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 3: Callable Statement

In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement class. According to Oracle’s documentation, the CallableStatement extends PreparedStatement. It is the interface used to execute SQL stored procedures. The CallableStatement accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double numeric value.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement, no parameters, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
ResultSet rs = null;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeight]}");
cstmt.execute();
rs = cstmt.getResultSet();
if (rs.next()) {
averageWeight = rs.getDouble(1);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 4: Calling a Stored Procedure with an Output Parameter

In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the column has a name, averageWeight. We can now call that column by name when retrieving the value.

The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter, and one not, both return the same value(s). You can use the CallableStatement to for either type.

CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT]@averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) output parameter, returns Integer
*
*
@return Average weight of all products
*/
public double getAverageProductWeightOutCS() {
CallableStatement cstmt = null;
double averageWeight = 0;
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetAverageProductWeightOUT](?)}");
cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
cstmt.execute();
averageWeight = cstmt.getDouble("averageWeight");
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return averageWeight;
}

Example 5: Calling a Stored Procedure with an Input Parameter

In this example, the procedure returns a result set, java.sql.ResultSet, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith, to the stored procedure using the CallableStatement.

The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String> object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.

CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (1) input parameter, returns ResultSet
*
*
@param lastNameStartsWith
*
@return List of employee names
*/
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<String> employeeFullName = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [HumanResources].[uspGetEmployeesByLastName](?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("lastNameStartsWith", lastNameStartsWith);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
employeeFullName.add(
rs.getString("LastName") + ", "
+ rs.getString("FirstName") + " "
+ rs.getString("MiddleName"));
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return employeeFullName;
}

Example 6: Converting a Result Set to Ordered Collection of Objects

In this last example, we pass two input parameters, productColor and productSize, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, List<Product> object. The Product objects in the list are instances of the Product.java POJO class. The method converts each results set’s row’s field value into a Product property (e.g., Product.Size, Product.Model). Using a collection is a common method for persisting data from a result set in an application.

CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO

The calling Java method is shown below.

/**
* CallableStatement example, (2) input parameters, returns ResultSet
*
*
@param color
*
@param size
*
@return List of Product objects
*/
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
CallableStatement cstmt = null;
ResultSet rs = null;
List<Product> productList = new ArrayList<>();
try {
cstmt = connection.getConnection().prepareCall(
"{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
cstmt.setString("productColor", color);
cstmt.setString("productSize", size);
boolean results = cstmt.execute();
int rowsAffected = 0;
// Protects against lack of SET NOCOUNT in stored procedure
while (results || rowsAffected != -1) {
if (results) {
rs = cstmt.getResultSet();
break;
} else {
rowsAffected = cstmt.getUpdateCount();
}
results = cstmt.getMoreResults();
}
while (rs.next()) {
Product product = new Product(
rs.getString("Product"),
rs.getString("ProductNumber"),
rs.getString("Color"),
rs.getString("Size"),
rs.getString("Model"));
productList.add(product);
}
} catch (Exception ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.SEVERE, null, ex);
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
if (cstmt != null) {
try {
cstmt.close();
} catch (SQLException ex) {
Logger.getLogger(RunExamples.class.getName()).
log(Level.WARNING, null, ex);
}
}
}
return productList;
}

Proper T-SQL: Schema Reference and Brackets

You will notice in all T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo).

You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]). The square brackets are to indicate that the name represents an object and not a reserved word (e.g, CURRENT or NATIONAL). By default, SQL Server adds these to make sure the scripts it generates run correctly.

Running the Examples

The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.

package com.article.examples;
import java.util.List;
/**
* Main class that calls all example methods
*
* @author Gary A. Stafford
*/
public class RunExamples {
private static final Examples examples = new Examples();
private static final ProcessTimer timer = new ProcessTimer();
/**
* @param args the command line arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("SQL SERVER STATEMENT EXAMPLES");
System.out.println("======================================");
// Statement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
double averageWeight = examples.getAverageProductWeightST();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightST");
System.out.println("Description: Statement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("—");
// PreparedStatement example, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightPS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightPS");
System.out.println("Description: PreparedStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("—");
// CallableStatement, no parameters, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightCS");
System.out.println("Description: CallableStatement, no parameters, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.println("—");
// CallableStatement example, (1) output parameter, returns Integer
timer.setStartTime(System.nanoTime());
averageWeight = examples.getAverageProductWeightOutCS();
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetAverageProductWeightOutCS");
System.out.println("Description: CallableStatement, (1) output parameter, returns Integer");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Average product weight (lb): %s%n", averageWeight);
System.out.println("—");
// CallableStatement example, (1) input parameter, returns ResultSet
timer.setStartTime(System.nanoTime());
String lastNameStartsWith = "Sa";
List<String> employeeFullName =
examples.getEmployeesByLastNameCS(lastNameStartsWith);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetEmployeesByLastNameCS");
System.out.println("Description: CallableStatement, (1) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
System.out.printf("Results: Last names starting with '%s': %d%n", lastNameStartsWith, employeeFullName.size());
if (employeeFullName.size() > 0) {
System.out.printf(" Last employee found: %s%n", employeeFullName.get(employeeFullName.size() – 1));
} else {
System.out.printf("No employees found with last name starting with '%s'%n", lastNameStartsWith);
}
System.out.println("—");
// CallableStatement example, (2) input parameters, returns ResultSet
timer.setStartTime(System.nanoTime());
String color = "Red";
String size = "44";
List<Product> productList =
examples.getProductsByColorAndSizeCS(color, size);
timer.setEndTime(System.nanoTime());
System.out.println("Method: GetProductsByColorAndSizeCS");
System.out.println("Description: CallableStatement, (2) input parameter, returns ResultSet");
System.out.printf("Duration (ms): %d%n", timer.getDuration());
if (productList.size() > 0) {
System.out.printf("Results: Products found (color: '%s', size: '%s'): %d%n", color, size, productList.size());
System.out.printf(" First product: %s (%s)%n", productList.get(0).getProduct(), productList.get(0).getProductNumber());
} else {
System.out.printf("No products found with color '%s' and size '%s'%n", color, size);
}
System.out.println("—");
examples.closeConnection();
}
}

Below, we see the results.

SQL Statement Performance

This post is certainly not about SQL performance, demonstrated by the fact I am only using Amazon RDS for SQL Server 2017 Express Edition on a single, very underpowered db.t2.micro Amazon RDS instance types. However, I have added a timer feature, ProcessTimer.java class, to capture the duration of time each example takes to return data, measured in milliseconds. The ProcessTimer.java class is part of the project code. Using the timer, you should observe significant differences between the first run and proceeding runs of the application for several of the called methods. The time difference is a result of several factors, primarily pre-compilation of the SQL statements and SQL Server plan caching.

The effects of these two factors are easily demonstrated by clearing the SQL Server plan cache (see SQL script below) using DBCC (Database Console Commands) statements. and then running the application twice in a row. The second time, pre-compilation and plan caching should result in significantly faster times for the prepared statements and callable statements, in Examples 2–6. In the two random runs shown below, we see up to a 497% improvement in query time.

USE AdventureWorks;
DBCC FREESYSTEMCACHE('SQL Plans');
GO
CHECKPOINT;
GO
-- Impossible to run with Amazon RDS for Microsoft SQL Server on AWS
-- DBCC DROPCLEANBUFFERS;
-- GO

The first run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 122
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 146
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 72
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 623
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 830
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 427
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

The second run results are shown below.

SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 116
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 89
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 80
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 340
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 139
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 208
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---

Conclusion

This post has demonstrated several methods for querying and calling stored procedures from a SQL Server 2017 database using JDBC with the Microsoft JDBC Driver 8.4 for SQL Server. Although the examples are quite simple, the same patterns can be used with more complex stored procedures, with multiple input and output parameters, which not only select, but insert, update, and delete data.

There are some limitations of the Microsoft JDBC Driver for SQL Server you should be aware of by reading the documentation. However, for most tasks that require database interaction, the Driver provides adequate functionality with SQL Server.


This blog represents my own viewpoints and not of my employer, Amazon Web Services.

, , , ,

Leave a comment

Updating and Maintaining Gradle Project Dependencies

As a DevOps Consultant, I often encounter codebases that have not been properly kept up-to-date. Likewise, I’ve authored many open-source projects on GitHub, which I use for training, presentations, and articles. Those projects often sit dormant for months at a time, #myabandonware.

Poorly maintained and dormant projects often become brittle or break, as their dependencies and indirect dependencies continue to be updated. However, blindly updating project dependencies is often the quickest way to break, or further break an application. Ask me, I’ve given in to temptation and broken my fair share of applications as a result. Nonetheless, it is helpful to be able to quickly analyze a project’s dependencies and discover available updates. Defects, performance issues, and most importantly, security vulnerabilities, are often fixed with dependency updates.

For Node.js projects, I prefer David to discover dependency updates. I have other favorites for Ruby, .NET, and Python, including OWASP Dependency-Check, great for vulnerabilities. In a similar vein, for Gradle-based Java Spring projects, I recently discovered Ben Manes’ Gradle Versions Plugin, gradle-versions-plugin. The plugin is described as a ‘Gradle plugin to discover dependency updates’. The plugin’s GitHub project has over 1,350 stars! According to the plugin project’s README file, this plugin is similar to the Versions Maven Plugin. The project further indicates there are similar Gradle plugins available, including gradle-use-latest-versionsgradle-libraries-plugin, and gradle-update-notifier.

To try the Gradle Versions Plugin, I chose a recent Gradle-based Java Spring Boot API project. I added the plugin to the gradle.build file with a single line of code.

plugins {
  id 'com.github.ben-manes.versions' version '0.17.0'
}

By executing the single Gradle task, dependencyUpdates, the plugin generates a report detailing the status of all project’s dependencies, including plugins. The plugin includes a revision task property, which controls the resolution strategy of determining what constitutes the latest version of a dependency. The property supports three strategies: release, milestone (default), and integration (i.e. SNAPSHOT), which are detailed in the plugin project’s README file.

As expected, the plugin will properly resolve any variables. Using a variable is an efficient practice for setting the Spring Boot versions for multiple dependencies (i.e. springBootVersion).

ext {
    springBootVersion = '2.0.1.RELEASE'
}

dependencies {
    compile('com.h2database:h2:1.4.197')
    compile("io.springfox:springfox-swagger-ui:2.8.0")
    compile("io.springfox:springfox-swagger2:2.8.0")
    compile("org.liquibase:liquibase-core:3.5.5")
    compile("org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.6.2")
    compile("org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}")
    compile("org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}")
    compile("org.springframework.boot:spring-boot-starter-data-rest:${springBootVersion}")
    compile("org.springframework.boot:spring-boot-starter-hateoas:${springBootVersion}")
    compile("org.springframework.boot:spring-boot-starter-web:${springBootVersion}")
    compileOnly('org.projectlombok:lombok:1.16.20')
    runtime("org.postgresql:postgresql:42.2.2")
    testCompile("org.springframework.boot:spring-boot-starter-test:${springBootVersion}")
}

My first run, using the default revision level, resulted in the following output. The report indicated three of my project’s dependencies were slightly out of date:

> Configure project :
Inferred project: spring-postgresql-demo, version: 4.3.0-dev.2.uncommitted+929c56e

> Task :dependencyUpdates
Failed to resolve ::apiElements
Failed to resolve ::implementation
Failed to resolve ::runtimeElements
Failed to resolve ::runtimeOnly
Failed to resolve ::testImplementation
Failed to resolve ::testRuntimeOnly

------------------------------------------------------------
: Project Dependency Updates (report to plain text file)
------------------------------------------------------------

The following dependencies are using the latest milestone version:
- com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin:0.17.0
- com.netflix.nebula:gradle-ospackage-plugin:4.9.0-rc.1
- com.h2database:h2:1.4.197
- io.spring.dependency-management:io.spring.dependency-management.gradle.plugin:1.0.5.RELEASE
- org.projectlombok:lombok:1.16.20
- com.netflix.nebula:nebula-release-plugin:6.3.3
- org.sonarqube:org.sonarqube.gradle.plugin:2.6.2
- org.springframework.boot:org.springframework.boot.gradle.plugin:2.0.1.RELEASE
- org.postgresql:postgresql:42.2.2
- org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.6.2
- org.springframework.boot:spring-boot-starter-actuator:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-data-jpa:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-data-rest:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-hateoas:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-test:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-web:2.0.1.RELEASE

The following dependencies have later milestone versions:
- org.liquibase:liquibase-core [3.5.5 -> 3.6.1]
- io.springfox:springfox-swagger-ui [2.8.0 -> 2.9.0]
- io.springfox:springfox-swagger2 [2.8.0 -> 2.9.0]

Generated report file build/dependencyUpdates/report.txt

After reading the release notes for the three available updates, and confident I had sufficient unit, smoke, and integration tests to validate any project changes, I manually updated the dependencies. Re-running the Gradle task generated the following abridged output.

------------------------------------------------------------
: Project Dependency Updates (report to plain text file)
------------------------------------------------------------

The following dependencies are using the latest milestone version:
- com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin:0.17.0
- com.netflix.nebula:gradle-ospackage-plugin:4.9.0-rc.1
- com.h2database:h2:1.4.197
- io.spring.dependency-management:io.spring.dependency-management.gradle.plugin:1.0.5.RELEASE
- org.liquibase:liquibase-core:3.6.1
- org.projectlombok:lombok:1.16.20
- com.netflix.nebula:nebula-release-plugin:6.3.3
- org.sonarqube:org.sonarqube.gradle.plugin:2.6.2
- org.springframework.boot:org.springframework.boot.gradle.plugin:2.0.1.RELEASE
- org.postgresql:postgresql:42.2.2
- org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.6.2
- org.springframework.boot:spring-boot-starter-actuator:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-data-jpa:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-data-rest:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-hateoas:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-test:2.0.1.RELEASE
- org.springframework.boot:spring-boot-starter-web:2.0.1.RELEASE
- io.springfox:springfox-swagger-ui:2.9.0
- io.springfox:springfox-swagger2:2.9.0

Generated report file build/dependencyUpdates/report.txt

BUILD SUCCESSFUL in 3s
1 actionable task: 1 executed

After running a series of automated unit, smoke, and integration tests, to confirm no conflicts with the updates, I committed my changes to GitHub. The Gradle Versions Plugin is a simple and effective solution to Gradle dependency management.

All opinions expressed in this post are my own, and not necessarily the views of my current or past employers, or their clients.

Gradle logo courtesy Gradle.org, © Gradle Inc. 

, , , , , , ,

Leave a comment

Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ

RabbitMQEnventCons.png

Introduction

In a recent post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ, we moved away from synchronous REST HTTP for inter-process communications (IPC) toward message-based IPC. Moving to asynchronous message-based communications allows us to decouple services from one another. It makes it easier to build, test, and release our individual services. In that post, we did not achieve fully asynchronous communications. Although, we did achieve a higher level of service decoupling using message-based Remote Procedure Call (RPC) IPC.

In this post, we will fully decouple our services using the distributed computing model of eventual consistency. More specifically, we will use a message-based, event-driven, loosely-coupled, eventually consistent architectural approach for communications between services.

What is eventual consistency? One of the best definitions of eventual consistency I have read was posted on microservices.io. To paraphrase, ‘using an event-driven, eventually consistent approach, each service publishes an event whenever it updates its data. Other services subscribe to events. When an event is received, a service updates its data.

Example of Eventual Consistency

Imagine, Service A, the Customer service, inserts a new customer record into its database. Based on that ‘customer created’ event, Service A publishes a message containing the new customer object, serialized to JSON, to the lightweight, persistent, New Customer message queue.

Service B, the Customer Onboarding service, a subscriber to the New Customer queue, consumes and deserializes Service A’s message. Service B may or may not perform a data transformation of the Customer object to its own Customer data model. Service B then inserts the new customer record into its own database.

In the above example, it can be said that the customer records in Service B’s database are eventually consistent with the customer records in Service A’s database. Service A makes a change and publishes a message in response to the event. Service B consumes the message and makes the same change. Eventually (likely within milliseconds), Service B’s customer records are consistent with Service A’s customer records.

Why Eventual Consistency?

So what does this apparent added complexity and duplication of data buy us? Consider the advantages. Service B, the Onboarding service, requires no knowledge of, or a dependency on, Service A, the Customer service. Still, Service B has a current record of all the customers that Service A maintains. Instead of making repeated and potentially costly RESTful HTTP calls or RPC message-based calls to or from Service A to Service B for new customers, Service B queries its database for a list of customers.

The value of eventual consistency increases factorially as you scale a distributed system. Imagine dozens of distinct microservices, many requiring data from other microservices. Further, imagine multiple instances of each of those services all running in parallel. Decoupling services from one another, through asynchronous forms of IPC, messaging, and event-driven eventual consistency greatly simplifies the software development lifecycle and operations.

Demonstration

In this post, we could use a few different architectural patterns to demonstrate message passing with RabbitMQ and Spring AMQP. They including Work Queues, Publish/Subscribe, Routing, or Topics. To keep things as simple as possible, we will have a single Producer, publish messages to a single durable and persistent message queue. We will have a single Subscriber, a Consumer, consume the messages from that queue. We focus on a single type of event message.

Sample Code

To demonstrate Spring AMQP-based messaging with RabbitMQ, we will use a reference set of three Spring Boot microservices. The Election ServiceCandidate Service, and Voter Service are all backed by MongoDB. The services and MongoDB, along with RabbitMQ and Voter API Gateway, are all part of the Voter API.

The Voter API Gateway, based on HAProxy, serves as a common entry point to all three services, as well as serving as a reverse proxy and load balancer. The API Gateway provides round-robin load-balanced access to multiple instances of each service.

Voter_API_Architecture

All the source code found this post’s example is available on GitHub, within a few different project repositories. The Voter Service repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Election Service repository, Candidate Service repository, and Voter API Gateway repository are also available on GitHub. There is also a new AngularJS/Node.js Web Client, to demonstrate how to use the Voter API.

For this post, you only need to clone the Voter Service repository.

Deploying Voter API

All components, including the Spring Boot services, MongoDB, RabbitMQ, API Gateway, and the Web Client, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub. The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.

To clone and deploy the components locally, including the Spring Boot services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.


git clone –depth 1 –branch rabbitmq \
https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh

If everything was deployed successfully, you should observe six running Docker containers, similar to the output, below.


CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
32d73282ff3d garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 8 seconds ago Up 5 seconds 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
1ece438c5da4 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8097->8080/tcp voterstack_candidate_1
30391faa3422 garystafford/voter-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8099->8080/tcp voterstack_voter_1
35063ccfe706 garystafford/election-service:rabbitmq "java -Dspring.pro…" 12 seconds ago Up 10 seconds 0.0.0.0:8095->8080/tcp voterstack_election_1
23eae86967a2 rabbitmq:management-alpine "docker-entrypoint…" 14 seconds ago Up 11 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
7e77ddecddbb mongo:latest "docker-entrypoint…" 24 seconds ago Up 21 seconds 0.0.0.0:27017->27017/tcp voterstack_mongodb_1

Using Voter API

The Voter Service, Election Service, and Candidate Service GitHub repositories each contain README files, which detail all the API endpoints each service exposes, and how to call them.

In addition to casting votes for candidates, the Voter service can simulate election results. Calling the /simulation endpoint, and indicating the desired election, the Voter service will randomly generate a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. The Voter service depends on the Candidate service to obtain a list of candidates.

The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.

The Election service manages elections, their polling dates, and the type of election (federal, state, or local). Like the other services, the Election service also has a /simulation endpoint, which will create a list of sample elections. The Election service will not be discussed in this post’s demonstration. We will examine communications between the Candidate and Voter services, only.

REST HTTP Endpoint

As you recall from our previous post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ, the Voter service exposes multiple, almost identical endpoints. Each endpoint uses a different means of IPC to retrieve candidates and generates random votes.

Calling the /voter/simulation/http/{election} endpoint and providing a specific election, prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is done using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.

The Candidate service receives the HTTP request. The Candidate service responds to the Voter service with a list of candidates in JSON format. The Voter service receives the response payload containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters  database.

Message-based RPC Endpoint

Similarly, calling the /voter/simulation/rpc/{election} endpoint and providing a specific election, prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client) produces a request message and places in RabbitMQ’s voter.rpc.requests queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service; it only depends on a response to its request message. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.

The request message is consumed by the Candidate service (the server), who is listening to that queue. In response, the Candidate service produces a message containing the list of candidates serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client) through RabbitMQ. This is done using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message, sent by the Voter Service.

The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to candidate objects. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous example, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters database.

New Endpoint

Calling the new /voter/simulation/db/{election} endpoint and providing a specific election, prompts the Voter service to query its own MongoDB database for a list of candidates.

But wait, where did the candidates come from? The Voter service didn’t call the Candidate service? The answer is message-based eventual consistency. Whenever a new candidate is created, using a REST HTTP POST request to the Candidate service’s /candidate/candidates endpoint, a Spring Data Rest Repository Event Handler responds. Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to a durable and persistent RabbitMQ queue.

The Voter service is listening to that queue. The Voter service consumes messages off the queue, deserializes the candidate object, and saves it to its own voters database, to the candidate collection. For this example, we are saving the incoming candidate object as is, with no transformations. The candidate object model for both services is identical.

When /voter/simulation/db/{election} endpoint is called, the Voter service queries its voters database for a list of candidates. They Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous two examples, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters  database.

Message_Queue_Diagram_Final3B

Exploring the Code

We will not review the REST HTTP or RPC IPC code in this post. It was covered in detail, in the previous post. Instead, we will explore the new code required for eventual consistency.

Spring Dependencies

To use AMQP with RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp. Below is a snippet from the Candidate service’s build.gradle file, showing project dependencies. The Voter service’s dependencies are identical.


dependencies {
compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
compile group: 'org.webjars', name: 'hal-browser'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}

view raw

build.gradle

hosted with ❤ by GitHub

AMQP Configuration

Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration annotation on our configuration classes. Below is the abridged configuration class for the Voter service.


package com.voterapi.voter.configuration;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class VoterConfig {
@Bean
public Queue candidateQueue() {
return new Queue("candidates.queue");
}
}

And here, the abridged configuration class for the Candidate service.


package com.voterapi.candidate.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CandidateConfig {
@Bean
public Queue candidateQueue() {
return new Queue("candidates.queue");
}
}

Event Handler

With our dependencies and configuration in place, we will define the CandidateEventHandler class. This class is annotated with the Spring Data Rest @RepositoryEventHandler and Spring’s @Component. The @Component annotation ensures the event handler is registered.

The class contains the handleCandidateSave method, which is annotated with the Spring Data Rest @HandleAfterCreate. The event handler acts on the Candidate object, which is the first parameter in the method signature.

Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to the candidates.queue queue. This was the queue we configured earlier.


package com.voterapi.candidate.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.voterapi.candidate.domain.Candidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.rest.core.annotation.HandleAfterCreate;
import org.springframework.data.rest.core.annotation.RepositoryEventHandler;
import org.springframework.stereotype.Component;
@Component
@RepositoryEventHandler
public class CandidateEventHandler {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private RabbitTemplate rabbitTemplate;
private Queue candidateQueue;
@Autowired
public CandidateEventHandler(RabbitTemplate rabbitTemplate, Queue candidateQueue) {
this.rabbitTemplate = rabbitTemplate;
this.candidateQueue = candidateQueue;
}
@HandleAfterCreate
public void handleCandidateSave(Candidate candidate) {
sendMessage(candidate);
}
private void sendMessage(Candidate candidate) {
rabbitTemplate.convertAndSend(
candidateQueue.getName(), serializeToJson(candidate));
}
private String serializeToJson(Candidate candidate) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(candidate);
} catch (JsonProcessingException e) {
logger.info(String.valueOf(e));
}
logger.debug("Serialized message payload: {}", jsonInString);
return jsonInString;
}
}

Consuming Messages

Next, we let’s switch to the Voter service’s CandidateListService class. Below is an abridged version of the class with two new methods. First, the getCandidateMessage method listens to the candidates.queue queue. This was the queue we configured earlier. The method is annotated with theSpring AMQP Rabbit @RabbitListener annotation.

The getCandidateMessage retrieves the new candidate object from the message, deserializes the message’s JSON payload, maps it to the candidate object model and saves it to the Voter service’s database.

The second method, getCandidatesQueueDb, retrieves the candidates from the Voter service’s database. The method makes use of the Spring Data MongoDB Aggregation package to return a list of candidates from MongoDB.


/**
* Consumes a new candidate message, deserializes, and save to MongoDB
* @param candidateMessage
*/
@RabbitListener(queues = "#{candidateQueue.name}")
public void getCandidateMessage(String candidateMessage) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
TypeReference<Candidate> mapType = new TypeReference<Candidate>() {};
Candidate candidate = null;
try {
candidate = objectMapper.readValue(candidateMessage, mapType);
} catch (IOException e) {
logger.info(String.valueOf(e));
}
candidateRepository.save(candidate);
logger.debug("Candidate {} saved to MongoDB", candidate.toString());
}
/**
* Retrieves candidates from MongoDB and transforms to voter view
* @param election
* @return List of candidates
*/
public List<CandidateVoterView> getCandidatesQueueDb(String election) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("election").is(election)),
project("firstName", "lastName", "politicalParty", "election")
.andExpression("concat(firstName,' ', lastName)")
.as("fullName"),
sort(Sort.Direction.ASC, "lastName")
);
AggregationResults<CandidateVoterView> groupResults
= mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
return groupResults.getMappedResults();
}

RabbitMQ Management Console

The easiest way to observe what is happening with the messages is using the RabbitMQ Management Console. To access the console, point your web browser to localhost, on port 15672. The default login credentials for the console are guest/guest. As you successfully produce and consume messages with RabbitMQ, you should see activity on the Overview tab.

RabbitMQ_EC_Durable3.png

Recall we said the queue, in this example, was durable. That means messages will survive the RabbitMQ broker stopping and starting. In the below view of the RabbitMQ Management Console, note the six messages persisted in memory. The Candidate service produced the messages in response to six new candidates being created. However, the Voter service was not running, and therefore, could not consume the messages. In addition, the RabbitMQ server was restarted, after receiving the candidate messages. The messages were persisted and still present in the queue after the successful reboot of RabbitMQ.

RabbitMQ_EC_Durable

Once RabbitMQ and the Voter service instance were back online, the Voter service successfully consumed the six waiting messages from the queue.

RabbitMQ_EC_Durable2.png

Service Logs

In addition to using the RabbitMQ Management Console, we may obverse communications between the two services by looking at the Voter and Candidate service’s logs. I have grabbed a snippet of both service’s logs and added a few comments to show where different processes are being executed.

First the Candidate service logs. We observe a REST HTTP POST request containing a new candidate. We then observe the creation of the new candidate object in the Candidate service’s database, followed by the event handler publishing a message on the queue. Finally, we observe the response is returned in reply to the initial REST HTTP POST request.


# REST HTTP POST Request received
2017-05-11 22:43:46.667 DEBUG 18702 — [nio-8097-exec-5] o.a.coyote.http11.Http11InputBuffer : Received [POST /candidate/candidates HTTP/1.1
Host: localhost:8097
User-Agent: HTTPie/0.9.8
Accept-Encoding: gzip, deflate
Accept: application/json, */*
Connection: keep-alive
Content-Type: application/json
Content-Length: 127
{"firstName": "Hillary", "lastName": "Clinton", "politicalParty": "Democratic Party", "election": "2016 Presidential Election"}]
2017-05-11 22:43:46.667 DEBUG 18702 — [nio-8097-exec-5] o.a.c.authenticator.AuthenticatorBase : Security checking request POST /candidate/candidates
# Inserting new Candidate into database
2017-05-11 22:43:46.674 DEBUG 18702 — [nio-8097-exec-5] o.s.data.mongodb.core.MongoTemplate : Inserting DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election] in collection: candidate
2017-05-11 22:43:46.674 DEBUG 18702 — [nio-8097-exec-5] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[candidates]
2017-05-11 22:43:46.674 DEBUG 18702 — [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Inserting 1 documents into namespace candidates.candidate on connection [connectionId{localValue:2, serverValue:147}] to server localhost:27017
2017-05-11 22:43:46.677 DEBUG 18702 — [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Insert completed
# Publishing message on queue
2017-05-11 22:43:46.678 DEBUG 18702 — [nio-8097-exec-5] o.s.d.r.c.e.AnnotatedEventHandlerInvoker : Invoking AfterCreateEvent handler for Hillary Clinton (Democratic Party).
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] c.v.c.service.CandidateEventHandler : Serialized message payload: {"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@1bfb15f5 Shared Rabbit Connection: SimpleConnection@37d1ba14 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59422]
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message on exchange [], routingKey = [candidates.queue]
# Response to HTTP POST
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'persistentEntities'
2017-05-11 22:43:46.681 DEBUG 18702 — [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'org.springframework.boot.actuate.autoconfigure.EndpointWebMvcHypermediaManagementContextConfiguration$ActuatorEndpointLinksAdvice'
2017-05-11 22:43:46.682 DEBUG 18702 — [nio-8097-exec-5] s.d.r.w.j.PersistentEntityJackson2Module : Serializing PersistentEntity org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity@1a4d1ab7.
2017-05-11 22:43:46.683 DEBUG 18702 — [nio-8097-exec-5] o.s.w.s.m.m.a.HttpEntityMethodProcessor : Written [Resource { content: Hillary Clinton (Democratic Party), links: [<http://localhost:8097/candidate/candidates/591521621162e1490eb0d537&gt;;rel="self", <http://localhost:8097/candidate/candidates/591521621162e1490eb0d537{?projection}>;rel="candidate"] }] as "application/json" using [org.springframework.data.rest.webmvc.config.RepositoryRestMvcConfiguration$ResourceSupportHttpMessageConverter@27329d2a]
2017-05-11 22:43:46.683 DEBUG 18702 — [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
2017-05-11 22:43:46.683 DEBUG 18702 — [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Successfully completed request

Now the Voter service logs. At the exact same second as the message and the response sent by the Candidate service, the Voter service consumes the message off the queue. The Voter service then deserializes the new candidate object and inserts it into its database.


# Retrieving message from queue
2017-05-11 22:43:46.242 DEBUG 19001 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.684 DEBUG 19001 — [pool-1-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.685 DEBUG 19001 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'{"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=candidates.queue, receivedDelay=null, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, consumerQueue=candidates.queue])
2017-05-11 22:43:46.686 DEBUG 19001 — [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload={"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=candidates.queue, amqp_contentEncoding=UTF-8, amqp_deliveryTag=6, amqp_consumerQueue=candidates.queue, amqp_redelivered=false, id=608b990a-919b-52c1-fb64-4af4be03b306, amqp_consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, contentType=text/plain, timestamp=1494557026686}]]
# Inserting new Candidate into database
2017-05-11 22:43:46.687 DEBUG 19001 — [cTaskExecutor-1] o.s.data.mongodb.core.MongoTemplate : Saving DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election]
2017-05-11 22:43:46.687 DEBUG 19001 — [cTaskExecutor-1] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[voters]
2017-05-11 22:43:46.688 DEBUG 19001 — [cTaskExecutor-1] org.mongodb.driver.protocol.update : Updating documents in namespace voters.candidate on connection [connectionId{localValue:2, serverValue:151}] to server localhost:27017
2017-05-11 22:43:46.703 DEBUG 19001 — [cTaskExecutor-1] org.mongodb.driver.protocol.update : Update completed
2017-05-11 22:43:46.703 DEBUG 19001 — [cTaskExecutor-1] c.v.voter.service.CandidateListService : Candidate Hillary Clinton (Democratic Party) saved to MongoDB

MongoDB

Using the mongo Shell, we can observe six new 2016 Presidential Election candidates in the Candidate service’s database.


> show dbs
candidates 0.000GB
voters 0.000GB
> use candidates
switched to db candidates
> show collections
candidate
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }

Now, looking at the Voter service’s database, we should find the same six 2016 Presidential Election candidates. Note the Object IDs are the same between the two service’s document sets, as are the rest of the fields (first name, last name, political party, and election). However, the class field is different between the two service’s records.


> show dbs
candidates 0.000GB
voters 0.000GB
> use voters
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }

Production Considerations

The post demonstrated a simple example of message-based, event-driven eventual consistency. In an actual Production environment, there are a few things that must be considered.

  • We only addressed a ‘candidate created’ event. We would also have to code for other types of events, such as a ‘candidate deleted’ event and a ‘candidate updated’ event.
  • If a candidate is added, deleted, then re-added, are the events published and consumed in the right order? What about with multiple instances of the Voter service running? Does this pattern guarantee event ordering?
  • How should the Candidate service react on startup if RabbitMQ is not available
  • What if RabbitMQ fails after the Candidate services have started?
  • How should the Candidate service react if a new candidate record is added to the database, but a ‘candidate created’ event message cannot be published to RabbitMQ? The two actions are not wrapped in a single transaction.
  • In all of the above scenarios, what response should be returned to the API end user?

Conclusion

In this post, using eventual consistency, we successfully decoupled our two microservices and achieved asynchronous inter-process communications. Adopting a message-based, event-driven, loosely-coupled architecture, wherever possible, in combination with REST HTTP when it makes sense, will improve the overall manageability and scalability of a microservices-based platform.

References

All opinions in this post are my own and not necessarily the views of my current employer or their clients.

, , , , , , , , , , ,

5 Comments

Spring Music Revisited: Java-Spring-MongoDB Web App with Docker 1.12

Build, test, deploy, and monitor a multi-container, MongoDB-backed, Java Spring web application, using the new Docker 1.12.

Spring Music Infrastructure

Introduction

** This post and associated project code were updated 9/3/2016 to use Tomcat 8.5.4 with OpenJDK 8.**

This post and the post’s example project represent an update to a previous post, Build and Deploy a Java-Spring-MongoDB Application using Docker. This new post incorporates many improvements made in Docker 1.12, including the use of the new Docker Compose v2 YAML format. The post’s project was also updated to use Filebeat with ELK, as opposed to Logspout, which was used previously.

In this post, we will demonstrate how to build, test, deploy, and manage a Java Spring web application, hosted on Apache Tomcat, load-balanced by NGINX, monitored by ELK with Filebeat, and all containerized with Docker.

We will use a sample Java Spring application, Spring Music, available on GitHub from Cloud Foundry. The Spring Music sample record album collection application was originally designed to demonstrate the use of database services on Cloud Foundry, using the Spring Framework. Instead of Cloud Foundry, we will host the Spring Music application locally, using Docker on VirtualBox, and optionally on AWS.

All files necessary to build this project are stored on the docker_v2 branch of the garystafford/spring-music-docker repository on GitHub. The Spring Music source code is stored on the springmusic_v2 branch of the garystafford/spring-music repository, also on GitHub.

Spring Music Application

Application Architecture

The Java Spring Music application stack contains the following technologies: JavaSpring Framework, AngularJS, Bootstrap, jQueryNGINXApache TomcatMongoDB, the ELK Stack, and Filebeat. Testing frameworks include the Spring MVC Test Framework, Mockito, Hamcrest, and JUnit.

A few changes were made to the original Spring Music application to make it work for this demonstration, including:

  • Move from Java 1.7 to 1.8 (including newer Tomcat version)
  • Add unit tests for Continuous Integration demonstration purposes
  • Modify MongoDB configuration class to work with non-local, containerized MongoDB instances
  • Add Gradle warNoStatic task to build WAR without static assets
  • Add Gradle zipStatic task to ZIP up the application’s static assets for deployment to NGINX
  • Add Gradle zipGetVersion task with a versioning scheme for build artifacts
  • Add context.xml file and MANIFEST.MF file to the WAR file
  • Add Log4j RollingFileAppender appender to send log entries to Filebeat
  • Update versions of several dependencies, including Gradle, Spring, and Tomcat

We will use the following technologies to build, publish, deploy, and host the Java Spring Music application: GradlegitGitHubTravis CIOracle VirtualBoxDockerDocker ComposeDocker MachineDocker Hub, and optionally, Amazon Web Services (AWS).

NGINX
To increase performance, the Spring Music web application’s static content will be hosted by NGINX. The application’s WAR file will be hosted by Apache Tomcat 8.5.4. Requests for non-static content will be proxied through NGINX on the front-end, to a set of three load-balanced Tomcat instances on the back-end. To further increase application performance, NGINX will also be configured for browser caching of the static content. In many enterprise environments, the use of a Java EE application server, like Tomcat, is still not uncommon.

Reverse proxying and caching are configured thought NGINX’s default.conf file, in the server configuration section:

server {
listen 80;
server_name proxy;
location ~* \/assets\/(css|images|js|template)\/* {
root /usr/share/nginx/;
expires max;
add_header Pragma public;
add_header Cache-Control "public, must-revalidate, proxy-revalidate";
add_header Vary Accept-Encoding;
access_log off;
}
view raw nginx_v2.txt hosted with ❤ by GitHub

The three Tomcat instances will be manually configured for load-balancing using NGINX’s default round-robin load-balancing algorithm. This is configured through the default.conf file, in the upstream configuration section:

upstream backend {
server music_app_1:8080;
server music_app_2:8080;
server music_app_3:8080;
}
view raw nginx2_v2.txt hosted with ❤ by GitHub

Client requests are received through port 80 on the NGINX server. NGINX redirects requests, which are not for non-static assets, to one of the three Tomcat instances on port 8080.

MongoDB
The Spring Music application was designed to work with a number of data stores, including MySQL, Postgres, Oracle, MongoDB, Redis, and H2, an in-memory Java SQL database. Given the choice of both SQL and NoSQL databases, we will select MongoDB.

The Spring Music application, hosted by Tomcat, will store and modify record album data in a single instance of MongoDB. MongoDB will be populated with a collection of album data from a JSON file, when the Spring Music application first creates the MongoDB database instance.

ELK
Lastly, the ELK Stack with Filebeat, will aggregate NGINX, Tomcat, and Java Log4j log entries, providing debugging and analytics to our demonstration. A similar method for aggregating logs, using Logspout instead of Filebeat, can be found in this previous post.

Kibana 4 Web Console

Continuous Integration

In this post’s example, two build artifacts, a WAR file for the application and ZIP file for the static web content, are built automatically by Travis CI, whenever source code changes are pushed to the springmusic_v2 branch of the garystafford/spring-music repository on GitHub.

Travis CI Output

Following a successful build and a small number of unit tests, Travis CI pushes the build artifacts to the build-artifacts branch on the same GitHub project. The build-artifacts branch acts as a pseudo binary repository for the project, much like JFrog’s Artifactory. These artifacts are used later by Docker to build the project’s immutable Docker images and containers.

Build Artifact Repository

Build Notifications
Travis CI pushes build notifications to a Slack channel, which eliminates the need to actively monitor Travis CI.

Travis CI Slack Notifications

Automation Scripting
The .travis.yaml file, custom gradle.build Gradle tasks, and the deploy_travisci.sh script handles the Travis CI automation described, above.

Travis CI .travis.yaml file:

language: java
jdk: oraclejdk8
before_install:
- chmod +x gradlew
before_deploy:
- chmod ugo+x deploy_travisci.sh
script:
- "./gradlew clean build"
- "./gradlew warNoStatic warCopy zipGetVersion zipStatic"
- sh ./deploy_travisci.sh
env:
global:
- GH_REF: github.com/garystafford/spring-music.git
- secure: <GH_TOKEN_secure_hash_here>
- secure: <COMMIT_AUTHOR_EMAIL_secure_hash_here>
notifications:
slack:
- secure: <SLACK_secure_hash_here>
view raw travis.yaml hosted with ❤ by GitHub

Custom gradle.build tasks:

// new Gradle build tasks
task warNoStatic(type: War) {
// omit the version from the war file name
version = ''
exclude '**/assets/**'
manifest {
attributes
'Manifest-Version': '1.0',
'Created-By': currentJvm,
'Gradle-Version': GradleVersion.current().getVersion(),
'Implementation-Title': archivesBaseName + '.war',
'Implementation-Version': artifact_version,
'Implementation-Vendor': 'Gary A. Stafford'
}
}
task warCopy(type: Copy) {
from 'build/libs'
into 'build/distributions'
include '**/*.war'
}
task zipGetVersion (type: Task) {
ext.versionfile =
new File("${projectDir}/src/main/webapp/assets/buildinfo.properties")
versionfile.text = 'build.version=' + artifact_version
}
task zipStatic(type: Zip) {
from 'src/main/webapp/assets'
appendix = 'static'
version = ''
}

The deploy.sh file:

#!/bin/bash
set -e
cd build/distributions
git init
git config user.name "travis-ci"
git config user.email "${COMMIT_AUTHOR_EMAIL}"
git add .
git commit -m "Deploy Travis CI Build #${TRAVIS_BUILD_NUMBER} artifacts to GitHub"
git push --force --quiet "https://${GH_TOKEN}@${GH_REF}" master:build-artifacts > /dev/null 2>&1
view raw build_v2.sh hosted with ❤ by GitHub

You can easily replicate the project’s continuous integration automation using your choice of toolchains. GitHub or BitBucket are good choices for distributed version control. For continuous integration and deployment, I recommend Travis CI, Semaphore, Codeship, or Jenkins. Couple those with a good persistent chat application, such as Glider Labs’ Slack or Atlassian’s HipChat.

Building the Docker Environment

Make sure VirtualBox, Docker, Docker Compose, and Docker Machine, are installed and running. At the time of this post, I have the following versions of software installed on my Mac:

  • Mac OS X 10.11.6
  • VirtualBox 5.0.26
  • Docker 1.12.1
  • Docker Compose 1.8.0
  • Docker Machine 0.8.1

To build the project’s VirtualBox VM, Docker images, and Docker containers, execute the build script, using the following command: sh ./build_project.sh. A build script is useful when working with CI/CD automation tools, such as Jenkins CI or ThoughtWorks go. However, to understand the build process, I suggest first running the individual commands, locally.

#!/bin/sh
set -ex
# clone project
git clone -b docker_v2 --single-branch \
https://github.com/garystafford/spring-music-docker.git music \
&& cd "$_"
# provision VirtualBox VM
docker-machine create --driver virtualbox springmusic
# set new environment
docker-machine env springmusic \
&& eval "$(docker-machine env springmusic)"
# mount a named volume on host to store mongo and elk data
# ** assumes your project folder is 'music' **
docker volume create --name music_data
docker volume create --name music_elk
# create bridge network for project
# ** assumes your project folder is 'music' **
docker network create -d bridge music_net
# build images and orchestrate start-up of containers (in this order)
docker-compose -p music up -d elk && sleep 15 \
&& docker-compose -p music up -d mongodb && sleep 15 \
&& docker-compose -p music up -d app \
&& docker-compose scale app=3 && sleep 15 \
&& docker-compose -p music up -d proxy && sleep 15
# optional: configure local DNS resolution for application URL
#echo "$(docker-machine ip springmusic) springmusic.com" | sudo tee --append /etc/hosts
# run a simple connectivity test of application
for i in {1..9}; do curl -I $(docker-machine ip springmusic); done

Deploying to AWS
By simply changing the Docker Machine driver to AWS EC2 from VirtualBox, and providing your AWS credentials, the springmusic environment may also be built on AWS.

Build Process
Docker Machine provisions a single VirtualBox springmusic VM on which host the project’s containers. VirtualBox provides a quick and easy solution that can be run locally for initial development and testing of the application.

Next, the script creates a Docker data volume and project-specific Docker bridge network.

Next, using the project’s individual Dockerfiles, Docker Compose pulls base Docker images from Docker Hub for NGINX, Tomcat, ELK, and MongoDB. Project-specific immutable Docker images are then built for NGINX, Tomcat, and MongoDB. While constructing the project-specific Docker images for NGINX and Tomcat, the latest Spring Music build artifacts are pulled and installed into the corresponding Docker images.

Docker Compose builds and deploys (6) containers onto the VirtualBox VM: (1) NGINX, (3) Tomcat, (1) MongoDB, and (1) ELK.

The NGINX Dockerfile:

# NGINX image with build artifact
FROM nginx:latest
MAINTAINER Gary A. Stafford <garystafford@rochester.rr.com>
ENV REFRESHED_AT 2016-09-17
ENV GITHUB_REPO https://github.com/garystafford/spring-music/raw/build-artifacts
ENV STATIC_FILE spring-music-static.zip
RUN apt-get update -qq \
&& apt-get install -qqy curl wget unzip nano \
&& apt-get clean \
\
&& wget -O /tmp/${STATIC_FILE} ${GITHUB_REPO}/${STATIC_FILE} \
&& unzip /tmp/${STATIC_FILE} -d /usr/share/nginx/assets/
COPY default.conf /etc/nginx/conf.d/default.conf
# tweak nginx image set-up, remove log symlinks
RUN rm /var/log/nginx/access.log /var/log/nginx/error.log
# install Filebeat
ENV FILEBEAT_VERSION=filebeat_1.2.3_amd64.deb
RUN curl -L -O https://download.elastic.co/beats/filebeat/${FILEBEAT_VERSION} \
&& dpkg -i ${FILEBEAT_VERSION} \
&& rm ${FILEBEAT_VERSION}
# configure Filebeat
ADD filebeat.yml /etc/filebeat/filebeat.yml
# CA cert
RUN mkdir -p /etc/pki/tls/certs
ADD logstash-beats.crt /etc/pki/tls/certs/logstash-beats.crt
# start Filebeat
ADD ./start.sh /usr/local/bin/start.sh
RUN chmod +x /usr/local/bin/start.sh
CMD [ "/usr/local/bin/start.sh" ]

The Tomcat Dockerfile:

# Apache Tomcat image with build artifact
FROM tomcat:8.5.4-jre8
MAINTAINER Gary A. Stafford <garystafford@rochester.rr.com>
ENV REFRESHED_AT 2016-09-17
ENV GITHUB_REPO https://github.com/garystafford/spring-music/raw/build-artifacts
ENV APP_FILE spring-music.war
ENV TERM xterm
ENV JAVA_OPTS -Djava.security.egd=file:/dev/./urandom
RUN apt-get update -qq \
&& apt-get install -qqy curl wget \
&& apt-get clean \
\
&& touch /var/log/spring-music.log \
&& chmod 666 /var/log/spring-music.log \
\
&& wget -q -O /usr/local/tomcat/webapps/ROOT.war ${GITHUB_REPO}/${APP_FILE} \
&& mv /usr/local/tomcat/webapps/ROOT /usr/local/tomcat/webapps/_ROOT
COPY tomcat-users.xml /usr/local/tomcat/conf/tomcat-users.xml
# install Filebeat
ENV FILEBEAT_VERSION=filebeat_1.2.3_amd64.deb
RUN curl -L -O https://download.elastic.co/beats/filebeat/${FILEBEAT_VERSION} \
&& dpkg -i ${FILEBEAT_VERSION} \
&& rm ${FILEBEAT_VERSION}
# configure Filebeat
ADD filebeat.yml /etc/filebeat/filebeat.yml
# CA cert
RUN mkdir -p /etc/pki/tls/certs
ADD logstash-beats.crt /etc/pki/tls/certs/logstash-beats.crt
# start Filebeat
ADD ./start.sh /usr/local/bin/start.sh
RUN chmod +x /usr/local/bin/start.sh
CMD [ "/usr/local/bin/start.sh" ]

Docker Compose v2 YAML
This post was recently updated for Docker 1.12, and to use Docker Compose v2 YAML file format. The post’s docker-compose.yml takes advantage of improvements in Docker 1.12 and Docker Compose v2 YAML. Improvements to the YAML file include eliminating the need to link containers and expose ports, and the addition of named networks and volumes.

version: '2'
services:
proxy:
build: nginx/
ports:
- 80:80
networks:
- net
depends_on:
- app
hostname: proxy
container_name: proxy
app:
build: tomcat/
ports:
- 8080
networks:
- net
depends_on:
- mongodb
hostname: app
mongodb:
build: mongodb/
ports:
- 27017:27017
networks:
- net
depends_on:
- elk
hostname: mongodb
container_name: mongodb
volumes:
- music_data:/data/db
- music_data:/data/configdb
elk:
image: sebp/elk:latest
ports:
- 5601:5601
- 9200:9200
- 5044:5044
- 5000:5000
networks:
- net
volumes:
- music_elk:/var/lib/elasticsearch
hostname: elk
container_name: elk
volumes:
music_data:
external: true
music_elk:
external: true
networks:
net:
driver: bridge

The Results

Spring Music Infrastructure

Below are the results of building the project.

# Resulting Docker Machine VirtualBox VM:
$ docker-machine ls
NAME ACTIVE DRIVER STATE URL SWARM DOCKER ERRORS
springmusic * virtualbox Running tcp://192.168.99.100:2376 v1.12.1
# Resulting external volume:
$ docker volume ls
DRIVER VOLUME NAME
local music_data
local music_elk
# Resulting bridge network:
$ docker network ls
NETWORK ID NAME DRIVER SCOPE
f564dfa1b440 music_net bridge local
# Resulting Docker images - (4) base images and (3) project images:
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
music_proxy latest 7a8dd90bcf32 About an hour ago 250.2 MB
music_app latest c93c713d03b8 About an hour ago 393 MB
music_mongodb latest fbcbbe9d4485 25 hours ago 366.4 MB
tomcat 8.5.4-jre8 98cc750770ba 2 days ago 334.5 MB
mongo latest 48b8b08dca4d 2 days ago 366.4 MB
nginx latest 4efb2fcdb1ab 10 days ago 183.4 MB
sebp/elk latest 07a3e78b01f5 13 days ago 884.5 MB
# Resulting (6) Docker containers
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b33922767517 music_proxy "/usr/local/bin/start" 3 hours ago Up 13 minutes 0.0.0.0:80->80/tcp, 443/tcp proxy
e16d2372f2df music_app "/usr/local/bin/start" 3 hours ago Up About an hour 0.0.0.0:32770->8080/tcp music_app_3
6b7accea7156 music_app "/usr/local/bin/start" 3 hours ago Up About an hour 0.0.0.0:32769->8080/tcp music_app_2
2e94f766df1b music_app "/usr/local/bin/start" 3 hours ago Up About an hour 0.0.0.0:32768->8080/tcp music_app_1
71f8dc574148 sebp/elk:latest "/usr/local/bin/start" 3 hours ago Up About an hour 0.0.0.0:5000->5000/tcp, 0.0.0.0:5044->5044/tcp, 0.0.0.0:5601->5601/tcp, 0.0.0.0:9200->9200/tcp, 9300/tcp elk
f7e7d1af7cca music_mongodb "/entrypoint.sh mongo" 20 hours ago Up About an hour 0.0.0.0:27017->27017/tcp mongodb

Testing the Application

Below are partial results of the curl test, hitting the NGINX endpoint. Note the different IP addresses in the Upstream-Address field between requests. This test proves NGINX’s round-robin load-balancing is working across the three Tomcat application instances: music_app_1, music_app_2, and music_app_3.

Also, note the sharp decrease in the Request-Time between the first three requests and subsequent three requests. The Upstream-Response-Time to the Tomcat instances doesn’t change, yet the total Request-Time is much shorter, due to caching of the application’s static assets by NGINX.

for i in {1..6}; do curl -I $(docker-machine ip springmusic);done
HTTP/1.1 200
Server: nginx/1.11.4
Date: Sat, 17 Sep 2016 18:33:50 GMT
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 2094
Connection: keep-alive
Accept-Ranges: bytes
ETag: W/"2094-1473924940000"
Last-Modified: Thu, 15 Sep 2016 07:35:40 GMT
Content-Language: en
Request-Time: 0.575
Upstream-Address: 172.18.0.4:8080
Upstream-Response-Time: 1474137230.048
HTTP/1.1 200
Server: nginx/1.11.4
Date: Sat, 17 Sep 2016 18:33:51 GMT
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 2094
Connection: keep-alive
Accept-Ranges: bytes
ETag: W/"2094-1473924940000"
Last-Modified: Thu, 15 Sep 2016 07:35:40 GMT
Content-Language: en
Request-Time: 0.711
Upstream-Address: 172.18.0.5:8080
Upstream-Response-Time: 1474137230.865
HTTP/1.1 200
Server: nginx/1.11.4
Date: Sat, 17 Sep 2016 18:33:52 GMT
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 2094
Connection: keep-alive
Accept-Ranges: bytes
ETag: W/"2094-1473924940000"
Last-Modified: Thu, 15 Sep 2016 07:35:40 GMT
Content-Language: en
Request-Time: 0.326
Upstream-Address: 172.18.0.6:8080
Upstream-Response-Time: 1474137231.812
# assets now cached...
HTTP/1.1 200
Server: nginx/1.11.4
Date: Sat, 17 Sep 2016 18:33:53 GMT
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 2094
Connection: keep-alive
Accept-Ranges: bytes
ETag: W/"2094-1473924940000"
Last-Modified: Thu, 15 Sep 2016 07:35:40 GMT
Content-Language: en
Request-Time: 0.012
Upstream-Address: 172.18.0.4:8080
Upstream-Response-Time: 1474137233.111
HTTP/1.1 200
Server: nginx/1.11.4
Date: Sat, 17 Sep 2016 18:33:53 GMT
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 2094
Connection: keep-alive
Accept-Ranges: bytes
ETag: W/"2094-1473924940000"
Last-Modified: Thu, 15 Sep 2016 07:35:40 GMT
Content-Language: en
Request-Time: 0.017
Upstream-Address: 172.18.0.5:8080
Upstream-Response-Time: 1474137233.350
HTTP/1.1 200
Server: nginx/1.11.4
Date: Sat, 17 Sep 2016 18:33:53 GMT
Content-Type: text/html;charset=ISO-8859-1
Content-Length: 2094
Connection: keep-alive
Accept-Ranges: bytes
ETag: W/"2094-1473924940000"
Last-Modified: Thu, 15 Sep 2016 07:35:40 GMT
Content-Language: en
Request-Time: 0.013
Upstream-Address: 172.18.0.6:8080
Upstream-Response-Time: 1474137233.594

Spring Music Application Links

Assuming the springmusic VM is running at 192.168.99.100, the following links can be used to access various project endpoints. Note the (3) Tomcat instances each map to randomly exposed ports. These ports are not required by NGINX, which maps to port 8080 for each instance. The port is only required if you want access to the Tomcat Web Console. The port, shown below, 32771, is merely used as an example.

* The Tomcat user name is admin and the password is t0mcat53rv3r.

Helpful Links

TODOs

  • Automate the Docker image build and publish processes
  • Automate the Docker container build and deploy processes
  • Automate post-deployment verification testing of project infrastructure
  • Add Docker Swarm multi-host capabilities with overlay networking
  • Update Spring Music with latest CF project revisions
  • Include scripting example to stand-up project on AWS
  • Add Consul and Consul Template for NGINX configuration

, , , , , , , , , , , ,

11 Comments

Diving Deeper into ‘Getting Started with Spring Cloud’

Spring_Cloud_Config_2

Explore the integration of Spring Cloud and Spring Cloud Netflix tooling, through a deep dive into Pivotal’s ‘Getting Started with Spring Cloud’ presentation.

Introduction

Keeping current with software development and DevOps trends can often make us feel we are, as the overused analogy describes, drinking from a firehose, often several hoses at once. Recently joining a large client engagement, I found it necessary to supplement my knowledge of cloud-native solutions, built with the support of Spring Cloud and Spring Cloud Netflix technologies. One of my favorite sources of information on these subjects is presentations by people like Josh Long, Dr. Dave Syer, and Cornelia Davis of Pivotal Labs, and Jon Schneider and Taylor Wicksell of Netflix.

One presentation, in particular, Getting Started with Spring Cloud, by Long and Syer, provides an excellent end-to-end technical overview of the latest Spring and Netflix technologies. Josh Long’s fast-paced, eighty-minute presentation, available on YouTube, was given at SpringOne2GX 2015 with co-presenter, Dr. Dave Syer, founder of Spring Cloud, Spring Boot, and Spring Batch.

As the presenters of Getting Started with Spring Cloud admit, the purpose of the presentation was to get people excited about Spring Cloud and Netflix technologies, not to provide a deep dive into each technology. However, I believe the presentation’s Reservation Service example provides an excellent learning opportunity. In the following post, we will examine the technologies, components, code, and configuration presented in Getting Started with Spring Cloud. The goal of the post is to provide a greater understanding of the Spring Cloud and Spring Cloud Netflix technologies.

System Overview

Technologies

The presentation’s example introduces a dizzying array of technologies, which include:

Spring Boot
Stand-alone, production-grade Spring-based applications

Spring Data REST / Spring HATEOAS
Spring-based applications following HATEOAS principles

Spring Cloud Config
Centralized external configuration management, backed by Git

Netflix Eureka
REST-based service discovery and registration for failover and load-balancing

Netflix Ribbon
IPC library with built-in client-side software load-balancers

Netflix Zuul
Dynamic routing, monitoring, resiliency, security, and more

Netflix Hystrix
Latency and fault tolerance for distributed system

Netflix Hystrix Dashboard
Web-based UI for monitoring Hystrix

Spring Cloud Stream
Messaging microservices, backed by Redis

Spring Data Redis
Configuration and access to Redis from a Spring app, using Jedis

Spring Cloud Sleuth
Distributed tracing solution for Spring Cloud, sends traces via Thrift to the Zipkin collector service

Twitter Zipkin
Distributed tracing system, backed by Apache Cassandra

H2
In-memory Java SQL database, embedded and server modes

Docker
Package applications with dependencies into standardized Linux containers

System Components

Several components and component sub-systems comprise the presentation’s overall Reservation Service example. Each component implements a combination of the technologies mentioned above. Below is a high-level architectural diagram of the presentation’s example. It includes a few additional features, added as part of this post.

Overall Reservation System Diagram

Individual system components include:

Spring Cloud Config Server
Stand-alone Spring Boot application provides centralized external configuration to multiple Reservation system components

Spring Cloud Config Git Repo
Git repository containing multiple Reservation system components configuration files, served by Spring Cloud Config Server

H2 Java SQL Database Server (New)
This post substitutes the original example’s use of H2’s embedded version with a TCP Server instance, shared by Reservation Service instances

Reservation Service
Multi load-balanced instances of stand-alone Spring Boot application, backed by H2 database

Reservation Client
Stand-alone Spring Boot application (aka edge service or client-side proxy), forwards client-side load-balanced requests to the Reservation Service, using Eureka, Zuul, and Ribbon

Reservation Data Seeder (New)
Stand-alone Spring Boot application, seeds H2 with initial data, instead of the Reservation Service

Eureka Service
Stand-alone Spring Boot application provides service discovery and registration for failover and load-balancing

Hystrix Dashboard
Stand-alone Spring Boot application provides web-based Hystrix UI for monitoring system performance and Hystrix circuit-breakers

Zipkin
Zipkin Collector, Query, and Web, and Cassandra database, receives, correlates, and displays traces from Spring Cloud Sleuth

Redis
In-memory data structure store, acting as message broker/transport for Spring Cloud Stream

Github

All the code for this post is available on Github, split between two repositories. The first repository, spring-cloud-demo, contains the source code for all of the components listed above, except the Spring Cloud Config Git Repo. To function correctly, the configuration files, consumed by the Spring Cloud Config Server, needs to be placed into a separate repository, spring-cloud-demo-config-repo.

The first repository contains a git submodule , docker-zipkin. If you are not familiar with submodules, you may want to take a moment to read the git documentation. The submodule contains a dockerized version of Twitter’s OpenZipkin, docker-zipkin. To  clone the two repositories, use the following commands. The --recursive option is required to include the docker-zipkin submodule in the project.

Configuration

To try out the post’s Reservation system example, you need to configure at least one property. The Spring Cloud Config Server needs to know the location of the Spring Cloud Config Repository, which is the second GitHub repository you cloned, spring-cloud-demo-config-repo. From the root of the spring-cloud-demo repo, edit the Spring Cloud Config Server application.properties file, located in config-server/src/main/resources/application.properties. Change the following property’s value to your local path to the spring-cloud-demo-config-repo repository:

spring.cloud.config.server.git.uri=file:<YOUR_PATH_GOES_HERE>/spring-cloud-demo-config-repo

Startup

There are a few ways you could run the multiple components that make up the post’s example. I suggest running one component per terminal window, in the foreground. In this way, you can monitor the output from the bootstrap and startup processes of the system’s components. Furthermore, you can continue to monitor the system’s components once they are up and running, and receiving traffic. Yes, that is twelve terminal windows…

ReservationServices.png

There is a required startup order for the components. For example, Spring Cloud Config Server needs to start before the other components that rely on it for configuration. Netflix’s Eureka needs to start before the Reservation Client and ReservationServices, so they can register with Eureka on startup. Similarly, Zipkin needs to be started in its Docker container before the Reservation Client and Services, so Spring Cloud Sleuth can start sending traces. Redis needs to be started in its Docker container before Spring Cloud Stream tries to create the message queue. All instances of the Reservation Service needs to start before the Reservation Client. Once every component is started, the Reservation Data Seeder needs to be run once to create initial data in H2. For best results, follow the instructions below. Let each component start completely, before starting the next component.

# IMPORTANT: set this to the spring-cloud-demo repo directory
export SPRING_DEMO=<YOUR_PATH_GOES_HERE>/spring-cloud-demo
# Redis - on Mac, in Docker Quickstart Terminal
cd ${SPRING_DEMO}/docker-redis/
docker-compose up
# Zipkin - on Mac, in Docker Quickstart Terminal
cd ${SPRING_DEMO}/docker-zipkin/
docker-compose up
# *** MAKE SURE ZIPKIN STARTS SUCCESSFULLY! ***
# *** I HAVE TO RESTART >50% OF TIME... ***
# H2 Database Server - in new terminal window
cd ${SPRING_DEMO}/h2-server
java -cp h2*.jar org.h2.tools.Server -webPort 6889
# Spring Cloud Config Server - in new terminal window
cd ${SPRING_DEMO}/config-server
mvn clean package spring-boot:run
# Eureka Service - in new terminal window
cd ${SPRING_DEMO}/eureka-server
mvn clean package spring-boot:run
# Hystrix Dashboard - in new terminal window
cd ${SPRING_DEMO}/hystrix-dashboard
mvn clean package spring-boot:run
# Reservation Service - instance 1 - in new terminal window
cd ${SPRING_DEMO}/reservation-service
mvn clean package
mvn spring-boot:run -Drun.jvmArguments='-Dserver.port=8000'
# Reservation Service - instance 2 - in new terminal window
cd ${SPRING_DEMO}/reservation-service
mvn spring-boot:run -Drun.jvmArguments='-Dserver.port=8001'
# Reservation Service - instance 3 - in new terminal window
cd ${SPRING_DEMO}/reservation-service
mvn spring-boot:run -Drun.jvmArguments='-Dserver.port=8002'
# Reservation Client - in new terminal window
cd ${SPRING_DEMO}/reservation-client
mvn clean package spring-boot:run
# Load seed data into H2 - in new terminal window
cd ${SPRING_DEMO}/reservation-data-seeder
mvn clean package spring-boot:run
# Redis redis-cli monitor - on Mac, in new Docker Quickstart Terminal
docker exec -it dockerredis_redis_1 redis-cli
127.0.0.1:6379> monitor

Docker

Both Zipkin and Redis run in Docker containers. Redis runs in a single container. Zipkin’s four separate components run in four separate containers. Be advised, Zipkin seems to have trouble successfully starting all four of its components on a consistent basis. I believe it’s a race condition caused by Docker Compose simultaneously starting the four Docker containers, ignoring a proper startup order. More than half of the time, I have to stop Zipkin and rerun the docker command to get Zipkin to start without any errors.

If you’ve followed the instructions above, you should see the following Docker images and Docker containers installed and running in your local environment.

docker is configured to use the default machine with IP 192.168.99.100
gstafford@nagstaffo:~$ docker images
REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
redis latest 8bccd73928d9 5 weeks ago 151.3 MB
openzipkin/zipkin-cassandra 1.30.0 8bbc92bceff0 5 weeks ago 221.9 MB
openzipkin/zipkin-web 1.30.0 c854ecbcef86 5 weeks ago 155.1 MB
openzipkin/zipkin-query 1.30.0 f0c45a26988a 5 weeks ago 180.3 MB
openzipkin/zipkin-collector 1.30.0 5fcf0ba455a0 5 weeks ago 183.8 MB
gstafford@nagstaffo:~$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1fde6bb2dc99 openzipkin/zipkin-web:1.30.0 "/usr/local/bin/run" 3 weeks ago Up 11 days 0.0.0.0:8080->8080/tcp, 0.0.0.0:9990->9990/tcp dockerzipkin_web_1
16e65180296e openzipkin/zipkin-query:1.30.0 "/usr/local/bin/run.s" 3 weeks ago Up 11 days 0.0.0.0:9411->9411/tcp, 0.0.0.0:9901->9901/tcp dockerzipkin_query_1
b1ca16408274 openzipkin/zipkin-collector:1.30.0 "/usr/local/bin/run.s" 3 weeks ago Up 11 days 0.0.0.0:9410->9410/tcp, 0.0.0.0:9900->9900/tcp dockerzipkin_collector_1
c7195ee9c4ff openzipkin/zipkin-cassandra:1.30.0 "/bin/sh -c /usr/loca" 3 weeks ago Up 11 days 7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp dockerzipkin_cassandra_1
fe0396908e29 redis "/entrypoint.sh redis" 3 weeks ago Up 11 days 0.0.0.0:6379->6379/tcp dockerredis_redis_1

Components

Spring Cloud Config Server

At the center of the Reservation system is Spring Cloud Config. Configuration, typically found in the application.properties file, for the Reservation Services, Reservation Client, Reservation Data Seeder, Eureka Service, and Hystix Dashboard, has been externalized with Spring Cloud Config.

Spring_Cloud_Config_2

Each component has a bootstrap.properties file, which modifies its startup behavior during the bootstrap phase of an application context. Each bootstrap.properties file contains the component’s name and the address of the Spring Cloud Config Server. Components retrieve their configuration from the Spring Cloud Config Server at runtime. Below, is an example of the Reservation Client’s bootstrap.properties file.

# reservation client bootstrap props
spring.application.name=reservation-client
spring.cloud.config.uri=http://localhost:8888

Spring Cloud Config Git Repo

In the presentation, as in this post, the Spring Cloud Config Server is backed by a locally cloned Git repository, the Spring Cloud Config Git Repo. The Spring Cloud Config Server’s application.properties file contains the address of the Git repository. Each properties file within the Git repository corresponds to a system component. Below, is an example of the reservation-client.properties file, from the Spring Cloud Config Git Repo.

# reservation client app props
server.port=8050
message=Spring Cloud Config: Reservation Client
# spring cloud stream / redis
spring.cloud.stream.bindings.output=reservations
spring.redis.host=192.168.99.100
spring.redis.port=6379
# zipkin / spring cloud sleuth
spring.zipkin.host=192.168.99.100
spring.zipkin.port=9410

As shown in the original presentation, the configuration files can be viewed using HTTP endpoints of the Spring Cloud Config Server. To view the Reservation Service’s configuration stored in the Spring Cloud Config Git Repo, issue an HTTP GET request to http://localhost:8888/reservation-service/master. The master URI refers to the Git repo branch in which the configuration resides. This will return the configuration, in the response body, as JSON:

SpringCloudConfig

In a real Production environment, the Spring Cloud Config Server would be backed by a highly-available Git Server or GitHub repository.

Reservation Service

The Reservation Service is the core component in the presentation’s example. The Reservation Service is a stand-alone Spring Boot application. By implementing Spring Data REST and Spring HATEOAS, Spring automatically creates REST representations from the Reservation JPA Entity class of the Reservation Service. There is no need to write a Spring Rest Controller and explicitly code each endpoint.

HATEOAS

Spring HATEOAS allows us to interact with the Reservation Entity, using HTTP methods, such as GET and POST. These endpoints, along with all addressable endpoints, are displayed in the terminal output when a Spring Boot application starts. For example, we can use an HTTP GET request to call the reservations/{id} endpoint, such as:

curl -X GET -H "Content-Type: application/json" \
--url 'http://localhost:8000/reservations'
curl -X GET -H "Content-Type: application/json" \
--url 'http://localhost:8000/reservations/2'

The Reservation Service also makes use of the Spring RepositoryRestResource annotation. By annotating the RepositoryReservation Interface, which extends JpaRepository, we can customize export mapping and relative paths of the Reservation JPA Entity class. As shown below, the RepositoryReservation Interface contains the findByReservationName method signature, annotated with /by-name endpoint, which accepts the rn input parameter.

@RepositoryRestResource
interface ReservationRepository extends JpaRepository<Reservation, Long> {
@RestResource(path = "by-name")
Collection<Reservation> findByReservationName(@Param("rn") String rn);
}

Calling the findByReservationName method, we can search for a particular reservation by using an HTTP GET request to call the reservations/search/by-name?rn={reservationName} endpoint.

curl -X GET -H "Content-Type: application/json" \
--url 'http://localhost:8000/reservations/search/by-name?rn=Amit'

Spring Screengrab 04

Reservation Client

Querying the Reservation Service directly is possible, however, is not the recommended. Instead, the presentation suggests using the Reservation Client as a proxy to the Reservation Service. The presentation offers three examples of using the Reservation Client as a proxy.

The first demonstration of the Reservation Client uses the /message endpoint on the Reservation Client to return a string from the Reservation Service. The message example has been modified to include two new endpoints on the Reservation Client. The first endpoint, /reservations/client-message, returns a message directly from the Reservation Client. The second endpoint, /reservations/service-message, returns a message indirectly from the Reservation Service. To retrieve the message from the Reservation Service, the Reservation Client sends a request to the endpoint Reservation Service’s /message endpoint.

@RequestMapping(path = "/client-message", method = RequestMethod.GET)
public String getMessage() {
return this.message;
}
@RequestMapping(path = "/service-message", method = RequestMethod.GET)
public String getReservationServiceMessage() {
return this.restTemplate.getForObject(
"http://reservation-service/message&quot;,
String.class);
}

To retrieve both messages, send separate HTTP GET requests to each endpoint:

Spring Screengrab 02

The second demonstration of the Reservation Client uses a Data Transfer Object (DTO). Calling the Reservation Client’s reservations/names endpoint, invokes the getReservationNames method. This method, in turn, calls the Reservation Service’s /reservations endpoint. The response object returned from the Reservation Service, a JSON array of reservation records, is deserialized and mapped to the Reservation Client’s Reservation DTO. Finally, the method returns a collection of strings, representing just the names from the reservations.

@RequestMapping(path = "/names", method = RequestMethod.GET)
public Collection<String> getReservationNames() {
ParameterizedTypeReference<Resources<Reservation>> ptr =
new ParameterizedTypeReference<Resources<Reservation>>() {};
return this.restTemplate.exchange("http://reservation-service/reservations&quot;, GET, null, ptr)
.getBody()
.getContent()
.stream()
.map(Reservation::getReservationName)
.collect(toList());
}

To retrieve the collection of reservation names, an HTTP GET request is sent to the /reservations/names endpoint:

Spring Screengrab 05

Spring Cloud Stream

One of the more interesting technologies in the presentation is Spring’s Spring Cloud Stream. The Spring website describes Spring Cloud Stream as a project that allows users  to develop and run messaging microservices using Spring Integration. In other words, it provides native Spring messaging capabilities, backed by a choice of message buses, including Redis, RabbitMQ, and Apache Kafka, to Spring Boot applications.

A detailed explanation of Spring Cloud Stream would take an entire post. The best technical demonstration I have found is the presentation, Message Driven Microservices in the Cloud, by speakers Dr. David Syer and Dr. Mark Pollack, given in January 2016, also at SpringOne2GX 2015.

Diagram_03

In the presentation, a new reservation is submitted via an HTTP POST to the acceptNewReservations method of the Reservation Client. The method, in turn, builds (aka produces) a message, containing the new reservation, and publishes that message to the queue.reservation queue.

@Autowired
@Output(OUTPUT)
private MessageChannel messageChannel;
@Value("${message}")
private String message;
@Description("Post new reservations using Spring Cloud Stream")
@RequestMapping(method = POST)
public void acceptNewReservations(@RequestBody Reservation r) {
Message<String> build = withPayload(r.getReservationName()).build();
this.messageChannel.send(build);
}

The queue.reservation queue is located in Redis, which is running inside a Docker container. To view the messages being published to the queue in real-time, use the redis-cli, with the monitor command, from within the Redis Docker container. Below is an example of tests messages pushed (LPUSH) to the reservations queue from the Reservation Client.

gstafford@nagstaffo:~$ docker exec -it dockerredis_redis_1 redis-cli
127.0.0.1:6379> monitor
OK
1455332771.709412 [0 192.168.99.1:62177] "BRPOP" "queue.reservations" "1"
1455332772.110386 [0 192.168.99.1:59782] "BRPOP" "queue.reservations" "1"
1455332773.689777 [0 192.168.99.1:62183] "LPUSH" "queue.reservations" "\xff\x04\x0bcontentType\x00\x00\x00\x0c\"text/plain\"\tX-Span-Id\x00\x00\x00&\"49a5b1d1-e7e9-46de-9d9f-647d5b19a77b\"\nX-Trace-Id\x00\x00\x00&\"49a5b1d1-e7e9-46de-9d9f-647d5b19a77b\"\x0bX-Span-Name\x00\x00\x00\x13\"http/reservations\"Test-Name-01"
1455332777.124788 [0 192.168.99.1:59782] "BRPOP" "queue.reservations" "1"
1455332777.425655 [0 192.168.99.1:59776] "BRPOP" "queue.reservations" "1"
1455332777.581693 [0 192.168.99.1:62183] "LPUSH" "queue.reservations" "\xff\x04\x0bcontentType\x00\x00\x00\x0c\"text/plain\"\tX-Span-Id\x00\x00\x00&\"32db0e25-982a-422f-88bb-2e7c2e4ce393\"\nX-Trace-Id\x00\x00\x00&\"32db0e25-982a-422f-88bb-2e7c2e4ce393\"\x0bX-Span-Name\x00\x00\x00\x13\"http/reservations\"Test-Name-02"
1455332781.442398 [0 192.168.99.1:59776] "BRPOP" "queue.reservations" "1"
1455332781.643077 [0 192.168.99.1:62177] "BRPOP" "queue.reservations" "1"
1455332781.669264 [0 192.168.99.1:62183] "LPUSH"