Posts Tagged Spring Boot
Significantly improve the performance and reduce the cost of data lake-based analytics applications using Amazon ElastiCache for Redis
The recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, demonstrated how to develop a Cloud-native analytics application using Spring Boot. The application queried data in an Amazon S3-based data lake via an AWS Glue Data Catalog utilizing the Amazon Athena API.
Securely exposing data in a data lake using RESTful APIs can fulfill many data-consumer needs. However, access to that data can be significantly slower than access from a database or data warehouse. For example, in the previous post, we imported the OpenAPI v3 specification from the Spring Boot service into Postman. The API specification contained approximately 17 endpoints.
From my local development laptop, the Postman API test run times for all service endpoints took an average of 32.4 seconds. The Spring Boot service was running three Kubernetes pod replicas on Amazon EKS in the AWS US East (N. Virginia) Region.
Compare the data lake query result times to equivalent queries made against a minimally-sized Amazon RDS for PostgreSQL database instance containing the same data. The average run times for all PostgreSQL queries averaged 10.8 seconds from a similar Spring Boot service. Although not a precise benchmark, we can clearly see that access to the data in the Amazon S3-based data lake is substantially slower, approximately 3x slower, than that of the PostgreSQL database. Tuning the database would easily create an even greater disparity.
Caching for Data Lakes
According to AWS, the speed and throughput of a database can be the most impactful factor for overall application performance. Consequently, in-memory data caching can be one of the most effective strategies to improve overall application performance and reduce database costs. This same caching strategy can be applied to analytics applications built atop data lakes, as this post will demonstrate.
According to Hazelcast, memory caching (aka in-memory caching), often simply referred to as caching, is a technique in which computer applications temporarily store data in a computer’s main memory (e.g., RAM) to enable fast retrievals of that data. The RAM used for the temporary storage is known as the cache. As an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.
Further, according to Hazelcast, as an application tries to read data, typically from a data storage system like a database, it checks to see if the desired record already exists in the cache. If it does, the application will read the data from the cache, thus eliminating the slower access to the database. If the desired record is not in the cache, then the application reads the record from the source. When it retrieves that data, it writes it to the cache so that when the application needs that same data in the future, it can quickly retrieve it from the cache.
Redis In-memory Data Store
According to their website, Redis is the open-source, in-memory data store used by millions of developers as a database, cache, streaming engine, and message broker. Redis provides data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes, and streams. In addition, Redis has built-in replication, Lua scripting, LRU eviction, transactions, and different levels of on-disk persistence and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.
Amazon ElastiCache for Redis
According to AWS, Amazon ElastiCache for Redis, the fully-managed version of Redis, according to AWS, is a blazing fast in-memory data store that provides sub-millisecond latency to power internet-scale real-time applications. Redis applications can work seamlessly with ElastiCache for Redis without any code changes. ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with manageability, security, and scalability from AWS. As a result, Redis is an excellent choice for implementing a highly available in-memory cache to decrease data access latency, increase throughput, and ease the load on relational and NoSQL databases.
ElastiCache Performance Results
In the following post, we will add in-memory caching to the Spring Boot service introduced in the previous post. In preliminary tests with Amazon ElastiCache for Redis, the Spring Boot service delivered a 34x improvement in average response times. For example, test runs with the best-case scenario of a Redis cache hit ratio of 100% averaged 0.95 seconds compared to 32.4 seconds without Redis.
All the source code and Docker and Kubernetes resources are open-source and available on GitHub.
git clone --depth 1 -b redis \
In addition, a Docker image for the Redis-base Spring Boot service is available on Docker Hub. For this post, use the latest tag with the
The following code changes are required to the Spring Boot service to implement Spring Boot Cache with Redis.
The application properties file,
application.yml, has been modified for both the
prod Spring Profiles. The
dev Spring Profile expects Redis to be running on
localhost. Correspondingly, the project’s
docker-compose.yml file now includes a Redis container for local development. The time-to-live (TTL) for all Redis caches is arbitrarily set to one minute for
dev and five minutes for
prod. To increase application performance and reduce the cost of querying the data lake using Athena, increase Redis’s TTL. Note that increasing the TTL will reduce data freshness.
Athena Application Class
AthenaApplication class declaration is now decorated with Spring Framework’s
EnableCaching annotation (line 22). Additionally, two new Beans have been added (lines 58–68). Spring Redis provides an implementation for the Spring cache abstraction through the
org.springframework.data.redis.cache package. The
RedisCacheManager cache manager creates caches by default upon the first write. The
RedisCacheConfiguration cache configuration helps to customize
RedisCache behavior such as caching null values, cache key prefixes, and binary serialization.
POJO Data Model Classes
Spring Boot Redis caching uses Java serialization and deserialization. Therefore, all the POJO data model classes must implement
Serializable (line 14).
Each public method in the Service classes is now decorated with Spring Framework’s
Cachable annotation (lines 42 and 66). For example, the
findById(int id) method in the
CategoryServiceImp class is annotated with
@Cacheable(value = "categories", key = "#id"). The method’s
key parameter uses Spring Expression Language (SpEL) expression for computing the key dynamically. Default is null, meaning all method parameters are considered as a key unless a custom
keyGenerator has been configured. If no value is found in the Redis cache for the computed key, the target method will be invoked, and the returned value will be stored in the associated cache.
There are no changes required to the Controller classes.
Amazon ElastiCache for Redis
Multiple options are available for creating an Amazon ElastiCache for Redis cluster, including cluster mode, multi-AZ option, auto-failover option, node type, number of replicas, number of shards, replicas per shard, Availability Zone placements, and encryption at rest and encryption in transit options. The results in this post are based on a minimally-configured Redis version 6.2.6 cluster, with one shard, two
cache.r6g.large nodes, and cluster mode, multi-AZ option, and auto-failover all disabled. In addition, encryption at rest and encryption in transit were also disabled. This cluster configuration is sufficient for development and testing, but not Production.
Testing the Cache
To test Amazon ElastiCache for Redis, we will use Postman again with the imported OpenAPI v3 specification. With all data evicted from existing Redis caches, the first time the Postman tests run, they cause the service’s target methods to be invoked and the returned data stored in the associated caches.
To confirm this caching behavior, use the Redis CLI’s
--scan option. To access the
redis-cli, I deployed a single Redis pod to Amazon EKS. The first time the
--scan command runs, we should get back an empty list of keys. After the first Postman test run, the same
--scan command should return a list of cached keys.
Use the Redis CLI’s
MONITOR option to further confirm data is being cached, as indicated by the
After the initial caching of data, use the Redis CLI’s
MONITOR option, again, to confirm the cache is being hit instead of calling the target methods, which would then invoke the Athena API. Rerunning the Postman tests, we should see
get commands as opposed to
Lastly, to confirm the Spring Boot service is effectively using Redis to cache data, we can also check Amazon Athena’s Recent queries tab in the AWS Management Console. After repeated sequential test runs within the TTL window, we should only see one Athena query per endpoint.
In this brief follow-up to the recent post, Developing Spring Boot Applications for Querying Data Lakes on AWS using Amazon Athena, we saw how to substantially increase data lake application performance using Amazon ElastiCache for Redis. Although this caching technique is often associated with databases, it can also be effectively applied to data lake-based applications, as demonstrated in the post.
This blog represents my viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.
Learn how to develop Cloud-native, RESTful Java services that query data in an AWS-based data lake using Amazon Athena’s API
AWS provides a collection of fully-managed services that makes building and managing secure data lakes faster and easier, including AWS Lake Formation, AWS Glue, and Amazon S3. Additional analytics services such as Amazon EMR, AWS Glue Studio, and Amazon Redshift allow Data Scientists and Analysts to run high-performance queries on large volumes of semi-structured and structured data quickly and economically.
What is not always as obvious is how teams develop internal and external customer-facing analytics applications built on top of data lakes. For example, imagine sellers on an eCommerce platform, the scenario used in this post, want to make better marketing decisions regarding their products by analyzing sales trends and buyer preferences. Further, suppose the data required for the analysis must be aggregated from multiple systems and data sources; the ideal use case for a data lake.
In this post, we will explore an example Java Spring Boot RESTful Web Service that allows end-users to query data stored in a data lake on AWS. The RESTful Web Service will access data stored as Apache Parquet in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena. The service will use Spring Boot and the AWS SDK for Java to expose a secure, RESTful Application Programming Interface (API).
Amazon Athena is a serverless, interactive query service based on Presto, used to query data and analyze big data in Amazon S3 using standard SQL. Using Athena functionality exposed by the AWS SDK for Java and Athena API, the Spring Boot service will demonstrate how to access tables, views, prepared statements, and saved queries (aka named queries).
Do you want to explore the source code for this post’s Spring Boot service or deploy it to Kubernetes before reading the full article? All the source code, Docker, and Kubernetes resources are open-source and available on GitHub.
git clone --depth 1 -b main \ https://github.com/garystafford/athena-spring-app.git
A Docker image for the Spring Boot service is also available on Docker Hub.
Data Lake Data Source
There are endless data sources to build a demonstration data lake on AWS. This post uses the TICKIT sample database provided by AWS and designed for Amazon Redshift, AWS’s cloud data warehousing service. The database consists of seven tables. Two previous posts and associated videos, Building a Data Lake on AWS with Apache Airflow and Building a Data Lake on AWS, detail the setup of the data lake used in this post using AWS Glue and optionally Apache Airflow with Amazon MWAA.
Those two posts use the data lake pattern of segmenting data as bronze (aka raw), silver (aka refined), and gold (aka aggregated), popularized by Databricks. The data lake simulates a typical scenario where data originates from multiple sources, including an e-commerce platform, a CRM system, and a SaaS provider must be aggregated and analyzed.
Spring Projects with IntelliJ IDE
Although not a requirement, I used JetBrains IntelliJ IDEA 2022 (Ultimate Edition) to develop and test the post’s Spring Boot service. Bootstrapping Spring projects with IntelliJ is easy. Developers can quickly create a Spring project using the Spring Initializr plugin bundled with the IntelliJ.
The Spring Initializr plugin’s new project creation wizard is based on start.spring.io. The plugin allows you to quickly select the Spring dependencies you want to incorporate into your project.
Visual Studio Code
There are also several Spring extensions for the popular Visual Studio Code IDE, including Microsoft’s Spring Initializr Java Support extension.
This post uses Gradle instead of Maven to develop, test, build, package, and deploy the Spring service. Based on the packages selected in the new project setup shown above, the Spring Initializr plugin’s new project creation wizard creates a
build.gradle file. Additional packages, such as Lombak, Micrometer, and Rest Assured, were added separately.
The Spring boot service is developed for and compiled with the most recent version of Amazon Corretto 17. According to AWS, Amazon Corretto is a no-cost, multiplatform, production-ready distribution of the Open Java Development Kit (OpenJDK). Corretto comes with long-term support that includes performance enhancements and security fixes. Corretto is certified as compatible with the Java SE standard and is used internally at Amazon for many production services.
Each API endpoint in the Spring Boot RESTful Web Service has a corresponding POJO data model class, service interface and service implementation class, and controller class. In addition, there are also common classes such as configuration, a client factory, and Athena-specific request/response methods. Lastly, there are additional class dependencies for views and prepared statements.
The project’s source code is arranged in a logical hierarchy by package and class type.
Amazon Athena Access
There are three standard methods for accessing Amazon Athena with the AWS SDK for Java: 1) the
AthenaClient service client, 2) the
AthenaAsyncClient service client for accessing Athena asynchronously, and 3) using the JDBC driver with the AWS SDK. The
AthenaAsyncClient service clients are both parts of the
software.amazon.awssdk.services.athena package. For simplicity, this post’s Spring Boot service uses the
AthenaClient service client instead of Java’s asynchronously programming model. AWS supplies basic code samples as part of their documentation as a starting point for writing Athena applications using the SDK. The code samples also use the
AthenaClient service client.
POJO-based Data Model Class
For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Plain Old Java Object (POJO). According to Wikipedia, a POGO is an ordinary Java object, not bound by any particular restriction. The POJO class is similar to a JPA Entity, representing persistent data stored in a relational database. In this case, the POJO uses Lombok’s
@Data annotation. According to the documentation, this annotation generates getters for all fields, a useful
toString method, and
equals implementations that check all non-transient fields. It also generates setters for all non-final fields and a constructor.
Each POJO corresponds directly to a ‘silver’ table in the AWS Glue Data Catalog. For example, the
Event POJO corresponds to the
refined_tickit_public_event table in the
tickit_demo Data Catalog database. The POJO defines the Spring Boot service’s data model for data read from the corresponding AWS Glue Data Catalog table.
The Glue Data Catalog table is the interface between the Athena query and the underlying data stored in Amazon S3 object storage. The Athena query targets the table, which returns the underlying data from S3.
Retrieving data from the data lake via AWS Glue, using Athena, is handled by a service class. For each API endpoint in the Spring Boot RESTful Web Service, there is a corresponding Service Interface and implementation class. The service implementation class uses Spring Framework’s
@Service annotation. According to the documentation, it indicates that an annotated class is a “Service,” initially defined by Domain-Driven Design (Evans, 2003) as “an operation offered as an interface that stands alone in the model, with no encapsulated state.” Most importantly for the Spring Boot service, this annotation serves as a specialization of
@Component, allowing for implementation classes to be autodetected through classpath scanning.
Using Spring’s common constructor-based Dependency Injection (DI) method (aka constructor injection), the service auto-wires an instance of the
AthenaClientFactory interface. Note that we are auto-wiring the service interface, not the service implementation, allowing us to wire in a different implementation if desired, such as for testing.
The service calls the
createClient() method, which returns a connection to Amazon Athena using one of several available authentication methods. The authentication scheme will depend on where the service is deployed and how you want to securely connect to AWS. Some options include environment variables, local AWS profile, EC2 instance profile, or token from the web identity provider.
The service class transforms the payload returned by an instance of
GetQueryResultsResponse into an ordered collection (also known as a sequence),
E represents a POJO. For example, with the data lake’s
refined_tickit_public_event table, the service returns a
List<Event>. This pattern repeats itself for tables, views, prepared statements, and named queries. Column data types can be transformed and formatted on the fly, new columns added, and existing columns skipped.
For each endpoint defined in the Controller class, for example,
FindById(), there is a corresponding method in the Service class. Below, we see an example of the
findAll() method in the
SalesByCategoryServiceImp service class. This method corresponds to the identically named method in the
SalesByCategoryController controller class. Each of these service methods follows a similar pattern of constructing a dynamic Athena SQL query based on input parameters, which is passed to Athena through the
AthenaClient service client using an instance of
Lastly, there is a corresponding Controller class for each API endpoint in the Spring Boot RESTful Web Service. The controller class uses Spring Framework’s
@RestController annotation. According to the documentation, this annotation is a convenience annotation that is itself annotated with
@ResponseBody. Types that carry this annotation are treated as controllers where
@RequestMapping methods assume
@ResponseBody semantics by default.
The controller class takes a dependency on the corresponding service class application component using constructor-based Dependency Injection (DI). Like the service example above, we are auto-wiring the service interface, not the service implementation.
The controller is responsible for serializing the ordered collection of POJOs into JSON and returning that JSON payload in the body of the HTTP response to the initial HTTP request.
In addition to querying AWS Glue Data Catalog tables (aka Athena tables), we also query views. According to the documentation, a view in Amazon Athena is a logical table, not a physical table. Therefore, the query that defines a view runs each time the view is referenced in a query.
For convenience, each time the Spring Boot service starts, the main
AthenaApplication class calls the
CreateView() method to check for the existence of the view,
view_tickit_sales_by_day_and_category. If the view does not exist, it is created and becomes accessible to all application end-users. The view is queried through the service’s
This confirm-or-create pattern is repeated for the prepared statement in the main
AthenaApplication class (detailed in the next section).
Below, we see the
View class called by the service at startup.
Aside from the fact the
/salesbycategory endpoint queries a view, everything else is identical to querying a table. This endpoint uses the same model-service-controller pattern.
Executing Prepared Statements
According to the documentation, you can use the Athena parameterized query feature to prepare statements for repeated execution of the same query with different query parameters. The prepared statement used by the service,
tickit_sales_by_seller, accepts a single parameter, the ID of the seller (
sellerid). The prepared statement is executed using the
/salesbyseller endpoint. This scenario simulates an end-user of the analytics application who wants to retrieve enriched sales information about their sales.
The pattern of querying data is similar to tables and views, except instead of using the common
SELECT...FROM...WHERE SQL query pattern, we use the
For example, to execute the prepared statement for a seller with an ID of 3, we would use
EXECUTE tickit_sales_by_seller USING 3;. We pass the seller’s ID of 3 as a path parameter similar to other endpoints exposed by the service:
Again, aside from the fact the
/salesbyseller endpoint executes a prepared statement and passes a parameter; everything else is identical to querying a table or a view, using the same model-service-controller pattern.
Working with Named Queries
In addition to tables, views, and prepared statements, Athena has the concept of saved queries, referred to as named queries in the Athena API and when using AWS CloudFormation. You can use the Athena console or API to save, edit, run, rename, and delete queries. The queries are persisted using a
NamedQueryId, a unique identifier (UUID) of the query. You must reference the
NamedQueryId when working with existing named queries.
There are multiple ways to use and reuse existing named queries programmatically. For this demonstration, I created the named query,
buyer_likes_by_category, in advance and then stored the resulting
NamedQueryId as an application property, injected at runtime or kubernetes deployment time through a local environment variable.
Alternately, you might iterate through a list of named queries to find one that matches the name at startup. However, this method would undoubtedly impact service performance, startup time, and cost. Lastly, you could use a method like
NamedQuery() included in the unused
NamedQuery class at startup, similar to the view and prepared statement. That named query’s unique
NamedQueryId would be persisted as a system property, referencable by the service class. The downside is that you would create a duplicate of the named query each time you start the service. Therefore, this method is also not recommended.
Two components responsible for persisting configuration for the Spring Boot service are the
application.yml properties file and
ConfigProperties class. The class uses Spring Framework’s
@ConfigurationProperties annotation. According to the documentation, this annotation is used for externalized configuration. Add this to a class definition or a
@Bean method in a
@Configuration class if you want to bind and validate some external Properties (e.g., from a
.yml file). Binding is performed by calling setters on the annotated class or, if
@ConstructorBinding in use, by binding to the constructor parameters.
@ConfigurationProperties annotation includes the
athena. This value corresponds to the
athena prefix in the the
application.yml properties file. The fields in the
ConfigProperties class are bound to the properties in the the
application.yml. For example, the property,
namedQueryId, is bound to the property,
athena.named.query.id. Further, that property is bound to an external environment variable,
NAMED_QUERY_ID. These values could be supplied from an external configuration system, a Kubernetes secret, or external secrets management system.
AWS IAM: Authentication and Authorization
For the Spring Boot service to interact with Amazon Athena, AWS Glue, and Amazon S3, you need to establish an AWS IAM Role, which the service assumes once authenticated. The Role must be associated with an attached IAM Policy containing the requisite Athena, Glue, and S3 permissions. For development, the service uses a policy similar to the one shown below. Please note this policy is broader than recommended for Production; it does not represent the security best practice of least privilege. In particular, the use of the overly-broad
* for Resources should be strictly avoided when creating policies.
In addition to the authorization granted by the IAM Policy, AWS Lake Formation can be used with Amazon S3, AWS Glue, and Amazon Athena to grant fine-grained database-, table-, column-, and row-level access to datasets.
Swagger UI and the OpenAPI Specification
The easiest way to view and experiment with all the endpoints available through the controller classes is using the Swagger UI, included in the example Spring Boot service, by way of the
springdoc-openapi Java library. The Swagger UI is accessed at
The OpenAPI Specification (formerly Swagger Specification) is an API description format for REST APIs. The
/v1/v3/api-docs endpoint allows you to generate an OpenAPI v3 specification file. The OpenAPI file describes the entire API.
Included in the Spring Boot service’s source code is a limited number of example integration tests, not to be confused with unit tests. Each test class uses Spring Framework’s
@SpringBootTest annotation. According to the documentation, this annotation can be specified on a test class that runs Spring Boot-based tests. It provides several features over and above the regular Spring
The integration tests use Rest Assured’s given-when-then pattern of testing, made popular as part of Behavior-Driven Development (BDD). In addition, each test uses the JUnit’s
@Test annotation. According to the documentation, this annotation signals that the annotated method is a test method. Therefore, methods using this annotation must not be private or static and must not return a value.
Run the integration tests using Gradle from the project’s root:
./gradlew clean build test. A detailed ‘Test Summary’ is produced in the project’s
build directory as HTML for easy review.
Load Testing the Service
In Production, the Spring Boot service will need to handle multiple concurrent users executing queries against Amazon Athena.
We could use various load testing tools to evaluate the service’s ability to handle multiple concurrent users. One of the simplest is my favorite go-based utility,
hey, which sends load to a URL using a provided number of requests in the provided concurrency level and prints stats. It also supports HTTP2 endpoints. So, for example, we could execute 500 HTTP requests with a concurrency level of 25 against the Spring Boot service’s
/users endpoint using
hey. The post’s integration tests were run against three Kubernetes replica pods of the service deployed to Amazon EKS.
hey -n 500 -c 25 -T "application/json;charset=UTF-8" \
From Athena’s Recent Queries console, we see many simultaneous queries being queued and executed by a
hey through the Spring Boot service’s endpoint.
The Spring Boot service implements the
micrometer-registry-prometheus extension. The Micrometer metrics library exposes runtime and application metrics. Micrometer defines a core library, providing a registration mechanism for metrics and core metric types. These metrics are exposed by the service’s
Using the Micrometer extension, metrics exposed by the
/v1/actuator/prometheus endpoint can be scraped and visualized by tools such as Prometheus. Conveniently, AWS offers the fully-managed Amazon Managed Service for Prometheus (AMP), which easily integrates with Amazon EKS.
This post taught us how to create a Spring Boot RESTful Web Service, allowing end-user applications to securely query data stored in a data lake on AWS. The service used AWS SDK for Java to access data stored in Amazon S3 through an AWS Glue Data Catalog using Amazon Athena.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners. All diagrams and illustrations are property of the author unless otherwise noted.
Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices
As discussed in Part One of this post, given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.
In Part Two of this post, we will review how to deploy and run the storefront API components in a local development environment running on Kubernetes with Istio, using minikube. For simplicity’s sake, we will only run a single instance of each service. Additionally, we are not implementing custom domain names, TLS/HTTPS, authentication and authorization, API keys, or restricting access to any sensitive operational API endpoints or ports, all of which we would certainly do in an actual production environment.
This post will assume a basic level of knowledge of Kubernetes, minikube, Docker, and Istio. Furthermore, the post assumes you have already installed recent versions of minikube, kubectl, Docker, and Istio. Meaning, that the
minikube commands are all available from the terminal.
For this post demonstration, I am using an Apple MacBook Pro running macOS as my development machine. I have the latest versions of Docker Desktop, minikube, kubectl, and Istio installed as of May 2021.
The source code for this post is open-source and is publicly available on GitHub. Clone the GitHub project using the following command:
clone --branch 2021-istio \
--single-branch --depth 1 \
Part of the Kubernetes project, minikube is local Kubernetes, focusing on making it easy to learn and develop for Kubernetes. Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows. Given the number of Kubernetes resources we will be deploying to minikube, I would recommend at least 3 CPUs and 4–5 GBs of memory. If you choose to deploy multiple observability tools, you may want to increase both of these resources if you can afford it. I maxed out both CPUs and memory several times while setting up this demonstration, causing temporary lock-ups of minikube.
minikube --cpus 3 --memory 5g --driver=docker start start
The Docker driver allows you to install Kubernetes into an existing Docker install. If you are using Docker, please be aware that you must have at least an equivalent amount of resources allocated to Docker to apportion to minikube.
Before continuing, confirm minikube is up and running and confirm the current context of
kubectl config current-context
The statuses should look similar to the following:
eval below command to point your shell to minikube’s docker-daemon. You can confirm this by using the
docker image ls and
docker container ls command to view running Kubernetes containers on minikube.
eval $(minikube -p minikube docker-env)
docker image ls
docker container ls
The output should look similar to the following:
You can also check the status of minikube from Docker Desktop. Minikube is running as a container, instantiated from a Docker image,
gcr.io/k8s-minikube/kicbase. View the container’s Stats, as shown below.
Assuming you have downloaded and configured Istio, install it onto minikube. I currently have Istio 1.10.0 installed and have the
ISTIO_HOME environment variable set in my Oh My Zsh
.zshrc file. I have also set Istio’s
bin/ subdirectory in my
PATH environment variable. The
bin/ subdirectory contains the
> client version: 1.10.0
control plane version: 1.10.0
data plane version: 1.10.0 (4 proxies)
Istio comes with several built-in configuration profiles. The profiles provide customization of the Istio control plane and of the sidecars for the Istio data plane.
istioctl profile list
> Istio configuration profiles:
For this demonstration, we will use the default profile, which installs
istiod and an
istio-ingressgateway. We will not require the use of an
istio-egressgateway, since all components will be installed locally on minikube.
istioctl install --set profile=default -y
> ✔ Istio core installed
✔ Istiod installed
✔ Ingress gateways installed
✔ Installation complete
kubectl get svc istio-ingressgateway -n istio-system
To associate an IP address, run the
minikube tunnel command in a separate terminal tab. Since it requires opening privileged ports 80 and 443 to be exposed, this command will prompt you for your
Services of the type
LoadBalancer can be exposed by using the
minikube tunnel command. It must be run in a separate terminal window to keep the
LoadBalancer running. We previously created the
istio-ingressgateway. Run the following command and note that the status of
<pending>. There is currently no external IP address associated with our
Rerun the previous command. There should now be an external IP address associated with the
LoadBalancer. In my case,
kubectl get svc istio-ingressgateway -n istio-system
The external IP address shown is the address we will use to access the resources we chose to expose externally on minikube.
Once again, in a separate terminal tab, open the Minikube Dashboard (aka Kubernetes Dashboard).
The dashboard will give you a visual overview of all your installed Kubernetes components.
Kubernetes supports multiple virtual clusters backed by the same physical cluster. These virtual clusters are called namespaces. For this demonstration, we will use four namespaces to organize our deployed resources:
dev namespace is where we will deploy our Storefront API’s microservices:
fulfillment. We will deploy MongoDB and Mongo Express to the
mongo namespace. Lastly, we will use the
storefront-kafka-project namespaces to deploy Apache Kafka to minikube using Strimzi, a Cloud Native Computing Foundation sandbox project, and CMAK.
kubectl apply -f ./minikube/resources/namespaces.yaml
Automatic Sidecar Injection
In order to take advantage of all of Istio’s features, pods in the mesh must be running an Istio sidecar proxy. When you set the
label on a namespace and the injection webhook is enabled, any new pods created in that namespace will automatically have a sidecar added to them. Labeling the
dev namespace for automatic sidecar injection ensures that our Storefront API’s microservices —
fulfillment— will have Istio sidecar proxy automatically injected into their pods.
kubectl label namespace dev istio-injection=enabled
Next, deploy MongoDB and Mongo Express to the
mongo namespace on minikube. To ensure a successful connection to MongoDB from Mongo Express, I suggest giving MongoDB a chance to start up fully before deploying Mongo Express.
kubectl apply -f ./minikube/resources/mongodb.yaml -n mongo
kubectl apply -f ./minikube/resources/mongo-express.yaml -n mongo
To confirm the success of the deployments, use the following command:
kubectl get services -n mongo
Or use the Kubernetes Dashboard to confirm deployments.
Mongo Express UI Access
For parts of your application (for example, frontends) you may want to expose a Service onto an external IP address outside of your cluster. Kubernetes
ServiceTypes allows you to specify what kind of Service you want; the default is
Note that while MongoDB uses the
ClusterIP, Mongo Express uses
NodePort. With NodePort, the Service is exposed on each Node’s IP at a static port (the
NodePort). You can contact the
NodePort Service, from outside the cluster, by requesting
In a separate terminal tab, open Mongo Express using the following command:
minikube service --url mongo-express -n mongo
You should see output similar to the following:
Click on the link to open Mongo Express. There should already be three MongoDB operational databases shown in the UI. The three Storefront databases and collections will be created automatically, later in the post:
Apache Kafka using Strimzi
Next, we will install Apache Kafka and Apache Zookeeper into the
storefront-kafka-project namespaces on minikube, using Strimzi. Since Strimzi has a great, easy-to-use Quick Start guide, I will not detail the complete install complete process in this post. I suggest using their guide to understand the process and what each command does. Then, use the slightly modified Strimzi commands I have included below to install Kafka and Zookeeper.
# assuming 0.23.0 is latest version available
curl -L -O https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.23.0/strimzi-0.23.0.zip
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
# manually change STRIMZI_NAMESPACE value to storefront-kafka-project
kubectl create -f install/cluster-operator/ -n kafka
kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n storefront-kafka-project
kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-cluster.yaml -n storefront-kafka-project
kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n storefront-kafka-project
kubectl apply -f ../storefront-demo/minikube/resources/strimzi-kafka-topics.yaml -n storefront-kafka-project
We want to install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. However, CMAK required access to Zookeeper. You can not access Strimzi’s Zookeeper directly from CMAK; this is intentional to avoid performance and security issues. See this GitHub issue for a better explanation of why. We will use the appropriately named Zoo Entrance as a proxy for CMAK to Zookeeper to overcome this challenge.
To install Zoo Entrance, review the GitHub project’s install guide, then use the following commands:
# optional: change my-cluster to kafka-cluster
sed -i '' 's/my-cluster/kafka-cluster/' deploy.yaml
kubectl apply -f deploy.yaml -n storefront-kafka-project
Cluster Manager for Apache Kafka
Next, install Yahoo’s CMAK (Cluster Manager for Apache Kafka) to give us a management interface for Kafka. Run the following command to deploy CMAK into the
kubectl apply -f ./minikube/resources/cmak.yaml -n storefront-kafka-project
Similar to Mongo Express, we can access CMAK’s UI using its
NodePort. In a separate terminal tab, run the following command:
minikube service --url cmak -n storefront-kafka-project
You should see output similar to Mongo Express. Click on the link provided to access CMAK. Choose ‘Add Cluster’ in CMAK to add our existing Kafka cluster to CMAK’s management interface. Use Zoo Enterence’s service address for the Cluster Zookeeper Hosts value.
Once complete, you should see the three Kafka topics we created previously with Strimzi:
orders.order.change. Each topic will have three partitions, one replica, and one broker. You should also see the
_consumer_offsets topic that Kafka uses to store information about committed offsets for each topic:partition per group of consumers (groupID).
Storefront API Microservices
We are finally ready to install our Storefront API’s microservices into the
dev namespace. Each service is preconfigured to access Kafka and MongoDB in their respective namespaces.
kubectl apply -f ./minikube/resources/accounts.yaml -n dev
kubectl apply -f ./minikube/resources/orders.yaml -n dev
kubectl apply -f ./minikube/resources/fulfillment.yaml -n dev
Spring Boot services usually take about two minutes to fully start. The time required to download the Docker Images from docker.com and the start-up time means it could take 3–4 minutes for each of the three services to be ready to accept API traffic.
We want to be able to access our Storefront API’s microservices through our Kubernetes
LoadBalancer, while also leveraging all the capabilities of Istio as a service mesh. To do so, we need to deploy an Istio
Gateway and a
VirtualService. We will also need to deploy
DestinationRule resources. A
Gateway describes a load balancer operating at the edge of the mesh receiving incoming or outgoing HTTP/TCP connections. A
VirtualService defines a set of traffic routing rules to apply when a host is addressed. Lastly, a
DestinationRule defines policies that apply to traffic intended for a Service after routing has occurred.
kubectl apply -f ./minikube/resources/destination_rules.yaml -n dev
kubectl apply -f ./minikube/resources/istio-gateway.yaml -n dev
Testing the System and Creating Sample Data
I have provided a Python 3 script that runs a series of seven
HTTP GET requests, in a specific order, against the Storefront API. These calls will validate the deployments, confirm the API’s services can access Kafka and MongoDB, generate some initial data, and automatically create the MongoDB database collections from the initial Insert statements.
python3 -m pip install -r ./utility_scripts/requirements.txt -U
The script’s output should be as follows:
If we now look at Mongo Express, we should note three new databases:
Istio makes it easy to integrate with a number of common tools, including cert-manager, Prometheus, Grafana, Kiali, Zipkin, and Jaeger. In order to better observe our Storefront API, we will install three well-known observability tools: Kiali, Prometheus, and Grafana. Luckily, these tools are all included with Istio. You can install any or all of these to minikube. I suggest installing the tools one at a time as not to overwhelm minikube’s CPU and memory resources.
kubectl apply -f ./minikube/resources/prometheus.yaml kubectl apply -f $ISTIO_HOME/samples/addons/grafana.yaml kubectl apply -f $ISTIO_HOME/samples/addons/kiali.yaml
Once deployment is complete, to access any of the UI’s for these tools, use the
istioctl dashboard command from a new terminal window:
istioctl dashboard kiali istioctl dashboard prometheus istioctl dashboard grafana
Below we see a view of Kiali with API traffic flowing to Kafka and MongoDB.
Each of the three Storefront API microservices has a dependency on Micrometer; specifically, a dependency on
micrometer-registry-prometheus. As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics with a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends. Given the Micrometer Prometheus dependency, each microservice exposes a
/prometheus endpoint (e.g.,
http://127.0.0.1/accounts/actuator/prometheus) as shown below in Postman.
/prometheus endpoint exposes dozens of useful metrics and is configured to be scraped by Prometheus. These metrics can be displayed in Prometheus and indirectly in Grafana dashboards via Prometheus. I have customized Istio’s version of Prometheus and included it in the project (
prometheus.yaml), which now scrapes the Storefront API’s metrics.
- job_name: 'spring_micrometer'
- targets: ['accounts.dev:8080','orders.dev:8080','fulfillment.dev:8080']
Here we see an example graph of a Spring Kafka Listener metric,
spring_kafka_listener_seconds_sum, in Prometheus. There are dozens of metrics exposed to Prometheus from our system that we can observe and alert on.
Lastly, here is an example Spring Boot Dashboard in Grafana. More dashboards are available on Grafana’s community dashboard page. The Grafana dashboard uses Prometheus as the source of its metrics data.
Storefront API Endpoints
The three storefront services are fully functional Spring Boot, Spring Data REST, Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. To better understand the Storefront API, each Spring Boot microservice uses SpringFox, which produces automated JSON API documentation for APIs built with Spring. The service builds also include the
springfox-swagger-ui web jar, which ships with Swagger UI. Swagger takes the manual work out of API documentation, with a range of solutions for generating, visualizing, and maintaining API docs.
Each service’s data model (POJOs) is also exposed through the Swagger UI.
Spring Boot Actuator
Additionally, each service includes Spring Boot Actuator. The Actuator exposes additional operational endpoints, allowing us to observe the running services. With Actuator, you get many features, including access to available operational-oriented endpoints, using the
/actuator/ subdirectory/subpath (e.g.,
http://127.0.0.1/accounts/actuator/). For this demonstration, I have not restricted access to any available Actuator endpoints.
In this two-part post, we learned how to build an API using Spring Boot. We ensured the API’s distributed data integrity using a pub/sub model with Spring for Apache Kafka Project. When a relevant piece of data was changed by one microservice, that state change triggered a state change event that was shared with other microservices using Kafka topics.
We also learned how to deploy and run the API in a local development environment running on Kubernetes with Istio, using minikube. We have added production-tested observability tools to provide operational visibility, including CMAK, Mongo Express, Kiali, Prometheus, and Grafana.
This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.
Using Spring for Apache Kafka to manage a Distributed Data Model in MongoDB across multiple microservices
Given a modern distributed system composed of multiple microservices, each possessing a sub-set of a domain’s aggregate data, the system will almost assuredly have some data duplication. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge — Apache Kafka and the model of eventual consistency.
Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of messages. According to Confluent, initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform.
Eventual consistency, according to Wikipedia, is a consistency model used in distributed computing to achieve high availability that informally guarantees that if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. I previously covered the topic of eventual consistency in a distributed system using RabbitMQ in the May 2017 post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. The post was featured on Pivotal’s RabbitMQ website.
To ground the discussion, let’s examine a common example — an online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.
Given this problem domain, we can assume we have the concept of a Customer. Further, we can assume the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of the Customer will require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program and online shopping activity. Fulfillment may maintain a record of all orders being shipped to the customer. Security likely holds the customer’s access credentials, account access history, and privacy settings.
Below are the Customer data objects are shown in yellow. Orange represents the logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.
Distributed Data Consistency
If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts or even between services within the same context, then we must ensure data consistency. Take, for example, the case where a customer changes their home address or email. Let us assume that the Accounting context is the system of record for these data fields. However, to fulfill orders, the Shipping context might also need to maintain the customer’s current home address. Likewise, the Marketing context, responsible for opt-in email advertising, also needs to be aware of the email change and update its customer records.
If a piece of shared data is changed, then the party making the change should be responsible for communicating the change without expecting a response. They are stating a fact, not asking a question. Interested parties can choose if and how to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, defined by Martin Fowler of ThoughtWorks in his insightful post, What do you mean by “Event-Driven”?. Changes to a piece of data can be thought of as a state change event — events that contain details of the data that changed. Coincidentally, Fowler uses a customer’s address change as an example of Event-Carried State Transfer in the post. Fellow former ThoughtWorker Graham Brooks also detailed the concept in his post, Event-Carried State Transfer Pattern.
Multiple architectural approaches can be taken to solve for data consistency in a distributed system. For example, you could use a single relational database with shared schemas to persist data, avoiding the distributed data model altogether. However, it could be argued that using a single database just turned your distributed system back into a monolith.
You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.
Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages persist in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity to the system.
In this post’s somewhat simplistic architecture, the business microservices will maintain consistency across their respective domains by producing and consuming messages from multiple Kafka topics to which they are subscribed. Kafka Producers may also be Consumers within our domain.
In this post, our online storefront API will be built in Java using Spring Boot and OpenJDK 16. We will ensure the uniformity of distributed data by using a publish/subscribe model with Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot microservice, if appropriate, that state change will trigger a state change event, which will be shared with other microservices using Kafka topics.
We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another while still ensuring the data is distributed.
Given the use case of placing an order, we will examine the interactions of three services that compose our storefront API: the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other in a completely decoupled manner.
The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at three event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service.
Below is a view of the online storefront through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you an idea of where Kafka and Zookeeper, Kafka’s current cluster manager, might sit in a typical, highly-available, microservice-based, distributed application platform.
This post will focus on the storefront’s backend API — its services, databases, and messaging sub-systems.
We will explore the functionality of each of the three microservices and how they share state change events using Kafka 2.8. Each storefront API service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.
The storefront’s microservices source code is publicly available on GitHub. The four GitHub projects can be cloned using the following commands:
git clone --branch 2021-istio \
--single-branch --depth 1 \
git clone --branch 2021-istio \
--single-branch --depth 1 \
git clone --branch 2021-istio \
--single-branch --depth 1 \
git clone --branch 2021-istio \
--single-branch --depth 1 \
Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.
Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. Below we see the representation of a
Customer, as a BSON document in the
customer.accounts MongoDB database collection.
Along with the primary
Customer entity, the Accounts service contains a
CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the
CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added or a change is made to an existing customer. The
CustomerChangeEvent object is not an exact duplicate of the
Customer object. For example, the
CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the
CreditCard data object).
CustomerChangeEvent domain event object does not persist in MongoDB, we can look at its JSON message payload in Kafka to examine its structure. Note the differences in the data structure (schema) between the
Customer document in MongoDB and the Kafka
CustomerChangeEvent message payload.
For simplicity, we will assume that other services do not make changes to the customer’s name, contact information, or addresses — this is the sole responsibility of the Accounts service.
Source code for the Accounts service is available on GitHub. Use the latest
2021-istio branch of the project.
The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.
CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (
Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a
CustomerOrders, represented as a BSON document in the
customer.orders database collection, looks as follows:
Along with the primary
CustomerOrders entity, the Orders service contains the
FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the
FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The
FulfillmentRequestEvent object only contains the information it needs to share. Our example shares a single
Order, along with the customer’s name, contact information, and shipping address.
FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the schema differences between the
CustomerOrders document in MongoDB and the
FulfillmentRequestEvent message payload in Kafka.
Source code for the Orders service is available on GitHub. Use the latest
2021-istio branch of the project.
Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.
The Fulfillment service’s primary entity, the
Fulfillment class, is persisted in MongoDB. This entity contains a single
Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the
Fulfillment entity to store the latest shipping status, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping address are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service via Kafka, using the
In the Fulfillment MongoDB database, a
Fulfillment object represented as a BSON document in the
fulfillment.requests database collection looks as follows:
Along with the primary
Fulfillment entity, the Fulfillment service has an
OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the
OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The
OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.
OrderStatusChangeEvent domain event object is not persisted in MongoDB, again, we can again look at its JSON message payload in Kafka.
Source code for the Fulfillment service is available on GitHub. Use the latest
2021-istio branch of the project.
State Change Event Messaging Flows
There are three state change event messaging flows illustrated in this post.
- Changes to a Customer triggers an event message produced by the Accounts service, which is published on the
accounts.customer.changeKafka topic and consumed by the Orders service;
- Order Approved triggers an event message produced by the Orders service, which is published on the
orders.order.fulfillKafka topic, and is consumed by the Fulfillment service;
- Changes to the status of an Order triggers an event message produced by the Fulfillment Service, which is published on the
fulfillment.order.changeKafka topic, and is consumed by the Orders service;
Each of these state change event messaging flows follows the same architectural pattern on both the Kafka topic’s producer and consumer sides.
Let us examine each state change event messaging flow and the code behind it.
Customer State Change
When a new Customer entity is created or updated by the Accounts service, a
CustomerChangeEvent message is produced and sent to the
accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. By way of Kafka, it can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information.
There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity.
The listener handles the event by instantiating a new
CustomerChangeEvent with the Customer’s information and passes it to the
SenderConfig class handles the configuration of the
Sender. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the
CustomerChangeEvent object into a JSON message payload.
Sender uses a KafkaTemplate to send the message to the
accounts.customer.change Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.
The Orders service’s
Receiver class consumes the
CustomerChangeEvent messages produced by the Accounts service.
The Orders service’s
Receiver class is configured differently compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the
ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s
ReceiverConfig references Spring Kafka’s
AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching.
Each Kafka topic the Orders service consumes messages from is associated with a method in the
Receiver class (shown above). This method accepts a specific object type as input, denoting the object type into which the message payload needs to be deserialized. This way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a
CustomerChangeEvent, the Orders service calls the
receiveCustomerOrder method to consume the message and properly deserialize it.
For all services, a Spring
application.yaml properties file in each service’s
resources directory contains the Kafka configuration (lines 11–19).
Order Approved for Fulfillment
When the status of the
Order in a
CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a
FulfillmentRequestEvent message is produced and sent to the
orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.
Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A
FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The
FulfillmentRequestEvent is passed to the
SenderConfig class handles the configuration of the
Sender class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the
FulfillmentRequestEvent object into a JSON message payload.
Sender class uses a
KafkaTemplate to send the message to the
orders.order.fulfill Kafka topic, as shown below. Since message order is not critical, messages can be sent to a topic with multiple partitions if the volume of messages required it.
The Fulfillment service’s
Receiver class consumes the
FulfillmentRequestEvent from the Kafka topic and instantiates a
Fulfillment object, containing the data passed in the
FulfillmentRequestEvent message payload. The
Fulfillment object includes the order to be fulfilled and the customer’s contact and shipping information.
The Fulfillment service’s
ReceiverConfig class defines the
ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a