** This post has been rewritten and updated in May 2021 **
Given a modern distributed system composed of multiple microservices, each possessing a subset of the domain’s aggregate data it needs to perform its functions autonomously, we will almost certainly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we have been exploring one possible solution to this challenge, using Apache Kafka and the model of eventual consistency. In Part One, we examined the online storefront domain, the storefront’s microservices, and the system’s state change event message flows.
Part Two
In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.
Source code for deploying the Dockerized components of the online storefront, shown in this post, is available on GitHub. All Docker images are available on Docker Hub. I have chosen the wurstmeister/kafka-docker version of Kafka, which has 580+ stars and 10M+ pulls on Docker Hub. This version of Kafka works well, as long as you run it locally within a Docker Swarm.
Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Deployment Options
For simplicity, I’ve used Docker’s native Docker Swarm Mode to support the deployed online storefront; Docker Swarm requires minimal configuration compared to other CaaS platforms. If the final destination of the storefront were Kubernetes in Production (AKS, EKS, or GKE), I would usually recommend Minikube for local development. Alternatively, if the final destination were Red Hat OpenShift in Production, I would recommend Minishift for local development.
Docker Deployment
We will break up our deployment into two parts. First, we will deploy everything except our services, allowing Kafka, MongoDB, Eureka, and the other components to start up fully. Afterward, we will deploy the three online storefront services. The storefront-kafka-docker project on GitHub contains two Docker Compose files, which are divided between the two tasks.
The middleware Docker Compose file (gist).
version: '3.2'

services:
  zuul:
    image: garystafford/storefront-zuul:latest
    expose:
      - "8080"
    ports:
      - "8080:8080/tcp"
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: zuul
    environment:
      # LOGGING_LEVEL_ROOT: DEBUG
      RIBBON_READTIMEOUT: 3000
      RIBBON_SOCKETTIMEOUT: 3000
      ZUUL_HOST_CONNECT_TIMEOUT_MILLIS: 3000
      ZUUL_HOST_SOCKET_TIMEOUT_MILLIS: 3000
    networks:
      - kafka-net
  eureka:
    image: garystafford/storefront-eureka:latest
    expose:
      - "8761"
    ports:
      - "8761:8761/tcp"
    hostname: eureka
    networks:
      - kafka-net
  mongo:
    image: mongo:latest
    command: --smallfiles
    # expose:
    #   - "27017"
    ports:
      - "27017:27017/tcp"
    hostname: mongo
    networks:
      - kafka-net
  mongo_express:
    image: mongo-express:latest
    expose:
      - "8081"
    ports:
      - "8081:8081/tcp"
    hostname: mongo_express
    networks:
      - kafka-net
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181/tcp"
    hostname: zookeeper
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka:latest
    depends_on:
      - zookeeper
    # expose:
    #   - "9092"
    ports:
      - "9092:9092/tcp"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "accounts.customer.change:1:1,fulfillment.order.change:1:1,orders.order.fulfill:1:1"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    hostname: kafka
    networks:
      - kafka-net
  kafka_manager:
    image: hlebalbau/kafka-manager:latest
    ports:
      - "9000:9000/tcp"
    expose:
      - "9000"
    depends_on:
      - kafka
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null
    hostname: kafka_manager
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: overlay
The services Docker Compose file (gist).
version: '3.2'

services:
  accounts:
    image: garystafford/storefront-accounts:latest
    depends_on:
      - kafka
      - mongo
    hostname: accounts
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net
  orders:
    image: garystafford/storefront-orders:latest
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: orders
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net
  fulfillment:
    image: garystafford/storefront-fulfillment:latest
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: fulfillment
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: overlay
In the storefront-kafka-docker project, there is a shell script, stack_deploy_local.sh. This script will execute both Docker Compose files in succession, with a pause in between. You may need to adjust the timing for your own system (gist).
#!/bin/sh

# Deploys the storefront Docker stack
# usage: sh ./stack_deploy_local.sh

set -e

docker stack deploy -c docker-compose-middleware.yml storefront
echo "Starting the stack: middleware...pausing for 30 seconds..."
sleep 30

docker stack deploy -c docker-compose-services.yml storefront
echo "Starting the stack: services...pausing for 10 seconds..."
sleep 10

docker stack ls
docker stack services storefront
docker container ls

echo "Script completed..."
echo "Services may take up to several minutes to start, fully..."
Start by running docker swarm init. This command initializes a Docker Swarm. Next, execute the stack deploy script with an sh ./stack_deploy_local.sh command. The script deploys a new Docker stack within the Docker Swarm. The stack holds all the storefront components, deployed as individual Docker containers, and runs within its own isolated Docker overlay network, kafka-net.
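For reference, a minimal sketch of that start-up sequence, run from the root of the storefront-kafka-docker project (path assumed), looks like this:

# initialize a single-node Docker Swarm, if one is not already running
docker swarm init

# deploy the middleware stack, pause, then deploy the services stack
sh ./stack_deploy_local.sh

# confirm the stack, its services, and the running containers
docker stack services storefront
docker container ls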
Note that we are not using host-based persistent storage for this local development demo. Destroying the Docker stack or the individual Kafka, Zookeeper, or MongoDB Docker containers will result in a loss of data.
Before completion, the stack deploy script runs the docker stack ls command, followed by the docker stack services storefront command. You should see one stack, named storefront, with ten services. You should also see that each of the ten services has 1/1 replicas running, indicating everything has started, or is starting, correctly and without failure. Failure would be reflected here as a service having 0/1 replicas.
Before completion, the stack deploy script also runs the docker container ls command. You should observe each of the ten running containers (‘services’ in the Docker stack), along with their instance names and ports.
There is also a shell script, stack_delete_local.sh, which will issue a docker stack rm storefront command to destroy the stack when you are done.
Using the names of the storefront’s Docker containers, you can check the start-up logs of any of the components, using the docker logs command.
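For example, here is a quick sketch of tailing the Kafka broker’s start-up log; the container name filter assumes Docker Swarm’s standard stackname_servicename naming for the storefront stack:

# locate the Kafka container created by the storefront stack
docker ps --filter name=storefront_kafka

# tail the last 100 lines of its start-up log
docker logs --tail 100 $(docker ps -q --filter name=storefront_kafka)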
Testing the Stack
With the storefront stack deployed, we need to confirm that all the components have started correctly and are communicating with each other. To accomplish this, I’ve written a simple Python script, refresh.py. The refresh script has multiple uses. It deletes any existing storefront service MongoDB databases. It also deletes any existing Kafka topics, by calling the Kafka Manager’s API. Since our stack was just created, we have no databases or topics yet. However, if you are actively developing your data models, you will likely want to purge the databases and topics regularly (gist).
#!/usr/bin/env python3

# Delete (3) MongoDB databases, (3) Kafka topics,
# create sample data by hitting Zuul API Gateway endpoints,
# and return MongoDB documents as verification.
# usage: python3 ./refresh.py

from pprint import pprint

from pymongo import MongoClient
import requests
import time

client = MongoClient('mongodb://localhost:27017/')


def main():
    delete_databases()
    delete_topics()
    create_sample_data()
    get_mongo_doc('accounts', 'customer.accounts')
    get_mongo_doc('orders', 'customer.orders')
    get_mongo_doc('fulfillment', 'fulfillment.requests')


def delete_databases():
    dbs = ['accounts', 'orders', 'fulfillment']
    for db in dbs:
        client.drop_database(db)
        print('MongoDB dropped: ' + db)
    dbs = client.database_names()
    print('Remaining databases:')
    print(dbs)
    print('\n')


def delete_topics():
    # call Kafka Manager API
    topics = ['accounts.customer.change',
              'orders.order.fulfill',
              'fulfillment.order.change']
    for topic in topics:
        kafka_manager_url = 'http://localhost:9000/clusters/dev/topics/delete?t=' + topic
        r = requests.post(kafka_manager_url, data={'topic': topic})
        time.sleep(3)
        print('Kafka topic deleted: ' + topic)
    print('\n')


def create_sample_data():
    sample_urls = [
        'http://localhost:8080/accounts/customers/sample',
        'http://localhost:8080/orders/customers/sample/orders',
        'http://localhost:8080/orders/customers/sample/fulfill',
        'http://localhost:8080/fulfillment/fulfillments/sample/process',
        'http://localhost:8080/fulfillment/fulfillments/sample/ship',
        'http://localhost:8080/fulfillment/fulfillments/sample/in-transit',
        'http://localhost:8080/fulfillment/fulfillments/sample/receive']
    for sample_url in sample_urls:
        r = requests.get(sample_url)
        print(r.text)
        time.sleep(5)
    print('\n')


def get_mongo_doc(db_name, collection_name):
    db = client[db_name]
    collection = db[collection_name]
    pprint(collection.find_one())
    print('\n')


if __name__ == "__main__":
    main()
Next, the refresh script calls a series of RESTful HTTP endpoints, in a specific order, to create sample data. Our three storefront services each expose different endpoints. The different /sample endpoints create sample customers, orders, order fulfillment requests, and shipping notifications. The create sample data endpoints include, in order (a curl sketch follows the list):
- Sample Customer: /accounts/customers/sample
- Sample Orders: /orders/customers/sample/orders
- Sample Fulfillment Requests: /orders/customers/sample/fulfill
- Sample Processed Order Events: /fulfillment/fulfillments/sample/process
- Sample Shipped Order Events: /fulfillment/fulfillments/sample/ship
- Sample In-Transit Order Events: /fulfillment/fulfillments/sample/in-transit
- Sample Received Order Events: /fulfillment/fulfillments/sample/receive
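If you prefer to hit these endpoints manually rather than with refresh.py, a minimal curl sketch against the Zuul gateway, using the same URLs the script calls, might look like the following; the pause between calls simply gives each state change event time to propagate through Kafka:

# create sample customers, orders, fulfillment requests, and shipping events, in order
for path in \
  accounts/customers/sample \
  orders/customers/sample/orders \
  orders/customers/sample/fulfill \
  fulfillment/fulfillments/sample/process \
  fulfillment/fulfillments/sample/ship \
  fulfillment/fulfillments/sample/in-transit \
  fulfillment/fulfillments/sample/receive; do
  curl -s "http://localhost:8080/${path}"
  sleep 5
done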
You can create data on your own by POSTing to the exposed CRUD endpoints on each service. However, given the complex data objects required in the request payloads, it is too time-consuming for this demo.
To execute the script, use a python3 ./refresh.py command. I am using Python 3 in the demo, but the script should also work with Python 2.x if you change the shebang.
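The script relies on the pymongo and requests packages; a minimal sketch of installing them and running the script (package names inferred from the script’s imports) is:

# install the script's two third-party dependencies
pip3 install pymongo requests

# run the refresh script against the locally deployed stack
python3 ./refresh.py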
If everything was successful, the script returns one document from each of the three storefront services’ MongoDB database collections. A result of ‘None’ for any of the MongoDB documents usually indicates that one of the earlier commands failed. Given the abnormally high response latency caused by the load of ten running containers on my laptop, I had to increase the Zuul/Ribbon timeouts.
Observing the System
We should now have the online storefront Docker stack running, three MongoDB databases created and populated with sample documents (data), and three Kafka topics with messages in them. Since we saw database documents printed out by our refresh script, we know the topics were used to pass data between the message-producing and message-consuming services.
In most enterprise environments, a developer may have neither the access nor the operational knowledge to interact with Kafka or MongoDB from within a container on the command line. So how else can we interact with the system?
Kafka Manager
Kafka Manager gives us the ability to interact with Kafka via a convenient browser-based user interface. For this demo, the Kafka Manager UI is available on the default port 9000.
To make Kafka Manager useful, define the Kafka cluster. The Cluster Name is up to you. For our demo, the Cluster Zookeeper Host should be zookeeper:2181.
Kafka Manager gives us useful insights into many aspects of our simple, single-broker cluster. You should observe three topics, created during the deployment of Kafka.
Kafka Manager is an appealing alternative to connecting to the Kafka container with a docker exec command in order to interact with Kafka. A typical use case might be deleting a topic or adding partitions to a topic. We can also see, from within Kafka Manager, which consumers are consuming which topics.
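For comparison, here is a hedged command-line sketch of those same tasks; it assumes the wurstmeister/kafka image keeps kafka-topics.sh on its PATH and that the Kafka Manager cluster was named dev, as in refresh.py:

# list topics directly on the broker (the docker exec alternative)
docker exec -it $(docker ps -q --filter name=storefront_kafka) \
  kafka-topics.sh --zookeeper zookeeper:2181 --list

# delete a topic through Kafka Manager's API, as refresh.py does
curl -X POST "http://localhost:9000/clusters/dev/topics/delete?t=accounts.customer.change" \
  -d "topic=accounts.customer.change"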
Mongo Express
Similar to Kafka Manager, Mongo Express gives us the ability to interact with MongoDB via a convenient browser-based user interface. For this demo, the Mongo Express user interface is available on the default port 8081. The initial view displays each of the existing databases. Note our three services’ databases: accounts, orders, and fulfillment.
Drilling into an individual database, we can view each of the database’s collections. Digging in further, we can interact with individual database collection documents.
We may even edit and save the documents.
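If you do have command-line access, a rough equivalent using the mongo shell inside the MongoDB container (collection name taken from refresh.py) might be:

# print one document from the Accounts service's customer.accounts collection
docker exec -it $(docker ps -q --filter name=storefront_mongo) \
  mongo accounts --quiet --eval 'printjson(db["customer.accounts"].findOne())'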
SpringFox and Swagger
Each of the storefront services also implements SpringFox, automated JSON API documentation for APIs built with Spring. With SpringFox, each service exposes a rich Swagger UI that allows us to interact with the service endpoints.
Since each service exposes its own Swagger interface, we must access them through the Zuul API Gateway on port 8080. In our demo environment, the Swagger browser-based user interface is accessible at /swagger-ui.html. Below is a fully self-documented Orders service API, as seen through the Swagger UI.
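The raw SpringFox JSON behind the UI can also be pulled through the gateway. Here is a sketch, assuming SpringFox’s default /v2/api-docs path and that Zuul proxies each service under its route prefix, as in this demo:

# fetch the Orders service's machine-readable API description via Zuul
curl -s http://localhost:8080/orders/v2/api-docs | python3 -m json.tool | head -n 40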
I believe there are still some incompatibilities between the latest SpringFox release and Spring Boot 2, which prevent Swagger from showing the default Spring Data REST CRUD endpoints. Currently, you only see the API endpoints you explicitly declare in your Controller classes.
The service’s data models (POJOs) are also exposed through the Swagger UI by default. Below we see the Orders service’s models.
The Swagger UI allows you to drill down into the complex structure of the models, such as the CustomerOrder entity, exposing each of the entity’s nested data objects.
Spring Cloud Netflix Eureka
This post does not go into depth on Eureka or Zuul. However, Eureka gives us further valuable insight into our storefront system. Eureka is our system’s service registry and provides load balancing for our services if we run multiple instances.
For this demo, the Eureka browser-based user interface is available on the default port 8761. Within the Eureka user interface, we should observe the three storefront services and Zuul, the API Gateway, registered with Eureka. If we had more than one instance of each service, we would see all of them listed here.
Although of limited use in a local environment, we can observe some general information about our host.
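Eureka also exposes its registry over REST. Below is a minimal sketch of listing the registered application instances; /eureka/apps is a standard Eureka server endpoint, and the Accept header simply requests JSON instead of XML:

# list every application instance currently registered with Eureka
curl -s -H "Accept: application/json" http://localhost:8761/eureka/apps | python3 -m json.tool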
Interacting with the Services
The three storefront services are fully functional Spring Boot / Spring Data REST / Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. Additionally, each service includes Spring Boot Actuator. Actuator exposes additional operational endpoints, allowing us to observe the running services. Again, this post is not intended to be a demonstration of Spring Boot or Spring Boot Actuator.
Using an application such as Postman, we can interact with our services’ RESTful HTTP endpoints. As shown below, we are calling the Accounts service’s customers resource. The Accounts request is proxied through the Zuul API Gateway.
The above Postman Storefront Collection and Postman Environment are both exported and saved with the project.
Some key endpoints for observing the entities that were created using Event-Carried State Transfer are shown below. They assume you are using localhost as the base URL; a few example curl commands follow the list.
- Zuul Registered Routes: /actuator/routes
- Accounts Service Customers: /accounts/customers
- Orders Service Customer Orders: /orders/customerOrderses
- Fulfillment Service Fulfillments: /fulfillment/fulfillments
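Here is a short curl sketch of hitting those endpoints through the Zuul gateway on port 8080; the exact response shapes depend on each service’s Spring Data REST configuration:

# routes Zuul has registered for the storefront services
curl -s http://localhost:8080/actuator/routes | python3 -m json.tool

# customers created by the Accounts service
curl -s http://localhost:8080/accounts/customers | python3 -m json.tool

# orders and fulfillment requests propagated via Kafka
curl -s http://localhost:8080/orders/customerOrderses | python3 -m json.tool
curl -s http://localhost:8080/fulfillment/fulfillments | python3 -m json.tool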
References
Links to my GitHub projects for this post
- storefront-kafka-docker
- storefront-zuul-proxy
- storefront-eureka-server
- storefront-demo-accounts
- storefront-demo-orders
- storefront-demo-fulfillment
Some additional references I found useful while authoring this post and the online storefront code:
- Wurstmeister’s kafka-docker GitHub README
- Spring for Apache Kafka Reference Documentation
- Baeldung’s Intro to Apache Kafka with Spring
- CodeNotFound.com’s Spring Kafka – Consumer Producer Example
- MemoryNotFound’s Spring Kafka – Consumer and Producer Example
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.