Posts Tagged Kafka

Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine

Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, including as MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-manage PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.

In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).

Background

In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.

Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders service. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.

The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.

Kafka-Eventual-Cons-Swarm.png

Developing the services, not operationalizing the platform, was the primary objective of the previous post.

Featured Technologies

The following technologies are featured prominently in this post.

Confluent Cloud

confluent_cloud_apache-300x228

In May 2018, Google announced a partnership with Confluence to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for their commercial, Kafka-based streaming platform for the Enterprise.

Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.

Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.

MongoDB Atlas

mongodb

Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.

Kubernetes Engine

gkeAccording to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.

A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.

GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.

Demonstration

In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.

ConfluentCloud-v3a.png

Kubernetes and Istio 1.0 will replace Netflix’s Zuul and  Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform, than previously demonstrated on Docker Swarm.

ConfluentCloudRouting.png

For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor, will we integrate a full lifecycle API management tool, like Google Apigee.

Source Code

The source code for this demonstration is contained in four separate GitHub repositories, storefront-kafka-dockerstorefront-demo-accounts, storefront-demo-orders, and, storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Setup Process

The setup of the Storefront API platform is divided into a few logical steps:

  1. Create the MongoDB Atlas cluster;
  2. Create the Confluent Cloud Kafka cluster;
  3. Create Kafka topics;
  4. Modify the Kubernetes resources;
  5. Modify the microservices to support Confluent Cloud configuration;
  6. Create the GKE cluster with Istio on GCP;
  7. Apply the Kubernetes resources to the GKE cluster;
  8. Test the Storefront API, Kafka, and MongoDB are functioning properly;

MongoDB Atlas Cluster

This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.

For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.

screen_shot_2018-12-23_at_6.48.12_pm

For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized 3-data node cluster, with shared RAM and 512 MB of storage, and currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0-cluster on GCP. An M0-sized Atlas cluster may take between 7-10 minutes to provision.

screen_shot_2018-12-23_at_6.49.24_pm

MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.

screen_shot_2018-12-23_at_6.51.41_pm

Once the cluster is ready, you can review details about the cluster and each individual cluster node.

screen_shot_2018-12-23_at_6.51.54_pm

In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.

screen_shot_2018-12-23_at_6.52.18_pm

Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open the access to your specific IP address using whatsmyip.com, or much less-securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.

screen_shot_2018-12-23_at_6.52.36_pm

The Java Spring Boot storefront services use a Spring Profile, gke. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke Spring Profile’s configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap and Secret resources, shown later.

The first two Spring configuration values will need are the MongoDB Atlas cluster’s connection string and the demo_user account password. Note these both for later use.

screen_shot_2018-12-23_at_6.53.00_pm

Confluent Cloud Kafka Cluster

Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Atlas account creation does require the use of a Credit Card.

The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and persistent history of cluster activities, which survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based Saas management tool.

screen_shot_2018-12-23_at_6.34.18_pm

As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.

screen_shot_2018-12-23_at_6.34.56_pm

Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.

screen_shot_2018-12-23_at_6.39.52_pmOnce the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.

screen_shot_2018-12-23_at_6.35.56_pm

As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.

screen_shot_2018-12-23_at_6.36.12_pm

SASL and JAAS

Some users may not be familiar with the terms, SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).

There are numerous SASL mechanisms.  The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses the JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.

Cluster Authentication

Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.

screen_shot_2018-12-23_at_6.38.09_pm

Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.

screen_shot_2018-12-23_at_6.38.21_pm

Kafka Topics

With the cluster created, create the storefront service’s three Kafka topics manually, using the Confluent Cloud’s ccloud CLI tool. First, configure the Confluent Cloud CLI using the ccloud init command, using your new cluster’s Bootstrap Servers address, API Key, and API Secret. The instructions are shown above Clusters Client Config tab of the Confluent Cloud web-based management interface.

screen_shot_2018-12-26_at_2.05.09_pm

Create the storefront service’s three Kafka topics using the ccloud topic create command. Use the list command to confirm they are created.

# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill
  
# list kafka topics
ccloud topic list
  
accounts.customer.change
fulfillment.order.change
orders.order.fulfill

Another useful ccloud command, topic describe, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12.

screen_shot_2018-12-26_at_5.03.11_pm

Adding the --verbose flag to the command, ccloud --verbose topic describe, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.

screen_shot_2018-12-26_at_5.07.20_pm

Kubernetes Resources

The deployment of the three storefront microservices to the dev Namespace will minimally require the following Kubernetes configuration resources.

  • (1) Kubernetes Namespace;
  • (3) Kubernetes Deployments;
  • (3) Kubernetes Services;
  • (1) Kubernetes ConfigMap;
  • (2) Kubernetes Secrets;
  • (1) Istio 1.0 Gateway;
  • (1) Istio 1.0 VirtualService;
  • (2) Istio 1.0 ServiceEntry;

The Istio networking.istio.io v1alpha3 API introduced the last three configuration resources in the list, to control traffic routing into, within, and out of the mesh. There are a total of four new io networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.

Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.

To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.

Istio Gateway & VirtualService

Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com, along with the sub-domain, api.dev, to host the Storefront API. The domain’s primary A record (‘@’) and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. Removing the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).

Istio ServiceEntry

There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services, both of their ServiceEntry Location items are set to MESH_INTERNAL. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).

The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).

Both need to have their host items replaced with the appropriate Atlas and Confluent URLs.

Inspecting Istio Resources

The easiest way to view Istio resources is from the command line using the istioctl and kubectl CLI tools.

istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry
  
kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry

Multiple Namespaces

In this demo, we are only deploying to a single Kubernetes Namespace, dev. However, Istio will also support routing traffic to multiple namespaces. For example, a typical non-prod Kubernetes cluster might support devtest, and uat, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In the VirtualService, each service’s host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which we change for each for each VirtualService (gist).

MongoDB Atlas Secret

There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.

The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection string, with the demo_user username and password, one for each of the storefront service’s databases (gist).

Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64 program. The base64 program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64 program using echo -n.

MONGODB_URI=mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true
echo -n $MONGODB_URI | base64

bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl

Repeat this process for the three MongoDB connection strings.

screen_shot_2018-12-26_at_2.15.21_pm

Confluent Cloud Secret

The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret’s data map, bootstrap.servers and sasl.jaas.config. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the base64 encoding process for these two data fields (gist).

Confluent Cloud ConfigMap

The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive, and therefore, may be placed in a Kubernetes ConfigMapconfluent-cloud-kafka-configmap.yaml (gist).

Accounts Deployment Resource

To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note the environment variables section, on lines 44–90, are a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).

Modify Microservices for Confluent Cloud

As explained earlier, Confluent Cloud’s Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service’s code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note the following changes are already made to the source code in the gke git branch, and not necessary for this demo.

The previous Kafka SenderConfig and ReceiverConfig Java classes have been converted to Java interfaces. There are four new SenderConfigConfluent, SenderConfigNonConfluent, ReceiverConfigConfluent, and ReceiverConfigNonConfluent classes, which implement one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes are assigned the @Profile("gke") annotation, and the others, the @Profile("!gke") annotation. When the services start, one of the two class implementations are is loaded, depending on the Active Spring Profile, gke or not gke. To understand the changes better, examine the Account service’s SenderConfigConfluent.java file (gist).

Line 20: Designates this class as belonging to the gke Spring Profile.

Line 23: The class now implements an interface.

Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.

Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.

Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine).

Google Kubernetes Engine (GKE) with Istio

Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.

For the sake of brevity, we will manually create the cluster and deploy the resources, using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available release of Kubernetes on GKE at the time of this post was 1.11.5-gke.5 and Istio 1.03 (Istio on GKE still considered beta). Note Kubernetes and Istio are evolving rapidly, thus the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create command format (gist).

Executing these commands successfully will build the cluster and the dev Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.

screen_shot_2018-12-26_at_2.00.56_pm

We can also observe the new GKE cluster from the GKE Clusters Details tab.

screen_shot_2018-12-26_at_2.18.32_pm

Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the two GKE cluster’s VM instances, and one IP address associated with the frontend of the load balancer.

screen_shot_2018-12-26_at_2.59.38_pm

While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.

screen_shot_2018-12-26_at_2.58.42_pm

A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.

screen_shot_2018-12-26_at_2.59.59_pm

Below, we see the details of the load balancer’s target pool, containing the two GKE cluster’s VMs.

screen_shot_2018-12-26_at_3.57.03_pm.png

As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.

ConfluentCloudRouting

Apply Kubernetes Resources

Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev Namespace since their role is to control ingress and route traffic to the dev Namespace and the services within it (gist).

Once these commands complete successfully, on the Workloads tab, we should observe two Pods of each of the three storefront service Kubernetes Deployments deployed to the dev Namespace, all six Pods with a Status of ‘OK’. A Deployment controller provides declarative updates for Pods and ReplicaSets.

screen_shot_2018-12-26_at_2.51.01_pm

On the Services tab, we should observe the three storefront service’s Kubernetes Services. A Service in Kubernetes is a REST object.

screen_shot_2018-12-26_at_2.51.16_pm

On the Configuration Tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Environment.

screen_shot_2018-12-26_at_2.51.36_pm

Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.54.51_pm

Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.55.17_pm

Test the Storefront API

If you recall from part two of the previous post, there are a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET Requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.

  1. Sample Customer: accounts/customers/sample
  2. Sample Orders: orders/customers/sample/orders
  3. Sample Fulfillment Requests: orders/customers/sample/fulfill
  4. Sample Processed Order Event: fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Event: fulfillment/fulfillment/sample/receive

Thee are a wide variety of tools to interact with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.

screen_shot_2018-12-31_at_12.19.50_pm.png

Postman

Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of the HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers endpoint.

screen_shot_2018-12-26_at_5.48.34_pm

Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.

screen_shot_2018-12-26_at_5.47.57_pm

Google Stackdriver

If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener handler firing on an onAfterSave event, which sends a CustomerChangeEvent payload to the accounts.customer.change Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.

screen_shot_2018-12-26_at_8.05.50_pm.png

MongoDB Atlas Collection View

Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connects to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests are completed successfully, you should now observe the three Storefront API databases, each with collections of documents.

screen_shot_2018-12-26_at_4.56.25_pm

MongoDB Compass

In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests collection schema.

Screen Shot 2019-01-20 at 10.21.54 AM.png

Confluent Control Center

Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since the Control Center is provided at an additional fee, and I found difficult to configure for Confluent Cloud clusters based on Confluent’s documentation, I chose not to cover it in detail, for this post.

screen_shot_2018-12-23_at_10.21.41_pm

screen_shot_2018-12-23_at_10.48.49_pm

Tear Down Cluster

Delete your Confluent Cloud and MongoDB clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the cluster delete command. Also, double-check that the external IP addresses and load balancer, associated with the cluster, were also deleted as part of the cluster deletion (gist).

Conclusion

In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.

In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0)

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , , , , ,

4 Comments

Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 2

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we’ve been exploring one possible solution to this challenge, using Apache Kafka and the model of eventual consistency. In Part One, we examined the online storefront domain, the storefront’s microservices, and the system’s state change event message flows.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

Kafka-Eventual-Cons-Swarm

Source code for deploying the Dockerized components of the online storefront, shown in this post, is available on GitHub. All Docker Images are available on Docker Hub. I have chosen the wurstmeister/kafka-docker version of Kafka, available on Docker Hub; it has 580+ stars and 10M+ pulls on Docker Hub. This version of Kafka works well, as long as you run it within a Docker Swarm, locally.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Deployment Options

For simplicity, I’ve used Docker’s native Docker Swarm Mode to support the deployed online storefront. Docker requires minimal configuration as opposed to other CaaS platforms. Usually, I would recommend Minikube for local development if the final destination of the storefront were Kubernetes in Production (AKS, EKS, or GKE). Alternatively, if the final destination of the storefront were Red Hat OpenShift in Production, I would recommend Minishift for local development.

Docker Deployment

We will break up our deployment into two parts. First, we will deploy everything, except our services. We will allow Kafka, MongoDB, Eureka, and the other components to startup up fully. Afterward, we will deploy the three online storefront services. The storefront-kafka-docker project on Github contains two Docker Compose files, which are divided between the two tasks.

The middleware Docker Compose file (gist).

The services Docker Compose file (gist).

In the storefront-kafka-docker project, there is a shell script, stack_deploy_local.sh. This script will execute both Docker Compose files, in succession, with a pause in between. You may need to adjust the timing for your own system (gist).

Start by running docker swarm init. This command will initialize a Docker Swarm. Next, execute the stack deploy script, using an sh ./stack_deploy_local.sh command. The script will deploy a new Docker Stack, within the Docker Swarm. The Docker Stack will hold all storefront components, deployed as individual Docker containers. The stack is deployed within its own isolated Docker overlay networkkafka-net.

Note we are not using host-based persistent storage for this local development demo. Destroying the Docker stack or the individual Kafka, Zookeeper, or MongoDB Docker containers will result in a loss of data.

stack-deploy

Before completion, the stack deploy script runs docker stack ls command, followed by a docker stack services storefront command. You should see one stack, names storefront, with ten services. You should also see each of the ten services has 1/1 replicas running, indicated everything has started or is starting correctly, without failure. A failure would be reflected here as a service having 0/1 replicas.

docker-stack-ls

Before completion, the stack deploy script also runs docker container ls command. You should observe each of the ten running containers (‘services’ in the Docker stack), along with their instance names and ports.

docker-container-ls

There is also a shell script, stack_delete_local.sh, which will issue a docker stack rm storefront command to destroy the stack when you are done.

Using the names of the storefront’s Docker containers, you can check the start-up logs of any of the components, using the docker logs command.

docker-logs

Testing the Stack

With the storefront stack deployed, we need to confirm that all the components have started correctly and are communicating with each other. To accomplish this, I’ve written a simple Python script, refresh.py. The refresh script has multiple uses. It deletes any existing storefront service MongoDB databases. It also deletes any existing Kafka topics; I call the Kafka Manager’s API to accomplish this. We have no databases or topics since our stack was just created. However, if you are actively developing your data models, you will likely want to purge the databases and topics regularly (gist).

Next, the refresh script calls a series of RESTful HTTP endpoints, in a specific order, to create sample data. Our three storefront services each expose different endpoints. The different /sample endpoints create sample customers, orders, order fulfillment requests, and shipping notifications. The create sample data endpoints include, in order:

  1. Sample Customer: /accounts/customers/sample
  2. Sample Orders: /orders/customers/sample/orders
  3. Sample Fulfillment Requests: /orders/customers/sample/fulfill
  4. Sample Processed Order Events: /fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Events: /fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Events: /fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Events: /fulfillment/fulfillment/sample/receive

You could create data on your own, by POSTing to the exposed CRUD endpoints on each service. However, given the complex data objects required in the request payloads, it is too time-consuming for this demo.

To execute the script, use a python3 ./refresh.py command. I am using Python 3 in the demo, but the script should also work with Python 2.x if you change shebang.

refresh-script

If everything was successful, the script returns one document from each of the three storefront service’s MongoDB database collections. A result of ‘None’ for any of the MongoDB documents usually indicates one of the earlier commands failed. Given an abnormally high response latency, due to the load of the ten running containers on my laptop, I had to increase the Zuul/Ribbon timeouts.

Observing the System

We should now have the online storefront Docker stack running, three MongoDB databases created and populated with sample documents (data), and three Kafka topics, which have messages in them. Based on the fact we saw database documents printed out with our refresh script, we know the topics were used to pass data between the message producing and message consuming services.

In most enterprise environments, a developer may not the access, nor the operational knowledge to interact with Kafka or MongoDB from within a container, on the command line. So how else can we interact with the system?

Kafka Manager

Kafka Manager gives us the ability to interact with Kafka via a convenient browser-based user interface. For this demo, the Kafka Manager UI is available on default port 9000.

kafka_manager_00

To make Kafka Manager useful, define the Kafka cluster. The Cluster Name is up to you. The Cluster Zookeeper Host should be zookeeper:2181, for our demo.

kafka_manager_01

Kafka Manager gives us useful insights into many aspects of our simple, single-broker cluster. You should observe three topics, created during the deployment of Kafka.

kafka_manager_02

Kafka Manager is an appealing alternative, as opposed to connecting with the Kafka container, with a docker exec command, to interact with Kafka. A typical use case might be deleting a topic or adding partitions to a topic. We can also see which Consumers are consuming which topics, from within Kafka Manager.

kafka_manager_03

Mongo Express

Similar to Kafka Manager, Mongo Express gives us the ability to interact with Kafka via a user interface. For this demo, the Mongo Express browser-based user interface is available on default port 8081. The initial view displays each of the existing databases. Note our three service’s databases, including accounts, orders, and fulfillment.

mongo-express-01

Drilling into an individual database, we can view each of the database’s collections. Digging in further, we can interact with individual database collection documents.

mongo-express-02

We may even edit and save the documents.

mongo-express-03

SpringFox and Swagger

Each of the storefront services also implements SpringFox, the automated JSON API documentation for API’s built with Spring. With SpringFox, each service exposes a rich Swagger UI. The Swagger UI allows us to interact with service endpoints.

Since each service exposes its own Swagger interface, we must access them through the Zuul API Gateway on port 8080. In our demo environment, the Swagger browser-based user interface is accessible at /swagger-ui.html. Below, is a fully self-documented Orders service API, as seen through the Swagger UI.

I believe there are still some incompatibilities with the latest SpringFox release and Spring Boot 2, which prevents Swagger from showing the default Spring Data REST CRUD endpoints. Currently, you only see the API  endpoints you explicitly declare in your Controller classes.

swagger-ui-1

The service’s data models (POJOs) are also exposed through the Swagger UI by default. Below we see the Orders service’s models.

swagger-ui-3

The Swagger UI allows you to drill down into the complex structure of the models, such as the CustomerOrder entity, exposing each of the entity’s nested data objects.

swagger-ui-2

Spring Cloud Netflix Eureka

This post does not cover the use of Eureka or Zuul. Eureka gives us further valuable insight into our storefront system. Eureka is our systems service registry and provides load-balancing for our services if we had multiple instances.

For this demo, the Eureka browser-based user interface is available on default port 8761. Within the Eureka user interface, we should observe the three storefront services and Zuul, the API Gateway, registered with Eureka. If we had more than one instance of each service, we would see all of them listed here.

eureka-ui

Although of limited use in a local environment, we can observe some general information about our host.

eureka-ui-02

Interacting with the Services

The three storefront services are fully functional Spring Boot / Spring Data REST / Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. Additionally, each service includes Spring Boot Actuator. Actuator exposes additional operational endpoints, allowing us to observe the running services. Again, this post is not intended to be a demonstration of Spring Boot or Spring Boot Actuator.

Using an application, such as Postman, we can interact with our service’s RESTful HTTP endpoints. Shown below, we are calling the Account service’s customers resource. The Accounts request is proxied through the Zuul API Gateway.

postman

The above Postman Storefront Collection and Postman Environment are both exported and saved with the project.

Some key endpoints to observe the entities that were created using Event-Carried State Transfer are as follows. They assume you are using localhost as a base URL.

References

Links to my GitHub projects for this post

Some additional references I found useful while authoring this post and the online storefront code:

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

 

, , , , , , ,

1 Comment

Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge, using Apache Kafka and the model of eventual consistency.

I previously covered the topic of eventual consistency in a distributed system, using RabbitMQ, in the post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. This post is featured on Pivotal’s RabbitMQ website.

Introduction

To ground the discussion, let’s examine a common example of the online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

mid-map-final-03

Given this problem domain, we can assume we have the concept of the Customer. Further, the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of a Customer would require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record (SOR) for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program. Fulfillment may maintain a record of all the orders shipped to the customer. Security likely holds the customer’s access credentials and privacy settings.

Below, Customer data objects are shown in yellow. Orange represents logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example. mid-map-final-01

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts, or even between services within the same contexts, then we must ensure data consistency. Take, for example, a change in a customer’s address. The Accounting context is the system of record for the customer’s addresses. However, to fulfill orders, the Shipping context might also need to maintain the customer’s address. Likewise, the Marketing context, who is responsible for direct-mail advertising, also needs to be aware of the address change, and update its own customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change, without the expectation of a response. They are stating a fact, not asking a question. Interested parties can choose if, and how, to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, as defined by Martin Fowler, of ThoughtWorks, in his insightful post, What do you mean by “Event-Driven”?. A change to a piece of data can be thought of as a state change event. Coincidently, Fowler also uses a customer’s address change as an example of Event-Carried State Transfer. The Event-Carried State Transfer Pattern is also detailed by fellow ThoughtWorker and noted Architect, Graham Brooks.

Consistency Strategies

Multiple architectural approaches could be taken to solve for data consistency in a distributed system. For example, you could use a single relational database to persist all data, avoiding the distributed data model altogether. Although I would argue, using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages are persisted in Kafka, the service have the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity.

Storefront Example

In this post, our online storefront’s services will be built using Spring Boot. Thus, we will ensure the uniformity of distributed data by using a Publish/Subscribe model with the Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot service, if appropriate, that state change will trigger an event, which will be shared with other services using Kafka topics.

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from each one another, while still ensuring the data is exchanged.

order-process-flow

Given the use case of placing an order, we will examine the interactions of three services, the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other, in a decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service. Kafka Producers may also be Consumers within our domain.

kafka-data-flow-diagram

Below is a view of the online storefront, through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you the idea of where Kafka, and Zookeeper, Kafka’s cluster manager, might sit in a typical, highly-available, microservice-based, distributed, application platform.

kafka-based-systems-diagram

This post will focus on the storefront’s services, database, and messaging sub-systems.

full-system-partial-view.png

Storefront Microservices

First, we will explore the functionality of each of the three microservices. Then, we will examine how they share state change events using Kafka. Each storefront service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data RESTSpring Data MongoDBSpring for Apache KafkaSpring Cloud SleuthSpringFox, Spring Cloud Netflix Eureka, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream is not part of this post.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

accounts-diagram

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. A Customer, represented as a BSON document in the customer.accounts database collection, looks as follows (gist).

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added, or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

accounts-events-diagram.png

Since the CustomerChangeEvent domain event object is not persisted in MongoDB, to examine its structure, we can look at its JSON message payload in Kafka. Note the differences in the data structure between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload (gist).

For simplicity, we will assume other services do not make changes to the customer’s name, contact information, or addresses. That is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

orders-diagram

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows (gist).

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. TheFulfillmentRequestEvent object only contains the information it needs to share. In our example, it shares a single Order, along with the customer’s name, contact information, and shipping address.

orders-event-diagram

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at it’s JSON message payload in Kafka. Again, note the structural differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka (gist).

Source code for the Orders service is available on GitHub.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

fulfillment-diagram

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping event, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping addresses are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service, via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object, represented as a BSON document in the fulfillment.requests database collection, looks as follows (gist).

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

fulfillment-event-diagram

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, to examine it, we can again look at it’s JSON message payload in Kafka (gist).

Source code for the Fulfillment service is available on GitHub.

State Change Event Messaging Flows

There is three state change event messaging flows demonstrated in this post.

  1. Change to a Customer triggers an event message by the Accounts service;
  2. Order Approved triggers an event message by the Orders service;
  3. Change to the status of an Order triggers an event message by the Fulfillment service;

Each of these state change event messaging flows follow the exact same architectural pattern on both the Producer and Consumer sides of the Kafka topic.

kafka-event-flow

Let’s examine each state change event messaging flow and the code behind them.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. It can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information, by way of Kafka.

kafka-topic-01

There are different methods to trigger a message to be sent to Kafka, For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity (gist).

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class (gist).

The configuration of the Sender is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload (gist).

The Sender uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

kafka-events-01.png

The Orders service’s Receiver class consumes the CustomerChangeEvent messages, produced by the Accounts service (gist).

[gust]cc3c4e55bc291e5435eccdd679d03015[/gist]

The Orders service’s Receiver class is configured differently, compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching (gist).

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as input, denoting the object type the message payload needs to be deserialized into. In this way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file, in each service’s resources directory, contains the Kafka configuration (gist).

 Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Kafka-Eventual-Cons Order Flow 2

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class (gist).

The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses the Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload (gist).

The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is not critical messages could be sent to a topic with multiple partitions if the volume of messages required it.

kafka-events-02

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. This includes the order to be fulfilled, and the customer’s contact and shipping information (gist).

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object (gist).

Fulfillment Order Status State Change

When the status of the Order in a Fulfillment entity is changed anything other than ‘Approved’, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial ‘Created’ status to the final happy path ‘Received’ status.

kafka-topic-03

The Fulfillment service exposes several endpoints through the FulfillmentController class, which are simulate a change the status of an order. They allow an order status to be changed from ‘Approved’ to ‘Processing’, to ‘Shipped’, to ‘In Transit’, and to ‘Received’. This change is applied to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates an Kafka message, containing the OrderStatusChangeEvent in the message payload. This is handled by the Fulfillment service’s Sender class.

Note in this example, these two events are not handled in an atomic transaction. Either the updating the database or the sending of the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure both these disparate actions succeed or fail as a single transaction, to ensure data consistency (gist).

The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses the Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services (gist).

The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages could be sent to a topic with multiple partitions if the volume of messages required it.

kafka-events-03

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message, produced by the Fulfillment service (gist).

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service needs to receive messages from more than one topic. The ReceiverConfig class deserializes all message using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching (gist).

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

Kafka-Eventual-Cons-Swarm

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , ,

6 Comments