Posts Tagged Messaging
IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas
Posted by Gary A. Stafford in Big Data, Cloud, GCP, Python, Serverless, Software Development on May 21, 2019
Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.
Introduction
Most of the dominant Cloud providers offer integrated IoT (Internet of Things) and IIoT (Industrial IoT) services. Amazon has AWS IoT, Microsoft Azure has multiple offerings, including IoT Central, IBM's offerings include the IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers the Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly performant, scalable technology stacks. They are capable of scaling to tens of thousands of IoT devices or more, and to massive amounts of streaming telemetry.
In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.
Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIoT industry-standard data and transport protocols, such as MQTT (Message Queue Telemetry Transport) or CoAP (Constrained Application Protocol), over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).
Demonstration
In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

Prototype IoT Devices used in this Demonstration
We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.
Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.
For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.
For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.
Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.
Source Code
All source code is available on GitHub. Use the following command to clone the project.
git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git
You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.
Technologies
Protocol Buffers
According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML.
Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application's language on your .proto file to generate data access classes.
Google Cloud Functions
According to Google, Cloud Functions is Google's event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high availability, fault tolerance, no servers to provision, manage, patch, or update, paying only while your code runs, and the ability to easily connect to and extend other cloud services. Cloud Functions natively support multiple event types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.
Google Cloud Pub/Sub
According to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.
MongoDB Atlas
MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.
MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.
Cost Effectiveness of Cloud Functions
At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google's pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system is composed of tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.
However, with thousands of devices or more, each transmitting data multiple times per minute, you could quickly outgrow the cost-effectiveness of Cloud Functions. In that case, you might look to Google's Google Cloud IoT platform. Alternately, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly-released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.
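To get a rough sense of where that break-even point lies, here is a back-of-the-envelope sketch based only on the per-invocation pricing quoted above (two million free invocations, then USD $0.40 per additional million); it deliberately ignores compute time, memory, and egress, which would add to the real bill.

def monthly_invocation_cost(devices, messages_per_minute,
                            price_per_million=0.40, free_tier=2_000_000):
    # 60 minutes x 24 hours x 30 days of telemetry per device
    invocations = devices * messages_per_minute * 60 * 24 * 30
    billable = max(invocations - free_tier, 0)
    return invocations, (billable / 1_000_000) * price_per_million

print(monthly_invocation_cost(100, 4))    # ~17.3M invocations, roughly $6/month
print(monthly_invocation_cost(5000, 4))   # ~864M invocations, roughly $345/month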
Sensor Scripts
For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.
JSON over HTTPS
Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.
import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

    try:
        requests.post(URL, json=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())
Telemetry Frequency
Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to being transmitted only every 15 seconds. In my opinion, to reduce system complexity, potential latency, back-pressure, and cost, you should only produce telemetry data at the frequency your requirements dictate.
JSON Web Tokens
For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry-standard RFC 7519 method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, signed with a secret key known to each Cloud Function, in order to verify the IoT device's identity. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT's payload data.
{ "sub": "IoT Protobuf Serverless Demo", "id": "iot-demo-key", "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ", "iat": 1557407124, "exp": 1564664724 }
For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions are using PyJWT, a Python library which allows you to encode and decode the JWT. The PyJWT library allows the Function to decode and validate the JWT (Bearer Token) from the incoming request's Authorization header. The JWT token is stored as an environment variable. Deployment instructions are included in the GitHub project.
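The same kind of token could also be generated and validated programmatically with PyJWT. Below is a minimal sketch, assuming the HS256 algorithm and placeholder values; it is not the exact token or code used in the demonstration.

import time

import jwt  # PyJWT

SECRET_KEY = 'my-secret-key'  # placeholder; the real key is an environment variable

# Encode a token containing the id, password, and expiration claims
token = jwt.encode({'sub': 'IoT Protobuf Serverless Demo',
                    'id': 'iot-demo-key',
                    'password': 'placeholder-password',
                    'iat': int(time.time()),
                    'exp': int(time.time()) + (60 * 60 * 24 * 30)},
                   SECRET_KEY, algorithm='HS256')

# The Cloud Function decodes and validates the Bearer token in the same way
claims = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
print(claims['id'])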
JSON Payload
Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.
{ "device": "rp829c7e0e", "type": "DHT22", "timestamp": 1557585090.476025, "data": { "temperature": 17.100000381469727, "humidity": 68.0999984741211 } }
Protocol Buffers
For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object within each message type.
It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternately, you could choose Google's Protobuf WellKnownTypes, such as Timestamp, to store the timestamp.
syntax = "proto3"; package sensors; // DHT22 message SensorDHT { string device = 1; string type = 2; double timestamp = 3; DataDHT data = 4; } message DataDHT { double temperature = 1; double humidity = 2; } // Onyehn_PIR message SensorPIR { string device = 1; string type = 2; double timestamp = 3; DataPIR data = 4; } message DataPIR { bool motion = 1; } // Anmbest_MD46N message SensorDLI { string device = 1; string type = 2; double timestamp = 3; DataDLI data = 4; } message DataDLI { bool light = 1; }
Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file sensors_pb2.py.
protoc --python_out=. sensors.proto
Protocol Buffers over HTTPS
Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request's Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf's SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()

            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())
Protobuf Binary Payload
To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.
message = sensorDHT.SerializeToString()

binary_file_output = open("./data_binary.txt", "wb")
file_byte_array = bytearray(message)
binary_file_output.write(file_byte_array)
Then, using the hexdump command, we can view a representation of the binary data file.
> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022
The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (32% the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.
Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.
message = sensorDHT.SerializeToString()
print(message)
The resulting binary string resembles the following.
b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'
The binary string length of the serialized message, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 111 bytes, as compared to the JSON payload size of 148 bytes (75% the size).
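To verify the payload-size difference yourself, you can simply compare the lengths of the two serialized forms. A quick sketch, assuming the sensor_dht Protobuf message and the JSON payload dictionary from the earlier sensor scripts:

import json

protobuf_size = len(sensor_dht.SerializeToString())
json_size = len(json.dumps(payload))

# Prints the two sizes and the Protobuf payload as a fraction of the JSON payload
print('Protobuf: {} bytes, JSON: {} bytes ({:.0%} the size)'.format(
    protobuf_size, json_size, protobuf_size / json_size))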
Validate Protobuf Payload
To validate the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.
message = sensorDHT.SerializeToString()

message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))
The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.
{ "device": "rp829c7e0e", "type": "DHT22", "timestamp": 1557585090.476025, "data": { "temperature": 17.100000381469727, "humidity": 68.0999984741211 } }
Google Cloud Functions
There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.
Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.
JSON Message Processing
Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.
import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]
    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(str(message), 'utf-8'))
The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.
# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml
Using a .gcloudignore file, the gcloud functions deploy CLI command deploys three files: the cloud function (main.py), the required Python packages file (requirements.txt), and the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.
Protobuf Message Processing
Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.
As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device. It is used to parse a serialized message into its original Protobuf SensorDHT message type.
import logging
import os

import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)

    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]
    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))
Cloud Pub/Sub Functions
In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub Triggers. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to the iot-data-demo Cloud Pub/Sub Topic.
Sending Telemetry to MongoDB Atlas
The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be compared to binary interchange formats, like Protocol Buffers. BSON is more schema-less than Protocol Buffers, which can give it an advantage in flexibility but also a slight disadvantage in space efficiency (BSON has overhead for field names within the serialized data).
The function uses PyMongo to connect to MongoDB Atlas. According to their website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.
import base64
import json
import logging
import os

import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)
The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.
MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.
Although Cloud Pub/Sub to Atlas function execution times are longer in duration than the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster in the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.
Aggregating Data with MongoDB Compass
MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.
When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and return computed results. This feature saves a ton of time when filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.
Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.
Below, the exported aggregation pipeline code from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device over a 72-hour period.
DEVICE_1 = 'rp59adf374'

pipeline = [
    {
        '$match': {
            'type': 'DHT22',
            'device': DEVICE_1,
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1,
            'temperature': '$data.temperature',
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]

aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))
MongoDB Atlas Performance
In this demonstration, from Python 3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is the round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.
Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.
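One simple way to reproduce this kind of measurement is to time the PyMongo aggregation call itself. A minimal sketch, reusing the pipeline and iot_data collection from the earlier aggregation example:

import time

start = time.perf_counter()
result = list(iot_data.aggregate(pipeline))  # round trip to MongoDB Atlas
elapsed_ms = (time.perf_counter() - start) * 1000

print('Returned {} documents in {:.0f} ms'.format(len(result), elapsed_ms))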
GCP Observability
There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.
For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.
For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile average for monitoring. The 95th percentile asserts that 95% of the time, the observed values are below this amount and the remaining 5% of the time, the observed values are above that amount.
Data Analysis using Jupyter Notebooks
According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The widespread use of Jupyter Notebooks has grown significantly, as Big Data, AI, and ML have all experienced explosive growth.
PyCharm
JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.
Below, we see the example Analytics Notebook, included in the demonstration's project, displayed in PyCharm 19.1.2 (Professional Edition). Working effectively with Notebooks in PyCharm really requires a full-size monitor. Working on a laptop with PyCharm's crowded Notebook UI is workable, but certainly not as effective as on a larger monitor.
Jupyter Notebook Server
Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.
From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Pandas DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.
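A minimal sketch of the python-dotenv pattern is shown below. It assumes the .env file reuses the same MONGODB_CONN, MONGODB_DB, and MONGODB_COL keys that the Cloud Function consumes; the exact keys in the project may differ.

import os

from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()  # reads key/value pairs from a local .env file into the environment

client = MongoClient(os.getenv('MONGODB_CONN'))
iot_data = client[os.getenv('MONGODB_DB')][os.getenv('MONGODB_COL')]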
We can then analyze the data using a number of common frameworks, including Pandas, Matplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the causal effect of one environmental variable on another, such as the impact of light on temperature or humidity.
Below, we can use histograms to visualize temperature frequencies for intervals, over time, for a given device location.
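As an illustrative sketch only, one simple variant of such a histogram could be produced with Matplotlib, assuming the df1 DataFrame and DEVICE_1 value from the aggregation example above:

import matplotlib.pyplot as plt

# Distribution of temperature readings for the selected device and time window
df1['temperature'].plot.hist(bins=20, alpha=0.75)
plt.xlabel('Temperature')
plt.ylabel('Frequency')
plt.title('Temperature Frequency - Device ' + DEVICE_1)
plt.show()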
Machine Learning using Jupyter Notebooks
In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry. Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.
Using scikit-learn, we can build models to predict such things as which IoT device generated a specific temperature and humidity reading, or the temperature and humidity, given the time of day, device location, and external environmental variables, or discover anomalies in the sensor telemetry.
Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.
The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.
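As an illustrative sketch, and not the project's actual notebook code, one such classification model might look as follows, assuming a combined Pandas DataFrame df with device, temperature, and humidity columns:

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Features are the sensor readings; the label is the IoT device that produced them
X = df[['temperature', 'humidity']]
y = df['device']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

print('Accuracy: {:.2f}'.format(accuracy_score(y_test, model.predict(X_test))))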
Having data from multiple sensors, we are able to enrich the ML models by adding additional categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.
Conclusion
Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Image: everythingpossible © 123RF.com
Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 2
Posted by Gary A. Stafford in Enterprise Software Development, Java Development, Software Development on June 18, 2018
** This post has been rewritten and updated in May 2021 **
Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we’ve been exploring one possible solution to this challenge, using Apache Kafka and the model of eventual consistency. In Part One, we examined the online storefront domain, the storefront’s microservices, and the system’s state change event message flows.
Part Two
In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.
Source code for deploying the Dockerized components of the online storefront, shown in this post, is available on GitHub. All Docker Images are available on Docker Hub. I have chosen the wurstmeister/kafka-docker version of Kafka, which has 580+ stars and 10M+ pulls on Docker Hub. This version of Kafka works well, as long as you run it within a Docker Swarm, locally.
Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Deployment Options
For simplicity, I’ve used Docker’s native Docker Swarm Mode to support the deployed online storefront. Docker requires minimal configuration as opposed to other CaaS platforms. Usually, I would recommend Minikube for local development if the final destination of the storefront were Kubernetes in Production (AKS, EKS, or GKE). Alternatively, if the final destination of the storefront was Red Hat OpenShift in Production, I would recommend Minishift for local development.
Docker Deployment
We will break up our deployment into two parts. First, we will deploy everything except our services. We will allow Kafka, MongoDB, Eureka, and the other components to start up fully. Afterward, we will deploy the three online storefront services. The storefront-kafka-docker project on GitHub contains two Docker Compose files, which are divided between the two tasks.
The middleware Docker Compose file (gist).
version: '3.2'

services:
  zuul:
    image: garystafford/storefront-zuul:latest
    expose:
      - "8080"
    ports:
      - "8080:8080/tcp"
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: zuul
    environment:
      # LOGGING_LEVEL_ROOT: DEBUG
      RIBBON_READTIMEOUT: 3000
      RIBBON_SOCKETTIMEOUT: 3000
      ZUUL_HOST_CONNECT_TIMEOUT_MILLIS: 3000
      ZUUL_HOST_CONNECT_SOCKET_MILLIS: 3000
    networks:
      - kafka-net

  eureka:
    image: garystafford/storefront-eureka:latest
    expose:
      - "8761"
    ports:
      - "8761:8761/tcp"
    hostname: eureka
    networks:
      - kafka-net

  mongo:
    image: mongo:latest
    command: --smallfiles
    # expose:
    #   - "27017"
    ports:
      - "27017:27017/tcp"
    hostname: mongo
    networks:
      - kafka-net

  mongo_express:
    image: mongo-express:latest
    expose:
      - "8081"
    ports:
      - "8081:8081/tcp"
    hostname: mongo_express
    networks:
      - kafka-net

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181/tcp"
    hostname: zookeeper
    networks:
      - kafka-net

  kafka:
    image: wurstmeister/kafka:latest
    depends_on:
      - zookeeper
    # expose:
    #   - "9092"
    ports:
      - "9092:9092/tcp"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "accounts.customer.change:1:1,fulfillment.order.change:1:1,orders.order.fulfill:1:1"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    hostname: kafka
    networks:
      - kafka-net

  kafka_manager:
    image: hlebalbau/kafka-manager:latest
    ports:
      - "9000:9000/tcp"
    expose:
      - "9000"
    depends_on:
      - kafka
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null
    hostname: kafka_manager
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: overlay
The services Docker Compose file (gist).
version: '3.2'

services:
  accounts:
    image: garystafford/storefront-accounts:latest
    depends_on:
      - kafka
      - mongo
    hostname: accounts
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net

  orders:
    image: garystafford/storefront-orders:latest
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: orders
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net

  fulfillment:
    image: garystafford/storefront-fulfillment:latest
    depends_on:
      - kafka
      - mongo
      - eureka
    hostname: fulfillment
    # environment:
    #   LOGGING_LEVEL_ROOT: DEBUG
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: overlay
In the storefront-kafka-docker project, there is a shell script, stack_deploy_local.sh. This script will execute both Docker Compose files in succession, with a pause in between. You may need to adjust the timing for your own system (gist).
#!/bin/sh

# Deploys the storefront Docker stack
# usage: sh ./stack_deploy_local.sh

set -e

docker stack deploy -c docker-compose-middleware.yml storefront
echo "Starting the stack: middleware...pausing for 30 seconds..."
sleep 30

docker stack deploy -c docker-compose-services.yml storefront
echo "Starting the stack: services...pausing for 10 seconds..."
sleep 10

docker stack ls
docker stack services storefront
docker container ls

echo "Script completed..."
echo "Services may take up to several minutes to start, fully..."
Start by running docker swarm init. This command will initialize a Docker Swarm. Next, execute the stack deploy script, using an sh ./stack_deploy_local.sh command. The script will deploy a new Docker Stack, within the Docker Swarm. The Docker Stack will hold all storefront components, deployed as individual Docker containers. The stack is deployed within its own isolated Docker overlay network, kafka-net.
Note that we are not using host-based persistent storage for this local development demo. Destroying the Docker stack or the individual Kafka, Zookeeper, or MongoDB Docker containers will result in a loss of data.
Before completion, the stack deploy script runs the docker stack ls command, followed by a docker stack services storefront command. You should see one stack, named storefront, with ten services. You should also see that each of the ten services has 1/1 replicas running, indicating everything has started or is starting correctly, without failure. Failure would be reflected here as a service having 0/1 replicas.
Before completion, the stack deploy script also runs the docker container ls command. You should observe each of the ten running containers (‘services’ in the Docker stack), along with their instance names and ports.
There is also a shell script, stack_delete_local.sh, which will issue a docker stack rm storefront command to destroy the stack when you are done.
Using the names of the storefront’s Docker containers, you can check the start-up logs of any of the components, using the docker logs command.
Testing the Stack
With the storefront stack deployed, we need to confirm that all the components have started correctly and are communicating with each other. To accomplish this, I’ve written a simple Python script, refresh.py. The refresh script has multiple uses. It deletes any existing storefront service MongoDB databases. It also deletes any existing Kafka topics; I call the Kafka Manager’s API to accomplish this. We have no databases or topics since our stack was just created. However, if you are actively developing your data models, you will likely want to purge the databases and topics regularly (gist).
#!/usr/bin/env python3

# Delete (3) MongoDB databases, (3) Kafka topics,
# create sample data by hitting Zuul API Gateway endpoints,
# and return MongoDB documents as verification.
# usage: python3 ./refresh.py

from pprint import pprint
from pymongo import MongoClient
import requests
import time

client = MongoClient('mongodb://localhost:27017/')


def main():
    delete_databases()
    delete_topics()
    create_sample_data()
    get_mongo_doc('accounts', 'customer.accounts')
    get_mongo_doc('orders', 'customer.orders')
    get_mongo_doc('fulfillment', 'fulfillment.requests')


def delete_databases():
    dbs = ['accounts', 'orders', 'fulfillment']
    for db in dbs:
        client.drop_database(db)
        print('MongoDB dropped: ' + db)
    dbs = client.database_names()
    print('Remaining databases:')
    print(dbs)
    print('\n')


def delete_topics():
    # call Kafka Manager API
    topics = ['accounts.customer.change',
              'orders.order.fulfill',
              'fulfillment.order.change']
    for topic in topics:
        kafka_manager_url = 'http://localhost:9000/clusters/dev/topics/delete?t=' + topic
        r = requests.post(kafka_manager_url, data={'topic': topic})
        time.sleep(3)
        print('Kafka topic deleted: ' + topic)
    print('\n')


def create_sample_data():
    sample_urls = [
        'http://localhost:8080/accounts/customers/sample',
        'http://localhost:8080/orders/customers/sample/orders',
        'http://localhost:8080/orders/customers/sample/fulfill',
        'http://localhost:8080/fulfillment/fulfillments/sample/process',
        'http://localhost:8080/fulfillment/fulfillments/sample/ship',
        'http://localhost:8080/fulfillment/fulfillments/sample/in-transit',
        'http://localhost:8080/fulfillment/fulfillments/sample/receive']
    for sample_url in sample_urls:
        r = requests.get(sample_url)
        print(r.text)
        time.sleep(5)
    print('\n')


def get_mongo_doc(db_name, collection_name):
    db = client[db_name]
    collection = db[collection_name]
    pprint(collection.find_one())
    print('\n')


if __name__ == "__main__":
    main()
Next, the refresh script calls a series of RESTful HTTP endpoints, in a specific order, to create sample data. Our three storefront services each expose different endpoints. Different /sample endpoints create sample customers, orders, order fulfillment requests, and shipping notifications. The create sample data endpoints include, in order:
- Sample Customer: /accounts/customers/sample
- Sample Orders: /orders/customers/sample/orders
- Sample Fulfillment Requests: /orders/customers/sample/fulfill
- Sample Processed Order Events: /fulfillment/fulfillments/sample/process
- Sample Shipped Order Events: /fulfillment/fulfillments/sample/ship
- Sample In-Transit Order Events: /fulfillment/fulfillments/sample/in-transit
- Sample Received Order Events: /fulfillment/fulfillments/sample/receive
You can create data on your own by POSTing to the exposed CRUD endpoints on each service. However, given the complex data objects required in the request payloads, it is too time-consuming for this demo.
To execute the script, use a python3 ./refresh.py command. I am using Python 3 in the demo, but the script should also work with Python 2.x if you change the shebang.
If everything was successful, the script returns one document from each of the three storefront service’s MongoDB database collections. A result of ‘None’ for any of the MongoDB documents usually indicates one of the earlier commands failed. Given an abnormally high response latency, due to the load of the ten running containers on my laptop, I had to increase the Zuul/Ribbon timeouts.
Observing the System
We should now have the online storefront Docker stack running, three MongoDB databases created and populated with sample documents (data), and three Kafka topics, which have messages in them. Based on the fact we saw database documents printed out with our refresh script, we know the topics were used to pass data between the message producing and message consuming services.
In most enterprise environments, a developer may not have the access, nor the operational knowledge to interact with Kafka or MongoDB from within a container, on the command line. So how else can we interact with the system?
Kafka Manager
Kafka Manager gives us the ability to interact with Kafka via a convenient browser-based user interface. For this demo, the Kafka Manager UI is available on the default port 9000.
To make Kafka Manager useful, define the Kafka cluster. The Cluster Name is up to you. The Cluster Zookeeper Host should be zookeeper:2181, for our demo.
Kafka Manager gives us useful insights into many aspects of our simple, single-broker cluster. You should observe three topics, created during the deployment of Kafka.
Kafka Manager is an appealing alternative to connecting to the Kafka container with a docker exec command in order to interact with Kafka. A typical use case might be deleting a topic or adding partitions to a topic. We can also see which Consumers are consuming which topics, from within Kafka Manager.
Mongo Express
Similar to Kafka Manager, Mongo Express gives us the ability to interact with MongoDB via a user interface. For this demo, the Mongo Express browser-based user interface is available on the default port 8081. The initial view displays each of the existing databases. Note our three services’ databases, including accounts, orders, and fulfillment.
Drilling into an individual database, we can view each of the database’s collections. Digging in further, we can interact with individual database collection documents.
We may even edit and save the documents.
SpringFox and Swagger
Each of the storefront services also implements SpringFox, the automated JSON API documentation for APIs built with Spring. With SpringFox, each service exposes a rich Swagger UI. The Swagger UI allows us to interact with service endpoints.
Since each service exposes its own Swagger interface, we must access them through the Zuul API Gateway on port 8080. In our demo environment, the Swagger browser-based user interface is accessible at /swagger-ui.html. Below is a fully self-documented Orders service API, as seen through the Swagger UI.
I believe there are still some incompatibilities with the latest SpringFox release and Spring Boot 2, which prevents Swagger from showing the default Spring Data REST CRUD endpoints. Currently, you only see the API endpoints you explicitly declare in your Controller classes.
The service’s data models (POJOs) are also exposed through the Swagger UI by default. Below we see the Orders service’s models.
The Swagger UI allows you to drill down into the complex structure of the models, such as the CustomerOrder entity, exposing each of the entity’s nested data objects.
Spring Cloud Netflix Eureka
This post does not cover the use of Eureka or Zuul in detail. Eureka gives us further valuable insight into our storefront system. It is our system’s service registry and provides load balancing for our services if we have multiple instances.
For this demo, the Eureka browser-based user interface is available on the default port 8761. Within the Eureka user interface, we should observe the three storefront services and Zuul, the API Gateway, registered with Eureka. If we had more than one instance of each service, we would see all of them listed here.
Although of limited use in a local environment, we can observe some general information about our host.
Interacting with the Services
The three storefront services are fully functional Spring Boot / Spring Data REST / Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. Additionally, each service includes Spring Boot Actuator. Actuator exposes additional operational endpoints, allowing us to observe the running services. Again, this post is not intended to be a demonstration of Spring Boot or Spring Boot Actuator.
Using an application such as Postman, we can interact with our services’ RESTful HTTP endpoints. As shown below, we are calling the Accounts service’s customers resource. The Accounts request is proxied through the Zuul API Gateway.
The above Postman Storefront Collection and Postman Environment are both exported and saved with the project.
Some key endpoints to observe the entities that were created using Event-Carried State Transfer are shown below. They assume you are using localhost as the base URL; a short request sketch follows the list of endpoints.
- Zuul Registered Routes: /actuator/routes
- Accounts Service Customers: /accounts/customers
- Orders Service Customer Orders: /orders/customerOrderses
- Fulfillment Service Fulfillments: /fulfillment/fulfillments
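As a minimal, illustrative alternative to Postman, the same endpoints can be exercised with Python and requests; each call is proxied through the Zuul API Gateway on port 8080.

import requests

BASE_URL = 'http://localhost:8080'

# Hit each key endpoint and report its HTTP status
for path in ['/actuator/routes',
             '/accounts/customers',
             '/orders/customerOrderses',
             '/fulfillment/fulfillments']:
    response = requests.get(BASE_URL + path)
    print(path, response.status_code)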
References
Links to my GitHub projects for this post
- storefront-kafka-docker
- storefront-zuul-proxy
- storefront-eureka-server
- storefront-demo-accounts
- storefront-demo-orders
- storefront-demo-fulfillment
Some additional references I found useful while authoring this post and the online storefront code:
- Wurstmeister’s kafka-docker GitHub README
- Spring for Apache Kafka Reference Documentation
- Baeldung’s Intro to Apache Kafka with Spring
- CodeNotFound.com’s Spring Kafka – Consumer Producer Example
- MemoryNotFound’s Spring Kafka – Consumer and Producer Example
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1
Posted by Gary A. Stafford in Enterprise Software Development, Java Development, Software Development on June 17, 2018
** This post has been rewritten and updated in May 2021 **
Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge, using Apache Kafka and the model of eventual consistency.
I previously covered the topic of eventual consistency in a distributed system, using RabbitMQ, in the post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. This post is featured on Pivotal’s RabbitMQ website.
Introduction
To ground the discussion, let’s examine a common example of the online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.
Given this problem domain, we can assume we have the concept of the Customer. Further, the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of a Customer would require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record (SOR) for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program. Fulfillment may maintain a record of all the orders shipped to the customer. Security likely holds the customer’s access credentials and privacy settings.
Below, Customer data objects are shown in yellow. Orange represents logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example.
Distributed Data Consistency
If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts, or even between services within the same contexts, then we must ensure data consistency. Take, for example, a change in a customer’s address. The Accounting context is the system of record for the customer’s addresses. However, to fulfill orders, the Shipping context might also need to maintain the customer’s address. Likewise, the Marketing context, who is responsible for direct-mail advertising, also needs to be aware of the address change, and update its own customer records.
If a piece of shared data is changed, then the party making the change should be responsible for communicating the change, without the expectation of a response. They are stating a fact, not asking a question. Interested parties can choose if, and how, to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, as defined by Martin Fowler, of ThoughtWorks, in his insightful post, What do you mean by “Event-Driven”?. A change to a piece of data can be thought of as a state change event. Coincidentally, Fowler also uses a customer’s address change as an example of Event-Carried State Transfer. The Event-Carried State Transfer Pattern is also detailed by fellow ThoughtWorker and noted Architect, Graham Brooks.
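To make the pattern concrete, below is an illustrative sketch in Python, using the kafka-python client rather than the storefront’s actual Spring for Kafka implementation, of a producer announcing an address change as a state change event on the accounts.customer.change topic (one of the demo’s Kafka topics). The event payload shown is hypothetical, not the project’s real event schema.

import json

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# The Accounts context states a fact; it does not wait for a reply.
producer.send('accounts.customer.change', {
    'customerId': '5b189af9a8d05613315b0212',
    'event': 'ADDRESS_CHANGED',
    'shippingAddress': {'address1': '123 Oak Street', 'city': 'Sunrise',
                        'state': 'CA', 'postalCode': '12345-6789'}
})
producer.flush()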
Consistency Strategies
Multiple architectural approaches could be taken to solve for data consistency in a distributed system. For example, you could use a single relational database to persist all data, avoiding the distributed data model altogether. Although, I would argue that using a single database just turned your distributed system back into a monolith.
You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.
Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages are persisted in Kafka, the service has the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity.
Storefront Example
In this post, our online storefront’s services will be built using Spring Boot. Thus, we will ensure the uniformity of distributed data by using a Publish/Subscribe model with the Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot service, if appropriate, that state change will trigger an event, which will be shared with other services using Kafka topics.
We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from one another, while still ensuring the data is exchanged.
Given the use case of placing an order, we will examine the interactions of three services, the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other, in a decoupled manner.
The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service. Kafka Producers may also be Consumers within our domain.
Below is a view of the online storefront, through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you the idea of where Kafka, and Zookeeper, Kafka’s cluster manager, might sit in a typical, highly-available, microservice-based, distributed, application platform.
This post will focus on the storefront’s services, database, and messaging sub-systems.
Storefront Microservices
First, we will explore the functionality of each of the three microservices. We will examine how they share state change events using Kafka. Each storefront service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data REST, Spring Data MongoDB, Spring for Apache Kafka, Spring Cloud Sleuth, SpringFox, Spring Cloud Netflix Eureka, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream are not part of this post.
Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Accounts Service
The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.
The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. A Customer, represented as a BSON document in the customer.accounts database collection, looks as follows (gist).
{
  "_id": ObjectId("5b189af9a8d05613315b0212"),
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "addresses": [{
    "type": "BILLING",
    "description": "My cc billing address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  },
  {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }],
  "creditCards": [{
    "type": "PRIMARY",
    "description": "VISA",
    "number": "1234-6789-0000-0000",
    "expiration": "6/19",
    "nameOnCard": "John S. Doe"
  },
  {
    "type": "ALTERNATE",
    "description": "Corporate American Express",
    "number": "9999-8888-7777-6666",
    "expiration": "3/20",
    "nameOnCard": "John Doe"
  }],
  "_class": "com.storefront.model.Customer"
}
Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry the state information about the customer that the Accounts service wishes to share when a new customer is added, or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).
Since the CustomerChangeEvent domain event object is not persisted in MongoDB, to examine its structure, we can look at its JSON message payload in Kafka. Note the differences in the data structure between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload (gist).
{
  "id": "5b189af9a8d05613315b0212",
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "addresses": [{
    "type": "BILLING",
    "description": "My cc billing address",
    "address1": "123 Oak Street",
    "address2": null,
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }, {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "address2": null,
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }]
}
For simplicity, we will assume other services do not make changes to the customer’s name, contact information, or addresses. It is the sole responsibility of the Accounts service.
Source code for the Accounts service is available on GitHub.
Orders Service
The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.
The CustomerOrders class, the Orders service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows (gist).
{
  "_id": ObjectId("5b189af9a8d05613315b0212"),
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "addresses": [{
    "type": "BILLING",
    "description": "My cc billing address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  },
  {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  }],
  "orders": [{
    "guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
    "orderStatusEvents": [{
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "CREATED"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "APPROVED"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "PROCESSING"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "COMPLETED"
    }],
    "orderItems": [{
      "product": {
        "guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
        "title": "Green Widget",
        "description": "Gorgeous Green Widget",
        "price": "11.99"
      },
      "quantity": 2
    },
    {
      "product": {
        "guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
        "title": "Red Widget",
        "description": "Reliable Red Widget",
        "price": "3.99"
      },
      "quantity": 3
    }]
  },
  {
    "guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
    "orderStatusEvents": [{
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "CREATED"
    },
    {
      "timestamp": NumberLong("1528339278058"),
      "orderStatusType": "APPROVED"
    }],
    "orderItems": [{
      "product": {
        "guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
        "title": "Yellow Widget",
        "description": "Amazing Yellow Widget",
        "price": "5.99"
      },
      "quantity": 1
    }]
  }],
  "_class": "com.storefront.model.CustomerOrders"
}
Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. The FulfillmentRequestEvent object only contains the information it needs to share. In our example, it shares a single Order, along with the customer’s name, contact information, and shipping address.
Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at its JSON message payload in Kafka. Again, note the structural differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka (gist).
{
  "timestamp": 1528334218821,
  "name": {
    "title": "Mr.",
    "firstName": "John",
    "middleName": "S.",
    "lastName": "Doe",
    "suffix": "Jr."
  },
  "contact": {
    "primaryPhone": "555-666-7777",
    "secondaryPhone": "555-444-9898",
    "email": "john.doe@internet.com"
  },
  "address": {
    "type": "SHIPPING",
    "description": "My home address",
    "address1": "123 Oak Street",
    "address2": null,
    "city": "Sunrise",
    "state": "CA",
    "postalCode": "12345-6789"
  },
  "order": {
    "guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
    "orderStatusEvents": [{
      "timestamp": 1528333926586,
      "orderStatusType": "CREATED",
      "note": null
    }, {
      "timestamp": 1528333926586,
      "orderStatusType": "APPROVED",
      "note": null
    }],
    "orderItems": [{
      "product": {
        "guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
        "title": "Green Widget",
        "description": "Gorgeous Green Widget",
        "price": 11.99
      },
      "quantity": 5
    }]
  }
}
Source code for the Orders service is available on GitHub.
Fulfillment Service
Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.
The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping event, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping addresses are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service, via Kafka, using the FulfillmentRequestEvent entity.
In the Fulfillment MongoDB database, a Fulfillment object, represented as a BSON document in the fulfillment.requests database collection, looks as follows (gist).
{
  "_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
  "timestamp": NumberLong("1528553706260"),
  "name": {
    "title": "Ms.",
    "firstName": "Susan",
    "lastName": "Blackstone"
  },
  "contact": {
    "primaryPhone": "433-544-6555",
    "secondaryPhone": "223-445-6767",
    "email": "susan.m.blackstone@emailisus.com"
  },
  "address": {
    "type": "SHIPPING",
    "description": "Home Sweet Home",
    "address1": "33 Oak Avenue",
    "city": "Nowhere",
    "state": "VT",
    "postalCode": "444556-9090"
  },
  "order": {
    "guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
    "orderStatusEvents": [{
      "timestamp": NumberLong("1528558453686"),
      "orderStatusType": "RECEIVED"
    }],
    "orderItems": [{
      "product": {
        "guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
        "title": "Purple Widget"
      },
      "quantity": 2
    },
    {
      "product": {
        "guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
        "title": "Blue Widget"
      },
      "quantity": 5
    },
    {
      "product": {
        "guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
        "title": "Yellow Widget"
      },
      "quantity": 2
    }]
  },
  "shippingMethod": "Drone",
  "_class": "com.storefront.model.Fulfillment"
}
Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, the shipping status, and an option for order status notes.
Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, to examine it, we can again look at its JSON message payload in Kafka (gist).
{
  "guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
  "orderStatusEvent": {
    "timestamp": 1528334452746,
    "orderStatusType": "PROCESSING",
    "note": null
  }
}
Source code for the Fulfillment service is available on GitHub.
State Change Event Messaging Flows
There are three state change event messaging flows demonstrated in this post.
- Change to a Customer triggers an event message by the Accounts service;
- Order Approved triggers an event message by the Orders service;
- Change to the status of an Order triggers an event message by the Fulfillment service;
Each of these state change event messaging flows follows the exact same architectural pattern on both the Producer and Consumer sides of the Kafka topic.
Let’s examine each state change event messaging flow and the code behind them.
Customer State Change
When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. It can be said that the Orders service’s customer contact information is eventually consistent with the Accounts service’s customer contact information, by way of Kafka.
There are different methods to trigger a message to be sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity (gist).
@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {

    @Value("${spring.kafka.topic.accounts-customer}")
    private String topic;

    private Sender sender;

    @Autowired
    public AfterSaveListener(Sender sender) {
        this.sender = sender;
    }

    @Override
    public void onAfterSave(AfterSaveEvent<Customer> event) {
        log.info("onAfterSave event='{}'", event);
        Customer customer = event.getSource();
        CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
        customerChangeEvent.setId(customer.getId());
        customerChangeEvent.setName(customer.getName());
        customerChangeEvent.setContact(customer.getContact());
        customerChangeEvent.setAddresses(customer.getAddresses());
        sender.send(topic, customerChangeEvent);
    }
}
The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passing it to the Sender class (gist).
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;

    public void send(String topic, CustomerChangeEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The configuration of the Sender is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload (gist).
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.
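If topic auto-creation is disabled on the cluster, the single-partition topic could also be declared as a bean and created through Spring Kafka’s KafkaAdmin support. Below is a minimal sketch, assuming a single-broker development cluster (hence the replication factor of 1); it is illustrative and not part of the post’s source code.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // A single partition keeps every CustomerChangeEvent, for every customer, in strict order.
    // A replication factor of 1 assumes a single-broker development cluster.
    @Bean
    public NewTopic accountsCustomerChangeTopic() {
        return new NewTopic("accounts.customer.change", 1, (short) 1);
    }
}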
The Orders service’s Receiver class consumes the CustomerChangeEvent messages, produced by the Accounts service (gist).
[gist]cc3c4e55bc291e5435eccdd679d03015[/gist]
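For convenience, here is a minimal sketch of the relevant listener method, based on the Orders service’s full Receiver class shown later in this post. The CustomerChangeEvent JSON payload is deserialized into the Orders service’s own CustomerOrders model and persisted to the Orders database.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    // The CustomerChangeEvent JSON payload produced by the Accounts service is
    // deserialized into the Orders service's own CustomerOrders model and saved.
    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {
        log.info("received payload='{}'", customerOrders);
        customerOrdersRepository.save(customerOrders);
    }
}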
The Orders service’s Receiver class is configured differently, compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into a different object type. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching (gist).
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as input, denoting the object type the message payload needs to be deserialized into. In this way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.
For all services, a Spring application.yaml properties file, in each service’s resources directory, contains the Kafka configuration (gist).
server:
  port: 8080
spring:
  main:
    allow-bean-definition-overriding: true
  application:
    name: orders
  data:
    mongodb:
      uri: mongodb://mongo:27017/orders
  kafka:
    bootstrap-servers: kafka:9092
    topic:
      accounts-customer: accounts.customer.change
      orders-order: orders.order.fulfill
      fulfillment-order: fulfillment.order.change
    consumer:
      group-id: orders
      auto-offset-reset: earliest
  zipkin:
    sender:
      type: kafka
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: INFO
---
spring:
  config:
    activate:
      on-profile: local
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders
  kafka:
    bootstrap-servers: localhost:9092
server:
  port: 8090
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: DEBUG
---
spring:
  config:
    activate:
      on-profile: confluent
server:
  port: 8080
logging:
  level:
    root: INFO
---
server:
  port: 8080
spring:
  config:
    activate:
      on-profile: minikube
  data:
    mongodb:
      uri: mongodb://mongo.dev:27017/orders
  kafka:
    bootstrap-servers: kafka-cluster.dev:9092
management:
  endpoints:
    web:
      exposure:
        include: '*'
logging:
  level:
    root: DEBUG
Order Approved for Fulfillment
When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the orders.order.fulfill Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of which Orders are ready for fulfillment.
Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class (gist).
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;

    public void send(String topic, FulfillmentRequestEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
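The scan itself is not shown in the post. Below is a minimal sketch of how it might look; the service class, the model accessors, and the FulfillmentRequestEvent constructor used here are assumptions made for illustration, not the actual implementation.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class FulfillmentRequestScanner {

    @Value("${spring.kafka.topic.orders-order}")
    private String topic; // orders.order.fulfill

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private Sender sender; // the FulfillmentRequestEvent Sender shown above

    // Scan every customer's order history and request fulfillment of any order whose
    // status history contains both a CREATED and an APPROVED event.
    public void requestFulfillmentOfApprovedOrders() {
        for (CustomerOrders customerOrders : customerOrdersRepository.findAll()) {
            customerOrders.getOrders().stream()
                    .filter(order -> hasStatus(order, "CREATED") && hasStatus(order, "APPROVED"))
                    .forEach(order -> {
                        // Hypothetical constructor: carries only what Fulfillment needs.
                        FulfillmentRequestEvent event = new FulfillmentRequestEvent(
                                customerOrders.getName(),
                                customerOrders.getContact(),
                                customerOrders.getShippingAddress(),
                                order);
                        log.info("requesting fulfillment of order guid='{}'", order.getGuid());
                        sender.send(topic, event);
                    });
        }
    }

    private boolean hasStatus(Order order, String statusType) {
        return order.getOrderStatusEvents().stream()
                .anyMatch(event -> statusType.equals(event.getOrderStatusType().toString()));
    }
}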
The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload (gist).
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is not critical, messages could be sent to a topic with multiple partitions if the volume of messages required it.
The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. This includes the order to be fulfilled and the customer’s contact and shipping information (gist).
@Slf4j
@Component
public class Receiver {

    @Autowired
    private FulfillmentRepository fulfillmentRepository;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.orders-order}")
    public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
        log.info("received payload='{}'", fulfillmentRequestEvent.toString());
        latch.countDown();
        Fulfillment fulfillment = new Fulfillment();
        fulfillment.setId(fulfillmentRequestEvent.getId());
        fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
        fulfillment.setName(fulfillmentRequestEvent.getName());
        fulfillment.setContact(fulfillmentRequestEvent.getContact());
        fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
        fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
        fulfillmentRepository.save(fulfillment);
    }
}
The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object (gist).
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(FulfillmentRequestEvent.class));
    }

    @Override
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Fulfillment Order Status State Change
When the status of the Order in a Fulfillment entity is changed to anything other than ‘Approved’, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events, from the initial ‘Created’ status to the final happy path ‘Received’ status.
The Fulfillment service exposes several endpoints through the FulfillmentController class, which simulate a change to the status of an order. They allow an order status to be changed from ‘Approved’ to ‘Processing’, to ‘Shipped’, to ‘In Transit’, and to ‘Received’. This change is applied to all orders that meet the criteria.
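The post does not include the FulfillmentController code, but a single simulation endpoint might look something like the sketch below; the endpoint path, repository query method, and event constructors are assumptions made for illustration only.
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
public class FulfillmentController {

    @Value("${spring.kafka.topic.fulfillment-order}")
    private String topic; // fulfillment.order.change

    @Autowired
    private FulfillmentRepository fulfillmentRepository;

    @Autowired
    private Sender sender; // the OrderStatusChangeEvent Sender shown below

    // Simulates all 'APPROVED' orders moving to 'PROCESSING'; similar endpoints would exist
    // for SHIPPED, IN_TRANSIT, and RECEIVED. Path, query method, and constructors are hypothetical.
    @PostMapping("/fulfillments/processing")
    public ResponseEntity<Void> markApprovedOrdersProcessing() {
        for (Fulfillment fulfillment : fulfillmentRepository.findByLatestOrderStatus("APPROVED")) {
            OrderStatusEvent statusEvent = new OrderStatusEvent(new Date().getTime(), "PROCESSING");
            // Update the Fulfillment document in MongoDB...
            fulfillment.getOrder().getOrderStatusEvents().add(statusEvent);
            fulfillmentRepository.save(fulfillment);
            // ...and publish the same state change for the Orders service to consume.
            sender.send(topic, new OrderStatusChangeEvent(fulfillment.getOrder().getGuid(), statusEvent));
        }
        return new ResponseEntity<>(HttpStatus.OK);
    }
}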
Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates a Kafka message, containing the OrderStatusChangeEvent in the message payload. This is handled by the Fulfillment service’s Sender class.
Note that in this example, these two actions are not handled in an atomic transaction. Either updating the database or sending the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure both of these disparate actions succeed or fail as a single transaction, to ensure data consistency (gist).
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;

    public void send(String topic, OrderStatusChangeEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services (gist).
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages could be sent to a topic with multiple partitions if the volume of messages required it.
The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message, produced by the Fulfillment service (gist).
@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {
        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();
        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);
        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}
As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service needs to receive messages from more than one topic. The ReceiverConfig class deserializes all messages using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory class’s setMessageConverter method, which allows for dynamic object type matching (gist).
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.
Part Two
In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Architecting Cloud-Optimized Apps with AKS (Azure’s Managed Kubernetes), Azure Service Bus, and Cosmos DB
Posted by Gary A. Stafford in Azure, Cloud, Java Development, Software Development on December 10, 2017
An earlier post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ, demonstrated the use of a message-based, event-driven, decoupled architectural approach for communications between microservices, using Spring AMQP and RabbitMQ. This distributed computing model is known as eventual consistency. To paraphrase microservices.io, ‘using an event-driven, eventually consistent approach, each service publishes an event whenever it updates its data. Other services subscribe to events. When an event is received, a service (subscriber) updates its data.’
That earlier post illustrated a fairly simple example application, the Voter API, consisting of a set of three Spring Boot microservices backed by MongoDB and RabbitMQ, and fronted by an API Gateway built with HAProxy. All API components were containerized using Docker and designed for use with Docker CE for AWS as the Container-as-a-Service (CaaS) platform.
Optimizing for Kubernetes on Azure
This post will demonstrate how a modern application, such as the Voter API, is optimized for Kubernetes in the Cloud (Kubernetes-as-a-Service), in this case, AKS, Azure’s new public preview of Managed Kubernetes for Azure Container Service. According to Microsoft, the goal of AKS is to simplify the deployment, management, and operations of Kubernetes. I wrote about AKS in detail, in my last post, First Impressions of AKS, Azure’s New Managed Kubernetes Container Service.
In addition to migrating to AKS, the Voter API will take advantage of additional enterprise-grade Azure resources, including Azure Service Bus and Cosmos DB, as replacements for the Voter API’s RabbitMQ and MongoDB. There are several architectural options for the Voter API’s messaging and NoSQL data source requirements when moving to Azure.
- Keep Dockerized RabbitMQ and MongoDB – Easy to deploy to Kubernetes, but not easily scalable, highly-available, or manageable. Would require storage optimized Azure VMs for nodes, node affinity, and persistent storage for data.
- Replace with Cloud-based Non-Azure Equivalents – Use SaaS-based equivalents, such as CloudAMQP (RabbitMQ-as-a-Service) and MongoDB Atlas, which will provide scalability, high-availability, and manageability.
- Replace with Azure Service Bus and Cosmos DB – Provides all the advantages of SaaS-based equivalents, and additionally as Azure resources, benefits from being in the Azure Cloud alongside AKS.
Source Code
The Kubernetes resource files and deployment scripts used in this post are all available on GitHub. This is the only project you need to clone to reproduce the AKS example in this post.
git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/azure-aks-sb-cosmosdb-demo.git
The Docker images for the three Spring microservices deployed to AKS, Voter, Candidate, and Election, are available on Docker Hub. Optionally, the source code, including Dockerfiles, for the Voter, Candidate, and Election microservices, as well as the Voter Client, is available on GitHub, in the kub-aks branch.
git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/candidate-service.git

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/election-service.git

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/voter-service.git

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/voter-client.git
Azure Service Bus
To demonstrate the capabilities of Azure’s Service Bus, the Voter API’s Spring microservice source code has been re-written to work with Azure Service Bus instead of RabbitMQ. A future post will explore the microservices’ messaging code. Keep in mind, a large application written specifically for an easily portable technology, such as RabbitMQ or MongoDB, would likely remain on that technology, even if the application were lifted and shifted to the Cloud or moved between Cloud Service Providers (CSPs). Portability is something important to keep in mind when choosing modern technologies.
Service Bus is Azure’s reliable cloud Messaging-as-a-Service (MaaS). Service Bus is an original Azure resource offering, available for several years. The core components of the Service Bus messaging infrastructure are queues, topics, and subscriptions. According to Microsoft, ‘the primary difference is that topics support publish/subscribe capabilities that can be used for sophisticated content-based routing and delivery logic, including sending to multiple recipients.’
Since the three Voter API’s microservices are not required to produce messages for more than one other service consumer, Service Bus queues are sufficient, as opposed to a pub/sub model using Service Bus topics.
Cosmos DB
Cosmos DB, Microsoft’s globally distributed, multi-model database, offers throughput, latency, availability, and consistency guarantees with comprehensive service level agreements (SLAs). Ideal for the Voter API, Cosmos DB supports MongoDB’s data models through the MongoDB API, a MongoDB database service built on top of Cosmos DB. The MongoDB API is compatible with existing MongoDB libraries, drivers, tools, and applications. Therefore, there are no code changes required to convert the Voter API from MongoDB to Cosmos DB. I simply had to change the database connection string.
NGINX Ingress Controller
Although the Voter API’s HAProxy-based API Gateway could be deployed to AKS, it is not optimal for Kubernetes. Instead, the Voter API will use an NGINX-based Ingress Controller. NGINX will serve as an API Gateway, as HAProxy did, previously.
According to NGINX, ‘an Ingress is a Kubernetes resource that lets you configure an HTTP load balancer for your Kubernetes services. Such a load balancer usually exposes your services to clients outside of your Kubernetes cluster.’
An Ingress resource requires an Ingress Controller to function. Continuing from NGINX, ‘an Ingress Controller is an application that monitors Ingress resources via the Kubernetes API and updates the configuration of a load balancer in case of any changes. Different load balancers require different Ingress controller implementations. In the case of software load balancers, such as NGINX, an Ingress controller is deployed in a pod along with a load balancer.’
There are currently two NGINX-based Ingress Controllers available, one from Kubernetes and one directly from NGINX. With the two being roughly equivalent, for this post, I chose the Kubernetes version, without RBAC (Kubernetes offers a version with and without RBAC). RBAC should always be used for actual cluster security. There are several advantages of using either version of the NGINX Ingress Controller for Kubernetes, including Layer 4 TCP and UDP and Layer 7 HTTP load balancing, reverse proxying, ease of SSL termination, dynamically-configurable path-based rules, and support for multiple hostnames.
Azure Web App
Lastly, the Voter Client application, not really part of the Voter API, but useful for demonstration purposes, will be converted from a containerized application to an Azure Web App. Since it is not part of the Voter API, separating the Client application from AKS makes better architectural sense. Web Apps are a powerful, richly-featured, yet incredibly simple way to host applications and services on Azure. For more information on using Azure Web Apps, read my recent post, Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.
Revised Component Architecture
Below is a simplified component diagram of the new architecture, including Azure Service Bus, Cosmos DB, and the NGINX Ingress Controller. The new architecture looks similar to the previous architecture, but as you will see, it is actually very different.
Process Flow
To understand the role of each API component, let’s look at one of the event-driven, decoupled process flows, the creation of a new election candidate. In the simplified flow diagram below, an API consumer executes an HTTP POST request containing the new candidate object as JSON. The Candidate microservice receives the HTTP request and creates a new document in the Cosmos DB Voter database. A Spring RepositoryEventHandler within the Candidate microservice responds to the document creation and publishes a Create Candidate event message, containing the new candidate object as JSON, to the Azure Service Bus Candidate Queue.
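A minimal sketch of what such an event handler might look like, using Spring Data REST’s @RepositoryEventHandler and @HandleAfterCreate annotations; the CandidateQueueSender is a hypothetical wrapper around the Service Bus queue client, since the actual messaging code is the subject of a future post.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.rest.core.annotation.HandleAfterCreate;
import org.springframework.data.rest.core.annotation.RepositoryEventHandler;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RepositoryEventHandler
public class CandidateEventHandler {

    @Autowired
    private CandidateQueueSender candidateQueueSender; // hypothetical wrapper around the Service Bus client

    // Fires after Spring Data REST persists a new Candidate document to the Cosmos DB Voter database;
    // the new candidate is then published, as JSON, to the candidate.queue.
    @HandleAfterCreate
    public void handleCandidateCreated(Candidate candidate) {
        log.info("publishing Create Candidate event: '{}'", candidate);
        candidateQueueSender.send(candidate);
    }
}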
Independently, the Voter microservice is listening to the Candidate Queue. Whenever a new message is produced by the Candidate microservice, the Voter microservice retrieves the message off the queue. The Voter microservice then transforms the new candidate object contained in the incoming message to its own candidate data model and creates a new document in its own Voter database.
The same process flows exist between the Election and the Candidate microservices. The Candidate microservice maintains current elections in its database, which are retrieved from the Election queue.
Data Models
It is useful to understand that the Candidate microservice’s candidate domain model is not necessarily identical to the Voter microservice’s candidate domain model. Each microservice may choose to maintain its own representation of a vote, a candidate, and an election. The Voter service transforms the new candidate object in the incoming message based on its own needs. In this case, the Voter microservice is only interested in a subset of the total fields in the Candidate microservice’s model. This is the beauty of decoupling microservices, their domain models, and their datastores.
Other Events
The versions of the Voter API microservices used for this post only support Election Created events and Candidate Created events. They do not handle Delete or Update events, which would be necessary to be fully functional. For example, if a candidate withdraws from an election, the Voter service would need to be notified so no one places votes for that candidate. This would normally happen through a Candidate Delete or Candidate Update event.
Provisioning Azure Service Bus
First, the Azure Service Bus is provisioned. Provisioning the Service Bus may be accomplished using several different methods, including manually using the Azure Portal or programmatically using Azure Resource Manager (ARM) with PowerShell or Terraform. I chose to provision the Azure Service Bus and the two queues using the Azure Portal for expediency. I chose the Basic Service Bus Tier of service, of which there are three tiers, Basic, Standard, and Premium.
The application requires two queues, the candidate.queue and the election.queue.
Provisioning Cosmos DB
Next, Cosmos DB is provisioned. Like Azure Service Bus, Cosmos DB may be provisioned using several methods, including manually using the Azure Portal, programmatically using Azure Resource Manager (ARM) with PowerShell or Terraform, or using the Azure CLI, which was my choice.
az cosmosdb create \
  --name cosmosdb_instance_name_goes_here \
  --resource-group resource_group_name_goes_here \
  --location "East US=0" \
  --kind MongoDB
The post’s Cosmos DB instance exists within the single East US Region, with no failover. In a real Production environment, you would configure Cosmos DB with multi-region failover. I chose MongoDB as the type of Cosmos DB database account to create. The allowed values are GlobalDocumentDB, MongoDB, Parse. All other settings were left to the default values.
The three Spring microservices each have their own database. You do not have to create the databases in advance of consuming the Voter API. The databases and the database collections will be automatically created when new documents are first inserted by the microservices. Below, the three databases and their collections have been created and populated with documents.
The GitHub project repository also contains three shell scripts to generate sample vote, candidate, and election documents. The scripts will delete any previous documents from the database collections and generate new sets of sample documents. To use, you will have to update the scripts with your own Voter API URL.
MongoDB Aggregation Pipeline
Each of the three Spring microservices uses Spring Data MongoDB, which takes advantage of MongoDB’s Aggregation Framework. According to MongoDB, ‘the aggregation framework is modeled on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms the documents into an aggregated result.’ Below is an example of aggregation from the Candidate microservice’s VoterController class.
Aggregation aggregation = Aggregation.newAggregation(
    match(Criteria.where("election").is(election)),
    group("candidate").count().as("votes"),
    project("votes").and("candidate").previousOperation(),
    sort(Sort.Direction.DESC, "votes")
);
To use MongoDB’s aggregation framework with Cosmos DB, it is currently necessary to activate the MongoDB Aggregation Pipeline Preview Feature of Cosmos DB. The feature can be activated from the Azure Portal, as shown below.
Cosmos DB Emulator
Be warned, Cosmos DB can be very expensive, even without database traffic or any Production-grade bells and whistles. Be careful when spinning up instances on Azure for learning purposes, the cost adds up quickly! In less than ten days, while writing this post, my cost was almost US$100 for the Voter API’s Cosmos DB instance.
I strongly recommend downloading the free Azure Cosmos DB Emulator to develop and test applications from your local machine. Although certainly not as convenient, it will save you the cost of developing for Cosmos DB directly on Azure.
With Cosmos DB, you pay for reserved throughput provisioned and data stored in containers (a collection of documents or a table or a graph). Yes, that’s right, Azure charges you per MongoDB collection, not even per database. Azure Cosmos DB’s current pricing model seems less than ideal for microservice architectures, each with their own database instance.
By default the reserved throughput, billed as Request Units (RU) per second or RU/s, is set to 1,000 RU/s per collection. For development and testing, you can reduce each collection to a minimum of 400 RU/s. The Voter API creates five collections at 1,000 RU/s or 5,000 RU/s total. Reducing this to a total of 2,000 RU/s makes Cosmos DB marginally more affordable to explore.
Building the AKS Cluster
An existing Azure Resource Group is required for AKS. I chose to use the latest available version of Kubernetes, 1.8.2.
# login to azure
az login \
  --username your_username \
  --password your_password

# create resource group
az group create \
  --resource-group resource_group_name_goes_here \
  --location eastus

# create aks cluster
az aks create \
  --name cluster_name_goes_here \
  --resource-group resource_group_name_goes_here \
  --ssh-key-value path_to_your_public_key \
  --kubernetes-version 1.8.2

# get credentials to access aks cluster
az aks get-credentials \
  --name cluster_name_goes_here \
  --resource-group resource_group_name_goes_here

# display cluster's worker nodes
kubectl get nodes --output=wide
By default, AKS will provision a three-node Kubernetes cluster using Azure’s Standard D1 v2 Virtual Machines. According to Microsoft, ‘D series VMs are general purpose VM sizes provide balanced CPU-to-memory ratio. They are ideal for testing and development, small to medium databases, and low to medium traffic web servers.’ Azure D1 v2 VMs are based on Linux OS images, currently Debian 8 (Jessie), with 1 vCPU and 3.5 GB of memory. By default with AKS, each VM receives 30 GiB of Standard HDD attached storage.
You should always select the type and quantity of the cluster’s VMs and their attached storage, optimized for estimated traffic volumes and the specific workloads you are running. This can be done using the --node-count, --node-vm-size, and --node-osdisk-size arguments with the az aks create command.
Deployment
The Voter API resources are deployed to their own Kubernetes Namespace, voter-api. The NGINX Ingress Controller resources are deployed to a different namespace, ingress-nginx. Separate namespaces help organize individual Kubernetes resources and separate different concerns.
Voter API
First, the voter-api namespace is created. Then, five required Kubernetes Secrets are created within the namespace. These secrets all contain sensitive information, such as passwords, that should not be shared. There is one secret for each of the three Cosmos DB database connection strings, one secret for the Azure Service Bus connection string, and one secret for the Let’s Encrypt SSL/TLS certificate and private key, used for secure HTTPS access to the Voter API.
Secrets
The Voter API’s secrets are used to populate environment variables within the pod’s containers. The environment variables are then available for use within the containers. Below is a snippet of the Voter pods resource file showing how the Cosmos DB and Service Bus connection strings secrets are used to populate environment variables.
env:
  - name: AZURE_SERVICE_BUS_CONNECTION_STRING
    valueFrom:
      secretKeyRef:
        name: azure-service-bus
        key: connection-string
  - name: SPRING_DATA_MONGODB_URI
    valueFrom:
      secretKeyRef:
        name: azure-cosmosdb-voter
        key: connection-string
Shown below, the Cosmos DB and Service Bus connection strings secrets have been injected into the Voter container and made available as environment variables to the microservice’s executable JAR file on start-up. As environment variables, the secrets are visible in plain text. Access to containers should be tightly controlled through Kubernetes RBAC and Azure AD, to ensure sensitive information, such as secrets, remain confidential.
Next, the three Kubernetes ReplicaSet resources, corresponding to the three Spring microservices, are created using Deployment controllers. According to Kubernetes, a Deployment that configures a ReplicaSet is now the recommended way to set up replication. The Deployments specify three replicas of each of the three Spring Services, resulting in a total of nine Kubernetes Pods.
Each pod, by default, will be scheduled on a different node if possible. According to Kubernetes, ‘the scheduler will automatically do a reasonable placement (e.g. spread your pods across nodes, not place the pod on a node with insufficient free resources, etc.).’ Note below how each of the three microservice’s three replicas has been scheduled on a different node in the three-node AKS cluster.
Next, the three corresponding Kubernetes ClusterIP-type Services are created. And lastly, the Kubernetes Ingress is created. According to Kubernetes, the Ingress resource is an API object that manages external access to the services in a cluster, typically HTTP. Ingress provides load balancing, SSL termination, and name-based virtual hosting.
The Ingress configuration contains the routing rules used with the NGINX Ingress Controller. Shown below are the routing rules for each of the three microservices within the Voter API. Incoming API requests are routed to the appropriate pod and service port by NGINX.
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: voter-ingress
  namespace: voter-api
  annotations:
    ingress.kubernetes.io/ssl-redirect: "true"
spec:
  tls:
    - hosts:
        - api.voter-demo.com
      secretName: api-voter-demo-secret
  rules:
    - http:
        paths:
          - path: /candidate
            backend:
              serviceName: candidate
              servicePort: 8080
          - path: /election
            backend:
              serviceName: election
              servicePort: 8080
          - path: /voter
            backend:
              serviceName: voter
              servicePort: 8080
The screengrab below shows all of the Voter API resources created on AKS.
NGINX Ingress Controller
After completing the deployment of the Voter API, the NGINX Ingress Controller is created. It starts with creating the ingress-nginx namespace. Next, the NGINX Ingress Controller is created, consisting of the NGINX Ingress Controller, three Kubernetes ConfigMap resources, and a default back-end application. The Controller and back-end each have their own Service resources. Like the Voter API, each has three replicas, for a total of six pods. Together, the Ingress resource and NGINX Ingress Controller manage traffic to the Spring microservices.
The screengrab below shows all of the NGINX Ingress Controller resources created on AKS.
The NGINX Ingress Controller Service, shown above, has an external public IP address associated with itself. This is because that Service is of the type, Load Balancer. External requests to the Voter API will be routed through the NGINX Ingress Controller, on this IP address.
kind: Service
apiVersion: v1
metadata:
  name: ingress-nginx
  namespace: ingress-nginx
  labels:
    app: ingress-nginx
spec:
  externalTrafficPolicy: Local
  type: LoadBalancer
  selector:
    app: ingress-nginx
  ports:
    - name: http
      port: 80
      targetPort: http
    - name: https
      port: 443
      targetPort: https
If you are only using HTTPS, not HTTP, then the references to HTTP and port 80 in the Ingress configuration are unnecessary. The NGINX Ingress Controller’s resources are explained in detail in the GitHub documentation, along with further configuration instructions.
DNS
To provide convenient access to the Voter API and the Voter Client, my domain, voter-demo.com, is associated with the public IP address of the Voter API Ingress Controller and with the public IP address of the Voter Client Azure Web App. DNS configuration is done through Azure’s DNS Zone resource.
The two TXT type records might not look as familiar as the SOA, NS, and A type records. The TXT records are required to associate the domain entries with the Voter Client Azure Web App. Browsing to http://www.voter-demo.com or simply http://voter-demo.com brings up the Voter Client.
The Client sends and receives data via the Voter API, available securely at https://api.voter-demo.com.
Routing API Requests
With the Pods, Services, Ingress, and NGINX Ingress Controller created and configured, as well as the Azure Layer 4 Load Balancer and DNS Zone, HTTP requests from API consumers are properly and securely routed to the appropriate microservices. In the example below, three back-to-back requests are made to the voter/info API endpoint. HTTP requests are properly routed to one of the three Voter pod replicas, using the default round-robin algorithm, as proven by observing the different hostnames (pod names) and IP addresses (private pod IPs) in each HTTP response.
Final Architecture
Shown below is the final Voter API Azure architecture. To simplify the diagram, I have deliberately left out the three microservices’ ClusterIP-type Services, the three default back-end application pods, and the default back-end application’s ClusterIP-type Service. All resources shown below are within the single East US Azure region, except DNS, which is a global resource.
Shown below is the new Azure Resource Group created by Azure during the AKS provisioning process. The Resource Group contains the various resources required to support the AKS cluster, NGINX Ingress Controller, and the Voter API. Necessary Azure resources were automatically provisioned when I provisioned AKS and when I created the new Voter API and NGINX resources.
In addition to the Resource Group above, the original Resource Group contains the AKS Container Service resource itself, Service Bus, Cosmos DB, and the DNS Zone resource.
The Voter Client Web App, consisting of the Azure App Service and App Service plan resource, is located in a third, separate Resource Group, not shown here.
Cleaning Up AKS
A nice feature of AKS is that running an az aks delete command will delete all the Azure resources created as part of provisioning AKS, the API, and the Ingress Controller. You will have to delete the Cosmos DB, Service Bus, and DNS Zone resources separately.
az aks delete \
  --name cluster_name_goes_here \
  --resource-group resource_group_name_goes_here
Conclusion
By taking advantage of Kubernetes with AKS, and the array of Azure’s enterprise-grade resources, we shifted the Voter API from a simple Docker architecture to a production-ready solution. The Voter API is now easier to manage, horizontally scalable, fault-tolerant, and marginally more secure. It is capable of reliably supporting dozens more microservices, with multiple replicas. The Voter API will handle a high volume of data transactions and event messages.
There is much more that needs to be done to productionalize the Voter API on AKS, including:
- Add multi-region failover of Cosmos DB
- Upgrade to Service Bus Standard or Premium Tier
- Optimize Azure VMs and storage for anticipated traffic volumes and application-specific workloads
- Implement Kubernetes RBAC
- Add monitoring, logging, and alerting with Envoy or similar
- Secure end-to-end TLS communications with Istio or similar
- Secure the API with OAuth and Azure AD
- Automate everything with DevOps – AKS provisioning, testing code, creating resources, updating microservices, and managing data
All opinions in this post are my own, and not necessarily the views of my current or past employers or their clients.
Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ
Posted by Gary A. Stafford in DevOps, Java Development, Software Development on May 15, 2017
Introduction
In a recent post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMQP, we moved away from synchronous REST HTTP for inter-process communications (IPC) toward message-based IPC. Moving to asynchronous message-based communications allows us to decouple services from one another. It makes it easier to build, test, and release our individual services. In that post, we did not achieve fully asynchronous communications, although we did achieve a higher level of service decoupling using message-based Remote Procedure Call (RPC) IPC.
In this post, we will fully decouple our services using the distributed computing model of eventual consistency. More specifically, we will use a message-based, event-driven, loosely-coupled, eventually consistent architectural approach for communications between services.
What is eventual consistency? One of the best definitions of eventual consistency I have read was posted on microservices.io. To paraphrase, ‘using an event-driven, eventually consistent approach, each service publishes an event whenever it updates its data. Other services subscribe to events. When an event is received, a service updates its data.’
Example of Eventual Consistency
Imagine Service A, the Customer service, inserts a new customer record into its database. Based on that ‘customer created’ event, Service A publishes a message containing the new customer object, serialized to JSON, to the lightweight, persistent New Customer message queue.
Service B, the Customer Onboarding service, a subscriber to the New Customer queue, consumes and deserializes Service A’s message. Service B may or may not perform a data transformation of the Customer object to its own Customer data model. Service B then inserts the new customer record into its own database.
In the above example, it can be said that the customer records in Service B’s database are eventually consistent with the customer records in Service A’s database. Service A makes a change and publishes a message in response to the event. Service B consumes the message and makes the same change. Eventually (likely within milliseconds), Service B’s customer records are consistent with Service A’s customer records.
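To make the example concrete, here is a minimal, illustrative sketch of Service A’s publish step, written with Spring AMQP. The CustomerEventPublisher class, the Customer stand-in, and the new-customer.queue name are assumptions for this sketch only; the post’s actual implementation, for candidates rather than customers, is shown later.

package com.example.customer; // hypothetical package for this sketch

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class CustomerEventPublisher {

    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public CustomerEventPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Called whenever Service A creates a new customer record
    public void publishCustomerCreated(Customer customer) throws JsonProcessingException {
        String payload = objectMapper.writeValueAsString(customer);
        // With the template's default ("") exchange, the queue name acts as the routing key
        rabbitTemplate.convertAndSend("new-customer.queue", payload);
    }

    // Minimal stand-in for Service A's customer model
    public static class Customer {
        public String id;
        public String name;
    }
}

Service B would consume from the same queue, deserialize the JSON, and save the customer to its own database, exactly as described above.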
Why Eventual Consistency?
So what does this apparent added complexity and duplication of data buy us? Consider the advantages. Service B, the Onboarding service, requires no knowledge of, or a dependency on, Service A, the Customer service. Still, Service B has a current record of all the customers that Service A maintains. Instead of making repeated and potentially costly RESTful HTTP or message-based RPC calls from Service B to Service A for new customers, Service B simply queries its own database for a list of customers.
The value of eventual consistency increases factorially as you scale a distributed system. Imagine dozens of distinct microservices, many requiring data from other microservices. Further, imagine multiple instances of each of those services all running in parallel. Decoupling services from one another, through asynchronous forms of IPC, messaging, and event-driven eventual consistency greatly simplifies the software development lifecycle and operations.
Demonstration
In this post, we could use a few different architectural patterns to demonstrate message passing with RabbitMQ and Spring AMQP, including Work Queues, Publish/Subscribe, Routing, or Topics. To keep things as simple as possible, we will have a single Producer publish messages to a single durable and persistent message queue, and a single Subscriber, a Consumer, consume the messages from that queue. We will focus on a single type of event message.
Sample Code
To demonstrate Spring AMQP-based messaging with RabbitMQ, we will use a reference set of three Spring Boot microservices. The Election Service, Candidate Service, and Voter Service are all backed by MongoDB. The services and MongoDB, along with RabbitMQ and Voter API Gateway, are all part of the Voter API.
The Voter API Gateway, based on HAProxy, serves as a common entry point to all three services, as well as serving as a reverse proxy and load balancer. The API Gateway provides round-robin load-balanced access to multiple instances of each service.
All the source code for this post’s example is available on GitHub, within a few different project repositories. The Voter Service repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Election Service repository, Candidate Service repository, and Voter API Gateway repository are also available on GitHub. There is also a new AngularJS/Node.js Web Client, to demonstrate how to use the Voter API.
For this post, you only need to clone the Voter Service repository.
Deploying Voter API
All components, including the Spring Boot services, MongoDB, RabbitMQ, API Gateway, and the Web Client, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub. The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.
To clone and deploy the components locally, including the Spring Boot services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.
git clone --depth 1 --branch rabbitmq \
  https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh
If everything was deployed successfully, you should observe six running Docker containers, similar to the output, below.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
32d73282ff3d garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 8 seconds ago Up 5 seconds 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
1ece438c5da4 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8097->8080/tcp voterstack_candidate_1
30391faa3422 garystafford/voter-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8099->8080/tcp voterstack_voter_1
35063ccfe706 garystafford/election-service:rabbitmq "java -Dspring.pro…" 12 seconds ago Up 10 seconds 0.0.0.0:8095->8080/tcp voterstack_election_1
23eae86967a2 rabbitmq:management-alpine "docker-entrypoint…" 14 seconds ago Up 11 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
7e77ddecddbb mongo:latest "docker-entrypoint…" 24 seconds ago Up 21 seconds 0.0.0.0:27017->27017/tcp voterstack_mongodb_1
Using Voter API
The Voter Service, Election Service, and Candidate Service GitHub repositories each contain README files, which detail all the API endpoints each service exposes, and how to call them.
In addition to casting votes for candidates, the Voter service can simulate election results. By calling the /simulation endpoint and indicating the desired election, the Voter service will randomly generate a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. The Voter service depends on the Candidate service to obtain a list of candidates.
The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation
endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.
The Election service manages elections, their polling dates, and the type of election (federal, state, or local). Like the other services, the Election service also has a /simulation
endpoint, which will create a list of sample elections. The Election service will not be discussed in this post’s demonstration. We will examine communications between the Candidate and Voter services, only.
REST HTTP Endpoint
As you recall from our previous post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMQP, the Voter service exposes multiple, almost identical endpoints. Each endpoint uses a different means of IPC to retrieve candidates and generates random votes.
Calling the /voter/simulation/http/{election}
endpoint and providing a specific election prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is done using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.
The Candidate service receives the HTTP request. The Candidate service responds to the Voter service with a list of candidates in JSON format. The Voter service receives the response payload containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote
collection in the Voter service’s voters
database.
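For readers who want a feel for what that synchronous call might look like, below is a minimal, illustrative sketch using Spring’s RestTemplate. The URL, gateway host, and the simplified CandidateVoterView stand-in are assumptions for this sketch; the actual Voter service code lives in the Voter Service repository and is not reproduced here.

package com.example.voter; // hypothetical package for this sketch

import java.util.Arrays;
import java.util.List;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class CandidateHttpClient {

    private final RestTemplate restTemplate = new RestTemplate();

    // Synchronous REST HTTP call: the calling thread blocks until the Candidate service responds
    public List<CandidateVoterView> getCandidatesHttp(String election) {
        String url = "http://voter-api-gateway:8080/candidate/summaries/" + election; // hypothetical URL
        ResponseEntity<CandidateVoterView[]> response =
                restTemplate.getForEntity(url, CandidateVoterView[].class);
        return Arrays.asList(response.getBody());
    }

    // Minimal stand-in for the candidate view model returned by the Candidate service
    public static class CandidateVoterView {
        public String fullName;
        public String politicalParty;
        public String election;
    }
}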
Message-based RPC Endpoint
Similarly, calling the /voter/simulation/rpc/{election}
endpoint and providing a specific election prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client) produces a request message and places it in RabbitMQ’s voter.rpc.requests
queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service; it only depends on a response to its request message. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.
The request message is consumed by the Candidate service (the server), who is listening to that queue. In response, the Candidate service produces a message containing the list of candidates serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client) through RabbitMQ. This is done using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message, sent by the Voter Service.
The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to candidate objects. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous example, each new vote object (MongoDB document) is written back to the vote
collection in the Voter service’s voters
database.
New Endpoint
Calling the new /voter/simulation/db/{election}
endpoint and providing a specific election, prompts the Voter service to query its own MongoDB database for a list of candidates.
But wait, where did the candidates come from? The Voter service didn’t call the Candidate service? The answer is message-based eventual consistency. Whenever a new candidate is created, using a REST HTTP POST request to the Candidate service’s /candidate/candidates
endpoint, a Spring Data Rest Repository Event Handler responds. Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to a durable and persistent RabbitMQ queue.
The Voter service is listening to that queue. The Voter service consumes messages off the queue, deserializes the candidate object, and saves it to its own voters
database, to the candidate
collection. For this example, we are saving the incoming candidate object as is, with no transformations. The candidate object model for both services is identical.
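For reference, a simplified sketch of that shared candidate model is shown below. The fields are inferred from the message payloads and MongoDB documents that appear later in this post; the actual classes in the two repositories may contain additional constructors, getters, and annotations.

package com.voterapi.voter.domain; // the Voter service's copy; the Candidate service defines an identical class in its own package

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

// Stored in the 'candidate' collection of each service's database
@Document(collection = "candidate")
public class Candidate {

    @Id
    private String id;
    private String firstName;
    private String lastName;
    private String politicalParty;
    private String election;

    // Convenience property seen in the serialized JSON payloads
    public String getFullName() {
        return firstName + " " + lastName;
    }
}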
When the /voter/simulation/db/{election} endpoint is called, the Voter service queries its voters database for a list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous two examples, each new vote object (MongoDB document) is written back to the vote
collection in the Voter service’s voters
database.
Exploring the Code
We will not review the REST HTTP or RPC IPC code in this post. It was covered in detail, in the previous post. Instead, we will explore the new code required for eventual consistency.
Spring Dependencies
To use AMQP with RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp
. Below is a snippet from the Candidate service’s build.gradle
file, showing project dependencies. The Voter service’s dependencies are identical.
dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
    compile group: 'org.webjars', name: 'hal-browser'
    testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}
AMQP Configuration
Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration
annotation on our configuration classes. Below is the abridged configuration class for the Voter service.
package com.voterapi.voter.configuration;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class VoterConfig {

    @Bean
    public Queue candidateQueue() {
        return new Queue("candidates.queue");
    }
}
And here, the abridged configuration class for the Candidate service.
package com.voterapi.candidate.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CandidateConfig {

    @Bean
    public Queue candidateQueue() {
        return new Queue("candidates.queue");
    }
}
Event Handler
With our dependencies and configuration in place, we will define the CandidateEventHandler class. This class is annotated with the Spring Data Rest @RepositoryEventHandler
and Spring’s @Component
. The @Component
annotation ensures the event handler is registered.
The class contains the handleCandidateSave
method, which is annotated with the Spring Data Rest @HandleAfterCreate
. The event handler acts on the Candidate
object, which is the first parameter in the method signature.
Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to the candidates.queue
queue. This was the queue we configured earlier.
package com.voterapi.candidate.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.voterapi.candidate.domain.Candidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.rest.core.annotation.HandleAfterCreate;
import org.springframework.data.rest.core.annotation.RepositoryEventHandler;
import org.springframework.stereotype.Component;

@Component
@RepositoryEventHandler
public class CandidateEventHandler {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private RabbitTemplate rabbitTemplate;
    private Queue candidateQueue;

    @Autowired
    public CandidateEventHandler(RabbitTemplate rabbitTemplate, Queue candidateQueue) {
        this.rabbitTemplate = rabbitTemplate;
        this.candidateQueue = candidateQueue;
    }

    @HandleAfterCreate
    public void handleCandidateSave(Candidate candidate) {
        sendMessage(candidate);
    }

    private void sendMessage(Candidate candidate) {
        rabbitTemplate.convertAndSend(
                candidateQueue.getName(), serializeToJson(candidate));
    }

    private String serializeToJson(Candidate candidate) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(candidate);
        } catch (JsonProcessingException e) {
            logger.info(String.valueOf(e));
        }
        logger.debug("Serialized message payload: {}", jsonInString);
        return jsonInString;
    }
}
Consuming Messages
Next, we let’s switch to the Voter service’s CandidateListService
class. Below is an abridged version of the class with two new methods. First, the getCandidateMessage
method listens to the candidates.queue
queue. This was the queue we configured earlier. The method is annotated with theSpring AMQP Rabbit @RabbitListener
annotation.
The getCandidateMessage method retrieves the new candidate object from the message, deserializes the message’s JSON payload, maps it to the candidate object model, and saves it to the Voter service’s database.
The second method, getCandidatesQueueDb
, retrieves the candidates from the Voter service’s database. The method makes use of the Spring Data MongoDB Aggregation package to return a list of candidates from MongoDB.
/**
 * Consumes a new candidate message, deserializes it, and saves it to MongoDB
 * @param candidateMessage
 */
@RabbitListener(queues = "#{candidateQueue.name}")
public void getCandidateMessage(String candidateMessage) {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    TypeReference<Candidate> mapType = new TypeReference<Candidate>() {};
    Candidate candidate = null;
    try {
        candidate = objectMapper.readValue(candidateMessage, mapType);
    } catch (IOException e) {
        logger.info(String.valueOf(e));
    }
    candidateRepository.save(candidate);
    logger.debug("Candidate {} saved to MongoDB", candidate.toString());
}

/**
 * Retrieves candidates from MongoDB and transforms to voter view
 * @param election
 * @return List of candidates
 */
public List<CandidateVoterView> getCandidatesQueueDb(String election) {
    Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.match(Criteria.where("election").is(election)),
            project("firstName", "lastName", "politicalParty", "election")
                    .andExpression("concat(firstName,' ', lastName)")
                    .as("fullName"),
            sort(Sort.Direction.ASC, "lastName")
    );
    AggregationResults<CandidateVoterView> groupResults
            = mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
    return groupResults.getMappedResults();
}
RabbitMQ Management Console
The easiest way to observe what is happening with the messages is using the RabbitMQ Management Console. To access the console, point your web browser to localhost
, on port 15672
. The default login credentials for the console are guest/guest. As you successfully produce and consume messages with RabbitMQ, you should see activity on the Overview tab.
Recall we said the queue, in this example, was durable. That means messages will survive the RabbitMQ broker stopping and starting. In the below view of the RabbitMQ Management Console, note the six messages persisted in memory. The Candidate service produced the messages in response to six new candidates being created. However, the Voter service was not running, and therefore, could not consume the messages. In addition, the RabbitMQ server was restarted, after receiving the candidate messages. The messages were persisted and still present in the queue after the successful reboot of RabbitMQ.
Once RabbitMQ and the Voter service instance were back online, the Voter service successfully consumed the six waiting messages from the queue.
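As a side note on how that durability is declared, below is a minimal sketch, not the post’s exact configuration. In Spring AMQP, the single-argument Queue constructor used earlier already creates a durable queue, and outgoing messages are marked PERSISTENT by default (consistent with the receivedDeliveryMode=PERSISTENT property visible in the service logs below); the explicit flag here simply makes the intent obvious.

package com.example.messaging; // hypothetical package for this sketch

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DurableQueueConfig {

    @Bean
    public Queue candidateQueue() {
        // Queue(name, durable): a durable queue definition survives a broker restart;
        // persistent messages sitting in it survive the restart as well
        return new Queue("candidates.queue", true);
    }
}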
Service Logs
In addition to using the RabbitMQ Management Console, we may observe communications between the two services by looking at the Voter and Candidate services’ logs. I have grabbed a snippet of both services’ logs and added a few comments to show where different processes are being executed.
First the Candidate service logs. We observe a REST HTTP POST request containing a new candidate. We then observe the creation of the new candidate object in the Candidate service’s database, followed by the event handler publishing a message on the queue. Finally, we observe the response is returned in reply to the initial REST HTTP POST request.
# REST HTTP POST Request received
2017-05-11 22:43:46.667 DEBUG 18702 --- [nio-8097-exec-5] o.a.coyote.http11.Http11InputBuffer : Received [POST /candidate/candidates HTTP/1.1
Host: localhost:8097
User-Agent: HTTPie/0.9.8
Accept-Encoding: gzip, deflate
Accept: application/json, */*
Connection: keep-alive
Content-Type: application/json
Content-Length: 127
{"firstName": "Hillary", "lastName": "Clinton", "politicalParty": "Democratic Party", "election": "2016 Presidential Election"}]
2017-05-11 22:43:46.667 DEBUG 18702 --- [nio-8097-exec-5] o.a.c.authenticator.AuthenticatorBase : Security checking request POST /candidate/candidates
…
# Inserting new Candidate into database
2017-05-11 22:43:46.674 DEBUG 18702 --- [nio-8097-exec-5] o.s.data.mongodb.core.MongoTemplate : Inserting DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election] in collection: candidate
2017-05-11 22:43:46.674 DEBUG 18702 --- [nio-8097-exec-5] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[candidates]
2017-05-11 22:43:46.674 DEBUG 18702 --- [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Inserting 1 documents into namespace candidates.candidate on connection [connectionId{localValue:2, serverValue:147}] to server localhost:27017
2017-05-11 22:43:46.677 DEBUG 18702 --- [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Insert completed
# Publishing message on queue
2017-05-11 22:43:46.678 DEBUG 18702 --- [nio-8097-exec-5] o.s.d.r.c.e.AnnotatedEventHandlerInvoker : Invoking AfterCreateEvent handler for Hillary Clinton (Democratic Party).
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] c.v.c.service.CandidateEventHandler : Serialized message payload: {"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@1bfb15f5 Shared Rabbit Connection: SimpleConnection@37d1ba14 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59422]
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message on exchange [], routingKey = [candidates.queue]
# Response to HTTP POST
2017-05-11 22:43:46.679 DEBUG 18702 --- [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'persistentEntities'
2017-05-11 22:43:46.681 DEBUG 18702 --- [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'org.springframework.boot.actuate.autoconfigure.EndpointWebMvcHypermediaManagementContextConfiguration$ActuatorEndpointLinksAdvice'
2017-05-11 22:43:46.682 DEBUG 18702 --- [nio-8097-exec-5] s.d.r.w.j.PersistentEntityJackson2Module : Serializing PersistentEntity org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity@1a4d1ab7.
2017-05-11 22:43:46.683 DEBUG 18702 --- [nio-8097-exec-5] o.s.w.s.m.m.a.HttpEntityMethodProcessor : Written [Resource { content: Hillary Clinton (Democratic Party), links: [<http://localhost:8097/candidate/candidates/591521621162e1490eb0d537>;rel="self", <http://localhost:8097/candidate/candidates/591521621162e1490eb0d537{?projection}>;rel="candidate"] }] as "application/json" using [org.springframework.data.rest.webmvc.config.RepositoryRestMvcConfiguration$ResourceSupportHttpMessageConverter@27329d2a]
2017-05-11 22:43:46.683 DEBUG 18702 --- [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
2017-05-11 22:43:46.683 DEBUG 18702 --- [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Successfully completed request
Now the Voter service logs. At nearly the same moment the Candidate service publishes the message and returns its HTTP response, the Voter service consumes the message off the queue. The Voter service then deserializes the new candidate object and inserts it into its database.
# Retrieving message from queue
2017-05-11 22:43:46.242 DEBUG 19001 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.684 DEBUG 19001 --- [pool-1-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.685 DEBUG 19001 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'{"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=candidates.queue, receivedDelay=null, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, consumerQueue=candidates.queue])
2017-05-11 22:43:46.686 DEBUG 19001 --- [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload={"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=candidates.queue, amqp_contentEncoding=UTF-8, amqp_deliveryTag=6, amqp_consumerQueue=candidates.queue, amqp_redelivered=false, id=608b990a-919b-52c1-fb64-4af4be03b306, amqp_consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, contentType=text/plain, timestamp=1494557026686}]]
# Inserting new Candidate into database
2017-05-11 22:43:46.687 DEBUG 19001 --- [cTaskExecutor-1] o.s.data.mongodb.core.MongoTemplate : Saving DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election]
2017-05-11 22:43:46.687 DEBUG 19001 --- [cTaskExecutor-1] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[voters]
2017-05-11 22:43:46.688 DEBUG 19001 --- [cTaskExecutor-1] org.mongodb.driver.protocol.update : Updating documents in namespace voters.candidate on connection [connectionId{localValue:2, serverValue:151}] to server localhost:27017
2017-05-11 22:43:46.703 DEBUG 19001 --- [cTaskExecutor-1] org.mongodb.driver.protocol.update : Update completed
2017-05-11 22:43:46.703 DEBUG 19001 --- [cTaskExecutor-1] c.v.voter.service.CandidateListService : Candidate Hillary Clinton (Democratic Party) saved to MongoDB
MongoDB
Using the mongo Shell, we can observe six new 2016 Presidential Election candidates in the Candidate service’s database.
> show dbs
candidates 0.000GB
voters 0.000GB
> use candidates
switched to db candidates
> show collections
candidate
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }
Now, looking at the Voter service’s database, we should find the same six 2016 Presidential Election candidates. Note the Object IDs are the same between the two services’ document sets, as are the rest of the fields (first name, last name, political party, and election). However, the class field is different between the two services’ records.
> show dbs
candidates 0.000GB
voters 0.000GB
> use voters
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }
Production Considerations
This post demonstrated a simple example of message-based, event-driven eventual consistency. In an actual production environment, there are a few things that must be considered.
- We only addressed a ‘candidate created’ event. We would also have to code for other types of events, such as a ‘candidate deleted’ event and a ‘candidate updated’ event.
- If a candidate is added, deleted, then re-added, are the events published and consumed in the right order? What about with multiple instances of the Voter service running? Does this pattern guarantee event ordering?
- How should the Candidate service react on startup if RabbitMQ is not available?
- What if RabbitMQ fails after the Candidate services have started?
- How should the Candidate service react if a new candidate record is added to the database, but a ‘candidate created’ event message cannot be published to RabbitMQ? The two actions are not wrapped in a single transaction (see the sketch following this list).
- In all of the above scenarios, what response should be returned to the API end user?
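To illustrate the failed-publish concern above, below is a minimal, hedged sketch of how the sendMessage method of the CandidateEventHandler shown earlier could at least detect and record a failed publish. This is not the post’s code; a production-grade answer typically involves patterns such as a transactional outbox or RabbitMQ publisher confirms, which are beyond the scope of this post.

// Reuses the rabbitTemplate, candidateQueue, logger, and serializeToJson members of the
// CandidateEventHandler class shown earlier; only the publish step changes.
private void sendMessage(Candidate candidate) {
    try {
        rabbitTemplate.convertAndSend(
                candidateQueue.getName(), serializeToJson(candidate));
    } catch (org.springframework.amqp.AmqpException e) {
        // The candidate record is already in MongoDB, but the event was not published.
        // At a minimum, record the failure so the missing event can later be replayed or reconciled.
        logger.error("Failed to publish 'candidate created' event for {}", candidate, e);
    }
}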
Conclusion
In this post, using eventual consistency, we successfully decoupled our two microservices and achieved asynchronous inter-process communications. Adopting a message-based, event-driven, loosely-coupled architecture, wherever possible, in combination with REST HTTP when it makes sense, will improve the overall manageability and scalability of a microservices-based platform.
References
- RabbitMQ: Understanding Message Broker
- Building Microservices: Inter-Process Communication in a Microservices Architecture
- Spring: Understanding AMQP
- AMQP 0-9-1 Complete Reference Guide
- Handling Eventual Consistency in JVM Microservices with Event Sourcing (JavaOne 2016)
All opinions in this post are my own and not necessarily the views of my current employer or their clients.
Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMQP
Posted by Gary A. Stafford in DevOps, Enterprise Software Development, Java Development, Software Development on May 8, 2017
Introduction
There has been a considerable growth in modern, highly scalable, distributed application platforms, built around fine-grained RESTful microservices. Microservices generally use lightweight protocols to communicate with each other, such as HTTP, TCP, UDP, WebSockets, MQTT, and AMQP. Microservices commonly communicate with each other directly using REST-based HTTP, or indirectly, using messaging brokers.
There are several well-known, production-tested messaging queues, such as Apache Kafka, Apache ActiveMQ, Amazon Simple Queue Service (SQS), and Pivotal’s RabbitMQ. According to Pivotal, of these messaging brokers, RabbitMQ is the most widely deployed open source message broker.
RabbitMQ supports multiple messaging protocols. RabbitMQ’s primary protocol, the Advanced Message Queuing Protocol (AMQP), is an open standard wire-level protocol and semantic framework for high-performance enterprise messaging. According to Spring, ‘AMQP has exchanges, routes, and queues. Messages are first published to exchanges. Routes define on which queue(s) to pipe the message. Consumers subscribing to that queue then receive a copy of the message.’
Pivotal’s Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. The project’s libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. The project provides a ‘template’ (RabbitTemplate) as a high-level abstraction for sending and receiving messages.
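As a quick, illustrative sketch of that exchange/route/queue flow, and of RabbitTemplate as the high-level abstraction, consider the standalone example below. The exchange, routing key, and queue names are placeholders, not the Voter API’s, and the example assumes they already exist on a local RabbitMQ broker.

package com.example.amqp; // hypothetical package for this sketch

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class RabbitTemplateExample {

    public static void main(String[] args) {
        // Connect to a local RabbitMQ broker
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // Publish to an exchange; the broker routes the message to the queue(s) bound with this routing key
        template.convertAndSend("example.exchange", "example.routing.key", "hello");

        // Synchronously receive one message from a queue (returns null if the queue is empty)
        Object received = template.receiveAndConvert("example.queue");
        System.out.println(received);

        connectionFactory.destroy();
    }
}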
In this post, we will explore how to start moving Spring Boot Java services away from using synchronous REST HTTP for inter-process communications (IPC), and toward message-based IPC. Moving from synchronous IPC to messaging queues and asynchronous IPC decouples services from one another, allowing us to more easily build, test, and release individual microservices.
Message-Based RPC IPC
Decoupling services using asynchronous IPC is considered optimal by many enterprise software architects when developing modern distributed platforms. However, sometimes it is not easy or possible to get away from synchronous communications. Rightly or wrongly, services are often architected such that one service needs to retrieve data from another service or services in order to process its own requests. It can be said that such a service has a direct dependency on the other services. Many would argue that services, especially RESTful microservices, should not be coupled in this way.
There are several ways to break direct service-to-service dependencies using asynchronous IPC. We might implement request/async response REST HTTP-based IPC. We could also use publish/subscribe or publish/async response messaging queue-based IPC. These are all described by NGINX, in their article, Building Microservices: Inter-Process Communication in a Microservices Architecture; a must-read for anyone working with microservices. We might also implement an architecture which supports eventual consistency, eliminating the need for one service to obtain data from another service.
So what if we cannot implement asynchronous methods to break direct service dependencies, but we want to move toward message-based IPC? One answer is message-based Remote Procedure Call (RPC) IPC. I realize the mention of RPC might send cold shivers down the spine of many seasoned architects. Traditional RPC has several challenges, many of which have been overcome with more modern architectural patterns.
According to Wikipedia, ‘in distributed computing, a remote procedure call (RPC) is when a computer program causes a procedure (subroutine) to execute in another address space (commonly on another computer on a shared network), which is coded as if it were a normal (local) procedure call, without the programmer explicitly coding the details for the remote interaction.’
Although still a form of RPC and not asynchronous, it is possible to replace REST HTTP IPC with message-based RPC IPC. Using message-based RPC, services have no direct dependencies on other services. A service only depends on a response to the message request it places on a queue. The services are now decoupled from one another. The requestor service (the client) has no direct knowledge of the respondent service (the server).
RPC with RabbitMQ and AMQP
RabbitMQ has an excellent set of six tutorials, which cover the basics of creating messaging applications and applying different architectural patterns with RabbitMQ, in several different programming languages. The sixth and final tutorial covers using RabbitMQ for RPC-based IPC, with the request/reply architectural pattern.
Pivotal recently added Spring AMQP implementations to each RabbitMQ tutorial, based on their Spring AMQP project. If you recall, the Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions.
This post’s RPC IPC example is closely based on the architectural pattern found in the Spring AMQP RabbitMQ tutorial.
Sample Code
To demonstrate Spring AMQP-based RPC IPC messaging with RabbitMQ, we will use a pair of simple Spring Boot microservices. These services, the Voter and Candidate services, have been used in several previous posts, and for training and testing DevOps engineers. Both services are backed by MongoDB. The services and MongoDB, along with RabbitMQ, are all part of the Voter API project. The Voter API project also contains an HAProxy-based API Gateway, which provides indirect, load-balanced access to the two services.
All code necessary to build this post’s example is available on GitHub, within three projects. The Voter Service project repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Candidate Service project repository and the Voter API Gateway project repository are also available on GitHub. For this post, you need only clone the Voter Service project repository.
Deploying Voter API
All components, including the two Spring services, MongoDB, RabbitMQ, and the API Gateway, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub.
The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.
To clone and deploy the components locally, including the two Spring services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.
git clone --depth 1 --branch rabbitmq \
  https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh
If everything was deployed successfully, you should see the following output. You should observe five running Docker containers.
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8ef4866984c3 garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 25 hours ago Up 25 hours 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
cc28d084ab17 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 25 hours ago Up 25 hours 0.0.0.0:8097->8080/tcp voterstack_candidate_1
e4c22258b77b garystafford/voter-service:rabbitmq "java -Dspring.pro…" 25 hours ago Up 25 hours 0.0.0.0:8099->8080/tcp voterstack_voter_1
fdb4b9f58a53 rabbitmq:management-alpine "docker-entrypoint…" 25 hours ago Up 25 hours 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
1678227b143c mongo:latest "docker-entrypoint…" 25 hours ago Up 25 hours 0.0.0.0:27017->27017/tcp voterstack_mongodb_1
Using Voter API
The Voter Service and Candidate Service GitHub repositories both contain README files, which detail all the API endpoints each service exposes, and how to call them.
In addition to casting votes for candidates, the Voter service has the ability to simulate election results. By calling a /simulation
endpoint, and indicating the desired election, the Voter service will randomly generate a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. To obtain a list of candidates, the Voter service depends on the Candidate service.
The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation
endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.
REST HTTP Endpoint
The Voter service exposes two almost identical endpoints. Both endpoints generate random votes. However, below the covers, the two endpoints are very different. Calling the /voter/simulation/http/{election}
endpoint prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is done using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.
The HTTP request is received by the Candidate service. The Candidate service responds to the Voter service with a list of candidates, in JSON format. The Voter service receives the response containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate. Finally, each new vote object (MongoDB document) is written back to the vote
collection in the Voter service’s voters
database.
Message-based RPC Endpoint
Similarly, calling the /voter/simulation/rpc/{election}
endpoint with a specific election prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client) produces a request message and places it in RabbitMQ’s voter.rpc.requests
queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service. It only depends on a response to its message request. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.
The request message is consumed by the Candidate service (the server), who is listening to that queue. In response, the Candidate service produces a message containing the list of candidates, serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client), through RabbitMQ. This is done using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message, sent by the Voter Service.
According to RabbitMQ, ‘the direct reply-to feature allows RPC clients to receive replies directly from their RPC server, without going through a reply queue. (“Directly” here still means going through AMQP and the RabbitMQ server; there is no separate network connection between RPC client and RPC server.)’
According to Spring, ‘starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no replyQueue is provided (or it is set with the name amq.rabbitmq.reply-to), the RabbitTemplate will automatically detect whether Direct reply-to is supported and use it, or fall back to using a temporary reply queue. When using Direct reply-to, a reply-listener is not required and should not be configured.’ We are using the latest versions of both RabbitMQ and Spring AMQP, which should support Direct reply-to.
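A minimal sketch of how the RPC client side could be tuned is shown below; this is an assumption for illustration, not the project’s configuration. With no reply queue configured, the RabbitTemplate will prefer Direct reply-to when the broker supports it, and setReplyTimeout() bounds how long convertSendAndReceive() waits for a reply (five seconds by default) before returning null.

package com.example.rpc; // hypothetical package for this sketch

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RpcClientConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Wait up to 10 seconds for the RPC reply before convertSendAndReceive() returns null
        template.setReplyTimeout(10000);
        return template;
    }
}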
The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to Candidate objects and proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote
collection in the Voter service’s voters
database.
Exploring the RPC Code
We will not examine the REST HTTP IPC code in this post. Instead, we will explore the RPC code. You are welcome to download the source code and explore the REST HTTP code pattern; it uses some advanced features of Spring Boot and Spring Data.
Spring Dependencies
In order to use RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp
. Below is a snippet from the Candidate service’s build.gradle
file, showing project dependencies. The Voter service’s dependencies are identical.
dependencies {
    compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
    compile group: 'org.webjars', name: 'hal-browser'
    testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}
AMQP Configuration
Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration
annotation on our configuration classes. Below is the configuration class for the Voter service.
package com.voterapi.voter.configuration;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class VoterConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("voter.rpc");
    }
}
And here, the configuration class for the Candidate service.
package com.voterapi.candidate.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CandidateConfig {
    @Bean
    public Queue queue() {
        return new Queue("voter.rpc.requests");
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("voter.rpc");
    }

    @Bean
    public Binding binding(DirectExchange exchange, Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("rpc");
    }
}
Voter Service Code
With the dependencies and configuration in place, we define the method in the Voter service that will request the list of candidates from the Candidate service, using RabbitMQ. Below is an abridged version of the Voter service’s CandidateListService class, containing the getCandidatesMessageRpc method. This method calls the rabbitTemplate.convertSendAndReceive method (shown below).
public List<CandidateVoterView> getCandidatesMessageRpc(String election) {
    logger.debug("Sending RPC request message for list of candidates…");
    String requestMessage = election;
    String candidates = (String) rabbitTemplate.convertSendAndReceive(
            directExchange.getName(), "rpc", requestMessage);
    TypeReference<Map<String, List<CandidateVoterView>>> mapType =
            new TypeReference<Map<String, List<CandidateVoterView>>>() {};
    ObjectMapper objectMapper = new ObjectMapper();
    Map<String, List<CandidateVoterView>> candidatesMap = null;
    try {
        candidatesMap = objectMapper.readValue(candidates, mapType);
    } catch (IOException e) {
        logger.info(String.valueOf(e));
    }
    List<CandidateVoterView> candidatesList = candidatesMap.get("candidates");
    logger.debug("List of {} candidates received…", candidatesList.size());
    return candidatesList;
}
Candidate Service Code
Next, we define a method in the Candidate service that will process the Voter service’s request. Below is an abridged version of the CandidateController class, containing the getCandidatesMessageRpc method. This method is decorated with Spring’s @RabbitListener annotation (the first line of the snippet below), which marks the method as the target of a Rabbit message listener on the voter.rpc.requests queue.
Also shown are the getCandidatesMessageRpc method’s two helper methods, getByElection and serializeToJson. These methods query MongoDB for the list of candidates and serialize the list to JSON.
@RabbitListener(queues = "voter.rpc.requests")
private String getCandidatesMessageRpc(String requestMessage) {
    logger.debug("Request message: {}", requestMessage);
    logger.debug("Sending RPC response message with list of candidates…");
    List<CandidateVoterView> candidates = getByElection(requestMessage);
    return serializeToJson(candidates);
}

private List<CandidateVoterView> getByElection(String election) {
    Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.match(Criteria.where("election").is(election)),
            project("firstName", "lastName", "politicalParty", "election")
                    .andExpression("concat(firstName,' ', lastName)")
                    .as("fullName"),
            sort(Sort.Direction.ASC, "lastName")
    );
    AggregationResults<CandidateVoterView> groupResults
            = mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
    return groupResults.getMappedResults();
}

private String serializeToJson(List<CandidateVoterView> candidates) {
    ObjectMapper mapper = new ObjectMapper();
    String jsonInString = "";
    final Map<String, List<CandidateVoterView>> dataMap = new HashMap<>();
    dataMap.put("candidates", candidates);
    try {
        jsonInString = mapper.writeValueAsString(dataMap);
    } catch (JsonProcessingException e) {
        logger.info(String.valueOf(e));
    }
    logger.debug(jsonInString);
    return jsonInString;
}
Demonstration
To demonstrate both the synchronous REST HTTP IPC code and the Spring AMQP-based RPC IPC code, we will make a few REST HTTP calls to the Voter API Gateway. For convenience, I have provided a shell script, demostrate_ipc.sh, which executes all the necessary API calls. I have added sleep commands to slow the output to the terminal a bit, for easier analysis. The script requires HTTPie, a great time-saver when working with RESTful services.
The demostrate_ipc.sh script does three things. First, it calls the Candidate service to generate a group of sample candidates. Next, the script calls the Voter service to simulate votes, using synchronous REST HTTP. Lastly, the script repeats the voter simulation, this time using message-based RPC IPC. All API calls are made through the Voter API Gateway on port 8080. To understand the API calls, examine the script, below.
#!/bin/sh

# Demostrate API calls for REST HTTP IPC and RPC IPC via API Gateway
# Requires HTTPie
# Requires all services are running

set -e

HOST=${1:-localhost:8080}
API_GATEWAY="http://${HOST}"
ELECTION="2016%20Presidential%20Election"

echo "Simulating candidates…"
http ${API_GATEWAY}/candidate/simulation && sleep 2
http ${API_GATEWAY}/candidate/candidates/summary/${ELECTION} && sleep 2

echo "Simulating voting using REST HTTP IPC…"
http ${API_GATEWAY}/voter/simulation/http/${ELECTION} && sleep 2
http ${API_GATEWAY}/voter/results && sleep 4
http ${API_GATEWAY}/voter/winners && sleep 2

echo "Simulating voting using message-based RPC IPC…"
http ${API_GATEWAY}/voter/simulation/rpc/${ELECTION} && sleep 2
http ${API_GATEWAY}/voter/results && sleep 4
http ${API_GATEWAY}/voter/winners && sleep 2

echo "Script completed…"
Below is the list of candidates for the 2016 Presidential Election, generated by the Candidate service. The JSON payload was retrieved using the Voter service’s /voter/candidates/rpc/{election} endpoint. This endpoint uses the same RPC IPC method as the Voter service’s /voter/simulation/rpc/{election} endpoint.
HTTP/1.1 200
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: Content-Type, Accept, X-Requested-With, remember-me
Access-Control-Allow-Methods: POST, GET, OPTIONS, DELETE
Access-Control-Max-Age: 3600
Content-Type: application/json;charset=UTF-8
Date: Sun, 07 May 2017 19:10:22 GMT
Transfer-Encoding: chunked
X-Application-Context: Voter Service:docker-local:8099

{
    "candidates": [
        {
            "election": "2016 Presidential Election",
            "fullName": "Darrell Castle",
            "politicalParty": "Constitution Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Hillary Clinton",
            "politicalParty": "Democratic Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Gary Johnson",
            "politicalParty": "Libertarian Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Chris Keniston",
            "politicalParty": "Veterans Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Jill Stein",
            "politicalParty": "Green Party"
        },
        {
            "election": "2016 Presidential Election",
            "fullName": "Donald Trump",
            "politicalParty": "Republican Party"
        }
    ]
}
Based on the list of candidates, below are the simulated election results. This JSON payload was retrieved using the Voter service’s /voter/results endpoint.