Posts Tagged Messaging

IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas

Business team meeting. Photo professional investor working new s
Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

Introduction

Most of the dominant Cloud providers offer IoT (Internet of Things) and IIotT  (Industrial IoT) integrated services. Amazon has AWS IoT, Microsoft Azure has multiple offering including IoT Central, IBM’s offering including IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly-performant, scalable technology stacks. They are capable of scaling to tens-of-thousands of IoT devices or more and massive amounts of streaming telemetry.

In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.

Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIOT industry standard data and transport protocols, such as MQTT (Message Queue Telemetry Transport) or CoAP (Constrained Application Protocol), over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).

Demonstration

In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

iot_3.jpg

Prototype IoT Devices used in this Demonstration

We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.

Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.

JSON IoT Basic Icons.png

For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.

Demo IoT Diagram Icons.png

Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.

Complex IoT Diagram Icons.png

Source Code

All source code is all available on GitHub. Use the following command to clone the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git

You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.

screen_shot_2019-05-21_at_12_55_25_pm

Technologies

Protocol Buffers

Image result for google developerAccording to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML.

Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes.

Google Cloud Functions

Cloud-Functions.png

According to Google, Cloud Functions is Google’s event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high-availability, fault-tolerance,
no servers to provision, manage, patch or update, only
pay while your code runs, and they easily connect and extend other cloud services. Cloud Functions natively support multiple event-types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.

Google Cloud Pub/Sub

google pub-subAccording to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.

MongoDB Atlas

mongodbMongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

Cost Effectiveness of Cloud Functions

At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google’s pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system is comprised of tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.

However, with thousands of devices or more, each transmitting data multiple times per minutes, you could quickly outgrow the cost-effectiveness of Google Functions. In that case, you might look to Google’s Google Cloud IoT platform. Alternately, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly-released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.

Sensor Scripts

For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.

JSON over HTTPS

Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.

import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

    try:
        requests.post(URL, json=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Telemetry Frequency

Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to only being transmitted every 15 seconds. To reduce system complexity, potential latency, back-pressure, and cost, in my opinion, you should only produce telemetry data at the frequency your requirements dictate.

JSON Web Tokens

For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry standard RFC 7519 method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, all encrypted with a secret key, which is known to each Cloud Function, in order to verify the IoT device’s identity. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT’s payload data.

{
  "sub": "IoT Protobuf Serverless Demo",
  "id": "iot-demo-key",
  "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ",
  "iat": 1557407124,
  "exp": 1564664724
}

For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions are using PyJWT, a Python library which allows you to encode and decode the JWT. The PyJWT library allows the Function to decode and validate the JWT (Bearer Token) from the incoming request’s Authorization header. The JWT token is stored as an environment variable. Deployment instructions are included in the GitHub project.

screen_shot_2019-05-09_at_5_13_28_pm

JSON Payload

Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Protocol Buffers

For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object, within each message type.

It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternately, you could choose Google’s Protobuf WellKnownTypesTimestamp to store timestamp.

syntax = "proto3";

package sensors;

// DHT22
message SensorDHT {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDHT data = 4;
}

message DataDHT {
    double temperature = 1;
    double humidity = 2;
}

// Onyehn_PIR
message SensorPIR {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataPIR data = 4;
}

message DataPIR {
    bool motion = 1;
}

// Anmbest_MD46N
message SensorDLI {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDLI data = 4;
}

message DataDLI {
    bool light = 1;
}

Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file, sensors_pb2.py.

protoc --python_out=. sensors.proto

Protocol Buffers over HTTPS

Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request’s Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf’s SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.

import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests
import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()

            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Protobuf Binary Payload

To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.

message = sensorDHT.SerializeToString()

binary_file_output = open("./data_binary.txt", "wb")
file_byte_array = bytearray(message)
binary_file_output.write(file_byte_array)

Then, using the hexdump command, we can view a representation of the binary data file.

> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022

The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (32% the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.

screen_shot_2019-05-14_at_7_00_39_am.png

Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.

message = sensorDHT.SerializeToString()
print(message)

The resulting binary string resembles the following.

b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'

The binary string length of the serialized message, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 111 bytes, as compared to the JSON payload size of 148 bytes (75% the size).

Validate Protobuf Payload

To validate the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.

message = sensorDHT.SerializeToString() 
message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))

The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Google Cloud Functions

There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.

Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.

screen-shot-2019-05-13-at-8_34_49-pm

JSON Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.

import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]

    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, 
                      data=bytes(str(message), 'utf-8'))

The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.

# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml

Using a .gcloudignore file, the gcloud functions deploy CLI command deploys three files: the cloud function (main.py), required Python packages file (requirements.txt), the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.

Protobuf Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.

As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device. It is used to parse a serialized message into its original Protobuf’s SensorDHT message type.

import logging
import os

import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]

    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))

Cloud Pub/Sub Functions

In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub Triggers. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to iot-data-demo Cloud Pub/Sub Topic.

screen_shot_2019-05-09_at_2_41_17_pm

Sending Telemetry to MongoDB Atlas

The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be com­pared to bin­ary inter­change for­mats, like Proto­col Buf­fers. BSON is more schema-less than Proto­col Buf­fers, which can give it an ad­vant­age in flex­ib­il­ity but also a slight dis­ad­vant­age in space ef­fi­ciency (BSON has over­head for field names with­in the seri­al­ized data).

The function uses the PyMongo to connect to MongoDB Atlas. According to their website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.

import base64
import json
import logging
import os
import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)

The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.

screen_shot_2019-05-09_at_6_17_18_pm

MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.

screen_shot_2019-05-10_at_10_08_14_pm

Although Cloud Pub/Sub to Atlas function execution times are longer in duration than the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster into the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.

screen_shot_2019-05-10_at_10_16_49_pm

Aggregating Data with MongoDB Compass

MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.

screen_shot_2019-05-14_at_5_19_17_pm

screen_shot_2019-05-21_at_1_17_40_pm.pngscreen_shot_2019-05-21_at_1_15_09_pm

When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and returns computed results. This feature saves a ton of time, filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.

screen_shot_2019-05-14_at_5_22_58_pm

Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.

screen_shot_2019-05-14_at_5_23_39_pm

Below, the exported aggregation pipelines code from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device over a 72-hour period.

DEVICE_1 = 'rp59adf374'
pipeline = [
    {
        '$match': {
            'type': 'DHT22', 
            'device': DEVICE_1, 
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1, 
            'temperature': '$data.temperature', 
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]
aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))

MongoDB Atlas Performance

In this demonstration, from Python3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.

Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.

screen-shot-2019-05-13-at-9_09_52-pm

GCP Observability

There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.

For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.

For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile average for monitoring. The 95th percentile asserts that 95% of the time, the observed values are below this amount and the remaining 5% of the time, the observed values are above that amount.

screen_shot_2019-05-10_at_10_13_37_pm

Data Analysis using Jupyter Notebooks

According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The widespread use of Jupyter Notebooks has grown significantly, as Big Data, AI, and ML have all experienced explosive growth.

PyCharm

JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.

screen_shot_2019-05-10_at_10_38_42_am

Below, we see the example Analytics Notebook, included in the demonstration’s project, displayed in PyCharm 19.1.2 (Professional Edition). To effectively work with Notebooks in PyCharm really requires a full-size monitor. Working on a laptop with PyCharm’s crowded Notebook UI is workable, but certainly not as effective as on a larger monitor.

screen_shot_2019-05-20_at_2_23_46_pm.png

Jupyter Notebook Server

Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.

From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Panda DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.

screen_shot_2019-05-19_at_9_46_32_am.png

We can then analyze the data using a number of common frameworks, including PandasMatplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the causal effect of one environment variable on another, such as the impact of light on temperature or humidity.

screen_shot_2019-05-23_at_5_25_44_pm

Below, we can use histograms to visualize temperature frequencies for
intervals, over time, for a given device location.

screen-shot-2019-05-13-at-9_13_35-pm

Machine Learning using Jupyter Notebooks

In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry.  Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.

screen_shot_2019-05-19_at_12_25_45_pm

screen_shot_2019-05-19_at_12_26_45_pm

Using scikit-learn, we can build models to predict such things as which IoT device generated a specific temperature and humidity reading, or the temperature and humidity, given the time of day, device location, and external environment variables, or discover anomalies in the sensor telemetry.

Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.

screen_shot_2019-05-19_at_12_27_39_pm

The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.

screen_shot_2019-05-19_at_12_30_38_pm

Having data from multiple sensors, we are able to enrich the ML models by adding additional categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.

screen_shot_2019-05-19_at_12_29_25_pm

Conclusion

Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Image: everythingpossible © 123RF.com

, , , , , , , , , ,

1 Comment

Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 2

** This post has been rewritten and updated in May 2021 **

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we’ve been exploring one possible solution to this challenge, using Apache Kafka and the model of eventual consistency. In Part One, we examined the online storefront domain, the storefront’s microservices, and the system’s state change event message flows.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

Kafka-Eventual-Cons-Swarm

Source code for deploying the Dockerized components of the online storefront, shown in this post, is available on GitHub. All Docker Images are available on Docker Hub. I have chosen the wurstmeister/kafka-docker version of Kafka, available on Docker Hub; it has 580+ stars and 10M+ pulls on Docker Hub. This version of Kafka works well, as long as you run it within a Docker Swarm, locally.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Deployment Options

For simplicity, I’ve used Docker’s native Docker Swarm Mode to support the deployed online storefront. Docker requires minimal configuration as opposed to other CaaS platforms. Usually, I would recommend Minikube for local development if the final destination of the storefront were Kubernetes in Production (AKS, EKS, or GKE). Alternatively, if the final destination of the storefront was Red Hat OpenShift in Production, I would recommend Minishift for local development.

Docker Deployment

We will break up our deployment into two parts. First, we will deploy everything except our services. We will allow Kafka, MongoDB, Eureka, and the other components to start up fully. Afterward, we will deploy the three online storefront services. The storefront-kafka-docker project on Github contains two Docker Compose files, which are divided between the two tasks.

The middleware Docker Compose file (gist).

version: '3.2'
services:
zuul:
image: garystafford/storefront-zuul:latest
expose:
- "8080"
ports:
- "8080:8080/tcp"
depends_on:
- kafka
- mongo
- eureka
hostname: zuul
environment:
# LOGGING_LEVEL_ROOT: DEBUG
RIBBON_READTIMEOUT: 3000
RIBBON_SOCKETTIMEOUT: 3000
ZUUL_HOST_CONNECT_TIMEOUT_MILLIS: 3000
ZUUL_HOST_CONNECT_SOCKET_MILLIS: 3000
networks:
- kafka-net
eureka:
image: garystafford/storefront-eureka:latest
expose:
- "8761"
ports:
- "8761:8761/tcp"
hostname: eureka
networks:
- kafka-net
mongo:
image: mongo:latest
command: --smallfiles
# expose:
# - "27017"
ports:
- "27017:27017/tcp"
hostname: mongo
networks:
- kafka-net
mongo_express:
image: mongo-express:latest
expose:
- "8081"
ports:
- "8081:8081/tcp"
hostname: mongo_express
networks:
- kafka-net
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181/tcp"
hostname: zookeeper
networks:
- kafka-net
kafka:
image: wurstmeister/kafka:latest
depends_on:
- zookeeper
# expose:
# - "9092"
ports:
- "9092:9092/tcp"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_CREATE_TOPICS: "accounts.customer.change:1:1,fulfillment.order.change:1:1,orders.order.fulfill:1:1"
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_DELETE_TOPIC_ENABLE: "true"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
hostname: kafka
networks:
- kafka-net
kafka_manager:
image: hlebalbau/kafka-manager:latest
ports:
- "9000:9000/tcp"
expose:
- "9000"
depends_on:
- kafka
environment:
ZK_HOSTS: "zookeeper:2181"
APPLICATION_SECRET: "random-secret"
command: -Dpidfile.path=/dev/null
hostname: kafka_manager
networks:
- kafka-net
networks:
kafka-net:
driver: overlay

The services Docker Compose file (gist).

version: '3.2'
services:
accounts:
image: garystafford/storefront-accounts:latest
depends_on:
- kafka
- mongo
hostname: accounts
# environment:
# LOGGING_LEVEL_ROOT: DEBUG
networks:
- kafka-net
orders:
image: garystafford/storefront-orders:latest
depends_on:
- kafka
- mongo
- eureka
hostname: orders
# environment:
# LOGGING_LEVEL_ROOT: DEBUG
networks:
- kafka-net
fulfillment:
image: garystafford/storefront-fulfillment:latest
depends_on:
- kafka
- mongo
- eureka
hostname: fulfillment
# environment:
# LOGGING_LEVEL_ROOT: DEBUG
networks:
- kafka-net
networks:
kafka-net:
driver: overlay

In the storefront-kafka-docker project, there is a shell script, stack_deploy_local.sh. This script will execute both Docker Compose files in succession, with a pause in between. You may need to adjust the timing for your own system (gist).

#!/bin/sh
# Deploys the storefront Docker stack
# usage: sh ./stack_deploy_local.sh
set -e
docker stack deploy -c docker-compose-middleware.yml storefront
echo "Starting the stack: middleware...pausing for 30 seconds..."
sleep 30
docker stack deploy -c docker-compose-services.yml storefront
echo "Starting the stack: services...pausing for 10 seconds..."
sleep 10
docker stack ls
docker stack services storefront
docker container ls
echo "Script completed..."
echo "Services may take up to several minutes to start, fully..."

Start by running docker swarm init. This command will initialize a Docker Swarm. Next, execute the stack deploy script, using an sh ./stack_deploy_local.sh command. The script will deploy a new Docker Stack, within the Docker Swarm. The Docker Stack will hold all storefront components, deployed as individual Docker containers. The stack is deployed within its own isolated Docker overlay networkkafka-net.

Note that we are not using host-based persistent storage for this local development demo. Destroying the Docker stack or the individual Kafka, Zookeeper, or MongoDB Docker containers will result in a loss of data.

stack-deploy

Before completion, the stack deploy script runs docker stack ls command, followed by a docker stack services storefront command. You should see one stack, named storefront, with ten services. You should also see each of the ten services has 1/1 replicas running, indicating everything has started or is starting correctly, without failure. Failure would be reflected here as a service having 0/1 replicas.

docker-stack-ls

Before completion, the stack deploy script also runs docker container ls command. You should observe each of the ten running containers (‘services’ in the Docker stack), along with their instance names and ports.

docker-container-ls

There is also a shell script, stack_delete_local.sh, which will issue a docker stack rm storefront command to destroy the stack when you are done.

Using the names of the storefront’s Docker containers, you can check the start-up logs of any of the components, using the docker logs command.

docker-logs

Testing the Stack

With the storefront stack deployed, we need to confirm that all the components have started correctly and are communicating with each other. To accomplish this, I’ve written a simple Python script, refresh.py. The refresh script has multiple uses. It deletes any existing storefront service MongoDB databases. It also deletes any existing Kafka topics; I call the Kafka Manager’s API to accomplish this. We have no databases or topics since our stack was just created. However, if you are actively developing your data models, you will likely want to purge the databases and topics regularly (gist).

#!/usr/bin/env python3
# Delete (3) MongoDB databases, (3) Kafka topics,
# create sample data by hitting Zuul API Gateway endpoints,
# and return MongoDB documents as verification.
# usage: python3 ./refresh.py
from pprint import pprint
from pymongo import MongoClient
import requests
import time
client = MongoClient('mongodb://localhost:27017/')
def main():
delete_databases()
delete_topics()
create_sample_data()
get_mongo_doc('accounts', 'customer.accounts')
get_mongo_doc('orders', 'customer.orders')
get_mongo_doc('fulfillment', 'fulfillment.requests')
def delete_databases():
dbs = ['accounts', 'orders', 'fulfillment']
for db in dbs:
client.drop_database(db)
print('MongoDB dropped: ' + db)
dbs = client.database_names()
print('Reamining databases:')
print(dbs)
print('\n')
def delete_topics():
# call Kafka Manager API
topics = ['accounts.customer.change',
'orders.order.fulfill',
'fulfillment.order.change']
for topic in topics:
kafka_manager_url = 'http://localhost:9000/clusters/dev/topics/delete?t=' + topic
r = requests.post(kafka_manager_url, data={'topic': topic})
time.sleep(3)
print('Kafka topic deleted: ' + topic)
print('\n')
def create_sample_data():
sample_urls = [
'http://localhost:8080/accounts/customers/sample',
'http://localhost:8080/orders/customers/sample/orders',
'http://localhost:8080/orders/customers/sample/fulfill',
'http://localhost:8080/fulfillment/fulfillments/sample/process',
'http://localhost:8080/fulfillment/fulfillments/sample/ship',
'http://localhost:8080/fulfillment/fulfillments/sample/in-transit',
'http://localhost:8080/fulfillment/fulfillments/sample/receive']
for sample_url in sample_urls:
r = requests.get(sample_url)
print(r.text)
time.sleep(5)
print('\n')
def get_mongo_doc(db_name, collection_name):
db = client[db_name]
collection = db[collection_name]
pprint(collection.find_one())
print('\n')
if __name__ == "__main__":
main()
view raw refresh.py hosted with ❤ by GitHub

Next, the refresh script calls a series of RESTful HTTP endpoints, in a specific order, to create sample data. Our three storefront services each expose different endpoints. Different /sample endpoints create sample customers, orders, order fulfillment requests, and shipping notifications. The create sample data endpoints include, in order:

  1. Sample Customer: /accounts/customers/sample
  2. Sample Orders: /orders/customers/sample/orders
  3. Sample Fulfillment Requests: /orders/customers/sample/fulfill
  4. Sample Processed Order Events: /fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Events: /fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Events: /fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Events: /fulfillment/fulfillment/sample/receive

You can create data on your own by POSTing to the exposed CRUD endpoints on each service. However, given the complex data objects required in the request payloads, it is too time-consuming for this demo.

To execute the script, use a python3 ./refresh.py command. I am using Python 3 in the demo, but the script should also work with Python 2.x if you change shebang.

refresh-script

If everything was successful, the script returns one document from each of the three storefront service’s MongoDB database collections. A result of ‘None’ for any of the MongoDB documents usually indicates one of the earlier commands failed. Given an abnormally high response latency, due to the load of the ten running containers on my laptop, I had to increase the Zuul/Ribbon timeouts.

Observing the System

We should now have the online storefront Docker stack running, three MongoDB databases created and populated with sample documents (data), and three Kafka topics, which have messages in them. Based on the fact we saw database documents printed out with our refresh script, we know the topics were used to pass data between the message producing and message consuming services.

In most enterprise environments, a developer may not have the access, nor the operational knowledge to interact with Kafka or MongoDB from within a container, on the command line. So how else can we interact with the system?

Kafka Manager

Kafka Manager gives us the ability to interact with Kafka via a convenient browser-based user interface. For this demo, the Kafka Manager UI is available on the default port 9000.

kafka_manager_00

To make Kafka Manager useful, define the Kafka cluster. The Cluster Name is up to you. The Cluster Zookeeper Host should be zookeeper:2181, for our demo.

kafka_manager_01

Kafka Manager gives us useful insights into many aspects of our simple, single-broker cluster. You should observe three topics, created during the deployment of Kafka.

kafka_manager_02

Kafka Manager is an appealing alternative, as opposed to connecting with the Kafka container, with a docker exec command, to interact with Kafka. A typical use case might be deleting a topic or adding partitions to a topic. We can also see which Consumers are consuming which topics, from within Kafka Manager.

kafka_manager_03

Mongo Express

Similar to Kafka Manager, Mongo Express gives us the ability to interact with Kafka via a user interface. For this demo, the Mongo Express browser-based user interface is available on the default port 8081. The initial view displays each of the existing databases. Note our three service’s databases, including accounts, orders, and fulfillment.

mongo-express-01

Drilling into an individual database, we can view each of the database’s collections. Digging in further, we can interact with individual database collection documents.

mongo-express-02

We may even edit and save the documents.

mongo-express-03

SpringFox and Swagger

Each of the storefront services also implements SpringFox, the automated JSON API documentation for API’s built with Spring. With SpringFox, each service exposes a rich Swagger UI. The Swagger UI allows us to interact with service endpoints.

Since each service exposes its own Swagger interface, we must access them through the Zuul API Gateway on port 8080. In our demo environment, the Swagger browser-based user interface is accessible at /swagger-ui.html. Below is a fully self-documented Orders service API, as seen through the Swagger UI.

I believe there are still some incompatibilities with the latest SpringFox release and Spring Boot 2, which prevents Swagger from showing the default Spring Data REST CRUD endpoints. Currently, you only see the API  endpoints you explicitly declare in your Controller classes.

swagger-ui-1

The service’s data models (POJOs) are also exposed through the Swagger UI by default. Below we see the Orders service’s models.

swagger-ui-3

The Swagger UI allows you to drill down into the complex structure of the models, such as the CustomerOrder entity, exposing each of the entity’s nested data objects.

swagger-ui-2

Spring Cloud Netflix Eureka

This post does not cover the use of Eureka or Zuul. Eureka gives us further valuable insight into our storefront system. Eureka is our systems service registry and provides load-balancing for our services if we have multiple instances.

For this demo, the Eureka browser-based user interface is available on the default port 8761. Within the Eureka user interface, we should observe the three storefront services and Zuul, the API Gateway, registered with Eureka. If we had more than one instance of each service, we would see all of them listed here.

eureka-ui

Although of limited use in a local environment, we can observe some general information about our host.

eureka-ui-02

Interacting with the Services

The three storefront services are fully functional Spring Boot / Spring Data REST / Spring HATEOAS-enabled applications. Each service exposes a rich set of CRUD endpoints for interacting with the service’s data entities. Additionally, each service includes Spring Boot Actuator. Actuator exposes additional operational endpoints, allowing us to observe the running services. Again, this post is not intended to be a demonstration of Spring Boot or Spring Boot Actuator.

Using an application such as Postman, we can interact with our service’s RESTful HTTP endpoints. As shown below, we are calling the Account service’s customers resource. The Accounts request is proxied through the Zuul API Gateway.

postman

The above Postman Storefront Collection and Postman Environment are both exported and saved with the project.

Some key endpoints to observe the entities that were created using Event-Carried State Transfer are shown below. They assume you are using localhost as a base URL.

References

Links to my GitHub projects for this post

Some additional references I found useful while authoring this post and the online storefront code:

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , ,

1 Comment

Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1

** This post has been rewritten and updated in May 2021 **

Given a modern distributed system, composed of multiple microservices, each possessing a sub-set of the domain’s aggregate data they need to perform their functions autonomously, we will almost assuredly have some duplication of data. Given this duplication, how do we maintain data consistency? In this two-part post, we will explore one possible solution to this challenge, using Apache Kafka and the model of eventual consistency.

I previously covered the topic of eventual consistency in a distributed system, using RabbitMQ, in the post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ. This post is featured on Pivotal’s RabbitMQ website.

Introduction

To ground the discussion, let’s examine a common example of the online storefront. Using a domain-driven design (DDD) approach, we would expect our problem domain, the online storefront, to be composed of multiple bounded contexts. Bounded contexts would likely include Shopping, Customer Service, Marketing, Security, Fulfillment, Accounting, and so forth, as shown in the context map, below.

mid-map-final-03

Given this problem domain, we can assume we have the concept of the Customer. Further, the unique properties that define a Customer are likely to be spread across several bounded contexts. A complete view of a Customer would require you to aggregate data from multiple contexts. For example, the Accounting context may be the system of record (SOR) for primary customer information, such as the customer’s name, contact information, contact preferences, and billing and shipping addresses. Marketing may possess additional information about the customer’s use of the store’s loyalty program. Fulfillment may maintain a record of all the orders shipped to the customer. Security likely holds the customer’s access credentials and privacy settings.

Below, Customer data objects are shown in yellow. Orange represents logical divisions of responsibility within each bounded context. These divisions will manifest themselves as individual microservices in our online storefront example. mid-map-final-01

Distributed Data Consistency

If we agree that the architecture of our domain’s data model requires some duplication of data across bounded contexts, or even between services within the same contexts, then we must ensure data consistency. Take, for example, a change in a customer’s address. The Accounting context is the system of record for the customer’s addresses. However, to fulfill orders, the Shipping context might also need to maintain the customer’s address. Likewise, the Marketing context, who is responsible for direct-mail advertising, also needs to be aware of the address change, and update its own customer records.

If a piece of shared data is changed, then the party making the change should be responsible for communicating the change, without the expectation of a response. They are stating a fact, not asking a question. Interested parties can choose if, and how, to act upon the change notification. This decoupled communication model is often described as Event-Carried State Transfer, as defined by Martin Fowler, of ThoughtWorks, in his insightful post, What do you mean by “Event-Driven”?. A change to a piece of data can be thought of as a state change event. Coincidentally, Fowler also uses a customer’s address change as an example of Event-Carried State Transfer. The Event-Carried State Transfer Pattern is also detailed by fellow ThoughtWorker and noted Architect, Graham Brooks.

Consistency Strategies

Multiple architectural approaches could be taken to solve for data consistency in a distributed system. For example, you could use a single relational database to persist all data, avoiding the distributed data model altogether. Although I would argue, using a single database just turned your distributed system back into a monolith.

You could use Change Data Capture (CDC) to track changes to each database and send a record of those changes to Kafka topics for consumption by interested parties. Kafka Connect is an excellent choice for this, as explained in the article, No More Silos: How to Integrate your Databases with Apache Kafka and CDC, by Robin Moffatt of Confluent.

Alternately, we could use a separate data service, independent of the domain’s other business services, whose sole role is to ensure data consistency across domains. If messages are persisted in Kafka, the service have the added ability to provide data auditability through message replay. Of course, another set of services adds additional operational complexity.

Storefront Example

In this post, our online storefront’s services will be built using Spring Boot. Thus, we will ensure the uniformity of distributed data by using a Publish/Subscribe model with the Spring for Apache Kafka Project. When a piece of data is changed by one Spring Boot service, if appropriate, that state change will trigger an event, which will be shared with other services using Kafka topics.

We will explore different methods of leveraging Spring Kafka to communicate state change events, as they relate to the specific use case of a customer placing an order through the online storefront. An abridged view of the storefront ordering process is shown in the diagram below. The arrows represent the exchange of data. Kafka will serve as a means of decoupling services from each one another, while still ensuring the data is exchanged.

order-process-flow

Given the use case of placing an order, we will examine the interactions of three services, the Accounts service within the Accounting bounded context, the Fulfillment service within the Fulfillment context, and the Orders service within the Order Management context. We will examine how the three services use Kafka to communicate state changes (changes to their data) to each other, in a decoupled manner.

The diagram below shows the event flows between sub-systems discussed in the post. The numbering below corresponds to the numbering in the ordering process above. We will look at event flows 2, 5, and 6. We will simulate event flow 3, the order being created by the Shopping Cart service. Kafka Producers may also be Consumers within our domain.

kafka-data-flow-diagram

Below is a view of the online storefront, through the lens of the major sub-systems involved. Although the diagram is overly simplified, it should give you the idea of where Kafka, and Zookeeper, Kafka’s cluster manager, might sit in a typical, highly-available, microservice-based, distributed, application platform.

kafka-based-systems-diagram

This post will focus on the storefront’s services, database, and messaging sub-systems.

full-system-partial-view.png

Storefront Microservices

First, we will explore the functionality of each of the three microservices. We will examine how they share state change events using Kafka. Each storefront service is built using Spring Boot 2.0 and Gradle. Each Spring Boot service includes Spring Data RESTSpring Data MongoDBSpring for Apache KafkaSpring Cloud SleuthSpringFox, Spring Cloud Netflix Eureka, and Spring Boot Actuator. For simplicity, Kafka Streams and the use of Spring Cloud Stream is not part of this post.

Code samples in this post are displayed as Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Accounts Service

The Accounts service is responsible for managing basic customer information, such as name, contact information, addresses, and credit cards for purchases. A partial view of the data model for the Accounts service is shown below. This cluster of domain objects represents the Customer Account Aggregate.

accounts-diagram

The Customer class, the Accounts service’s primary data entity, is persisted in the Accounts MongoDB database. A Customer, represented as a BSON document in the customer.accounts database collection, looks as follows (gist).

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"creditCards": [{
"type": "PRIMARY",
"description": "VISA",
"number": "1234-6789-0000-0000",
"expiration": "6/19",
"nameOnCard": "John S. Doe"
},
{
"type": "ALTERNATE",
"description": "Corporate American Express",
"number": "9999-8888-7777-6666",
"expiration": "3/20",
"nameOnCard": "John Doe"
}
],
"_class": "com.storefront.model.Customer"
}

Along with the primary Customer entity, the Accounts service contains a CustomerChangeEvent class. As a Kafka producer, the Accounts service uses the CustomerChangeEvent domain event object to carry state information about the client the Accounts service wishes to share when a new customer is added, or a change is made to an existing customer. The CustomerChangeEvent object is not an exact duplicate of the Customer object. For example, the CustomerChangeEvent object does not share sensitive credit card information with other message Consumers (the CreditCard data object).

accounts-events-diagram.png

Since the CustomerChangeEvent domain event object is not persisted in MongoDB, to examine its structure, we can look at its JSON message payload in Kafka. Note the differences in the data structure between the Customer document in MongoDB and the Kafka CustomerChangeEvent message payload (gist).

{
"id": "5b189af9a8d05613315b0212",
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}, {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}]
}

For simplicity, we will assume other services do not make changes to the customer’s name, contact information, or addresses. It is the sole responsibility of the Accounts service.

Source code for the Accounts service is available on GitHub.

Orders Service

The Orders service is responsible for managing a customer’s past and current orders; it is the system of record for the customer’s order history. A partial view of the data model for the Orders service is shown below. This cluster of domain objects represents the Customer Orders Aggregate.

orders-diagram

The CustomerOrders class, the Order service’s primary data entity, is persisted in MongoDB. This entity contains a history of all the customer’s orders (Order data objects), along with the customer’s name, contact information, and addresses. In the Orders MongoDB database, a CustomerOrders, represented as a BSON document in the customer.orders database collection, looks as follows (gist).

{
"_id": ObjectId("5b189af9a8d05613315b0212"),
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"addresses": [{
"type": "BILLING",
"description": "My cc billing address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
{
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
}
],
"orders": [{
"guid": "df78784f-4d1d-48ad-a3e3-26a4fe7317a4",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "PROCESSING"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "COMPLETED"
}
],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": "11.99"
},
"quantity": 2
},
{
"product": {
"guid": "d01fde07-7c24-49c5-a5f1-bc2ce1f14c48",
"title": "Red Widget",
"description": "Reliable Red Widget",
"price": "3.99"
},
"quantity": 3
}
]
},
{
"guid": "29692d7f-3ca5-4684-b5fd-51dbcf40dc1e",
"orderStatusEvents": [{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "CREATED"
},
{
"timestamp": NumberLong("1528339278058"),
"orderStatusType": "APPROVED"
}
],
"orderItems": [{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget",
"description": "Amazing Yellow Widget",
"price": "5.99"
},
"quantity": 1
}]
}
],
"_class": "com.storefront.model.CustomerOrders"
}

Along with the primary CustomerOrders entity, the Orders service contains the FulfillmentRequestEvent class. As a Kafka producer, the Orders service uses the FulfillmentRequestEvent domain event object to carry state information about an approved order, ready for fulfillment, which it sends to Kafka for consumption by the Fulfillment service. TheFulfillmentRequestEvent object only contains the information it needs to share. In our example, it shares a single Order, along with the customer’s name, contact information, and shipping address.

orders-event-diagram

Since the FulfillmentRequestEvent domain event object is not persisted in MongoDB, we can look at it’s JSON message payload in Kafka. Again, note the structural differences between the CustomerOrders document in MongoDB and the FulfillmentRequestEvent message payload in Kafka (gist).

{
"timestamp": 1528334218821,
"name": {
"title": "Mr.",
"firstName": "John",
"middleName": "S.",
"lastName": "Doe",
"suffix": "Jr."
},
"contact": {
"primaryPhone": "555-666-7777",
"secondaryPhone": "555-444-9898",
"email": "john.doe@internet.com"
},
"address": {
"type": "SHIPPING",
"description": "My home address",
"address1": "123 Oak Street",
"address2": null,
"city": "Sunrise",
"state": "CA",
"postalCode": "12345-6789"
},
"order": {
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvents": [{
"timestamp": 1528333926586,
"orderStatusType": "CREATED",
"note": null
}, {
"timestamp": 1528333926586,
"orderStatusType": "APPROVED",
"note": null
}],
"orderItems": [{
"product": {
"guid": "7f3c9c22-3c0a-47a5-9a92-2bd2e23f6e37",
"title": "Green Widget",
"description": "Gorgeous Green Widget",
"price": 11.99
},
"quantity": 5
}]
}
}

Source code for the Orders service is available on GitHub.

Fulfillment Service

Lastly, the Fulfillment service is responsible for fulfilling orders. A partial view of the data model for the Fulfillment service is shown below. This cluster of domain objects represents the Fulfillment Aggregate.

fulfillment-diagram

The Fulfillment service’s primary entity, the Fulfillment class, is persisted in MongoDB. This entity contains a single Order data object, along with the customer’s name, contact information, and shipping address. The Fulfillment service also uses the Fulfillment entity to store the latest shipping event, such as ‘Shipped’, ‘In Transit’, and ‘Received’. The customer’s name, contact information, and shipping addresses are managed by the Accounts service, replicated to the Orders service, and passed to the Fulfillment service, via Kafka, using the FulfillmentRequestEvent entity.

In the Fulfillment MongoDB database, a Fulfillment object, represented as a BSON document in the fulfillment.requests database collection, looks as follows (gist).

{
"_id": ObjectId("5b1bf1b8a8d0562de5133d64"),
"timestamp": NumberLong("1528553706260"),
"name": {
"title": "Ms.",
"firstName": "Susan",
"lastName": "Blackstone"
},
"contact": {
"primaryPhone": "433-544-6555",
"secondaryPhone": "223-445-6767",
"email": "susan.m.blackstone@emailisus.com"
},
"address": {
"type": "SHIPPING",
"description": "Home Sweet Home",
"address1": "33 Oak Avenue",
"city": "Nowhere",
"state": "VT",
"postalCode": "444556-9090"
},
"order": {
"guid": "2932a8bf-aa9c-4539-8cbf-133a5bb65e44",
"orderStatusEvents": [{
"timestamp": NumberLong("1528558453686"),
"orderStatusType": "RECEIVED"
}],
"orderItems": [{
"product": {
"guid": "4efe33a1-722d-48c8-af8e-7879edcad2fa",
"title": "Purple Widget"
},
"quantity": 2
},
{
"product": {
"guid": "b5efd4a0-4eb9-4ad0-bc9e-2f5542cbe897",
"title": "Blue Widget"
},
"quantity": 5
},
{
"product": {
"guid": "a9d5a5c7-4245-4b4e-b1c3-1d3968f36b2d",
"title": "Yellow Widget"
},
"quantity": 2
}
]
},
"shippingMethod": "Drone",
"_class": "com.storefront.model.Fulfillment"
}

Along with the primary Fulfillment entity, the Fulfillment service has an OrderStatusChangeEvent class. As a Kafka producer, the Fulfillment service uses the OrderStatusChangeEvent domain event object to carry state information about an order’s fulfillment statuses. The OrderStatusChangeEvent object contains the order’s UUID, a timestamp, shipping status, and an option for order status notes.

fulfillment-event-diagram

Since the OrderStatusChangeEvent domain event object is not persisted in MongoDB, to examine it, we can again look at it’s JSON message payload in Kafka (gist).

{
"guid": "facb2d0c-4ae7-4d6c-96a0-293d9c521652",
"orderStatusEvent": {
"timestamp": 1528334452746,
"orderStatusType": "PROCESSING",
"note": null
}
}

Source code for the Fulfillment service is available on GitHub.

State Change Event Messaging Flows

There are three state change event messaging flows demonstrated in this post.

  1. Change to a Customer triggers an event message by the Accounts service;
  2. Order Approved triggers an event message by the Orders service;
  3. Change to the status of an Order triggers an event message by the Fulfillment service;

Each of these state change event messaging flows follow the exact same architectural pattern on both the Producer and Consumer sides of the Kafka topic.

kafka-event-flow

Let’s examine each state change event messaging flow and the code behind them.

Customer State Change

When a new Customer entity is created or updated by the Accounts service, a CustomerChangeEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of all customers who may place an order. It can be said that the Order’s Customer contact information is eventually consistent with the Account’s Customer contact information, by way of Kafka.

kafka-topic-01

There are different methods to trigger a message to be sent to Kafka, For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for an onAfterSave event for a Customer entity (gist).

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {
@Value("${spring.kafka.topic.accounts-customer}")
private String topic;
private Sender sender;
@Autowired
public AfterSaveListener(Sender sender) {
this.sender = sender;
}
@Override
public void onAfterSave(AfterSaveEvent<Customer> event) {
log.info("onAfterSave event='{}'", event);
Customer customer = event.getSource();
CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
customerChangeEvent.setId(customer.getId());
customerChangeEvent.setName(customer.getName());
customerChangeEvent.setContact(customer.getContact());
customerChangeEvent.setAddresses(customer.getAddresses());
sender.send(topic, customerChangeEvent);
}
}

The listener handles the event by instantiating a new CustomerChangeEvent with the Customer’s information and passes it to the Sender class (gist).

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;
public void send(String topic, CustomerChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The configuration of the Sender is handled by the SenderConfig class. This Spring Kafka producer configuration class uses Spring Kafka’s JsonSerializer class to serialize the CustomerChangeEvent object into a JSON message payload (gist).

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is critical to ensure changes to a Customer’s information are processed in order, all messages are sent to a single topic with a single partition.

kafka-events-01.png

The Orders service’s Receiver class consumes the CustomerChangeEvent messages, produced by the Accounts service (gist).

[gust]cc3c4e55bc291e5435eccdd679d03015[/gist]

The Orders service’s Receiver class is configured differently, compared to the Fulfillment service. The Orders service receives messages from multiple topics, each containing messages with different payload structures. Each type of message must be deserialized into different object types. To accomplish this, the ReceiverConfig class uses Apache Kafka’s StringDeserializer. The Orders service’s ReceiverConfig references Spring Kafka’s AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching (gist).

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as input, denoting the object type the message payload needs to be deserialized into. In this way, we can receive multiple message payloads, serialized from multiple object types, and successfully deserialize each type into the correct data object. In the case of a CustomerChangeEvent, the Orders service calls the receiveCustomerOrder method to consume the message and properly deserialize it.

For all services, a Spring application.yaml properties file, in each service’s resources directory, contains the Kafka configuration (gist).

server:
port: 8080
spring:
main:
allow-bean-definition-overriding: true
application:
name: orders
data:
mongodb:
uri: mongodb://mongo:27017/orders
kafka:
bootstrap-servers: kafka:9092
topic:
accounts-customer: accounts.customer.change
orders-order: orders.order.fulfill
fulfillment-order: fulfillment.order.change
consumer:
group-id: orders
auto-offset-reset: earliest
zipkin:
sender:
type: kafka
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: INFO
---
spring:
config:
activate:
on-profile: local
data:
mongodb:
uri: mongodb://localhost:27017/orders
kafka:
bootstrap-servers: localhost:9092
server:
port: 8090
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG
---
spring:
config:
activate:
on-profile: confluent
server:
port: 8080
logging:
level:
root: INFO
---
server:
port: 8080
spring:
config:
activate:
on-profile: minikube
data:
mongodb:
uri: mongodb://mongo.dev:27017/orders
kafka:
bootstrap-servers: kafka-cluster.dev:9092
management:
endpoints:
web:
exposure:
include: '*'
logging:
level:
root: DEBUG

 Order Approved for Fulfillment

When the status of the Order in a CustomerOrders entity is changed to ‘Approved’ from ‘Created’, a FulfillmentRequestEvent message is produced and sent to the accounts.customer.change Kafka topic. This message is retrieved and consumed by the Fulfillment service. This is how the Fulfillment service has a record of what Orders are ready for fulfillment.

Kafka-Eventual-Cons Order Flow 2

Since we did not create the Shopping Cart service for this post, the Orders service simulates an order approval event, containing an approved order, being received, through Kafka, from the Shopping Cart Service. To simulate order creation and approval, the Orders service can create a random order history for each customer. Further, the Orders service can scan all customer orders for orders that contain both a ‘Created’ and ‘Approved’ order status. This state is communicated as an event message to Kafka for all orders matching those criteria. A FulfillmentRequestEvent is produced, which contains the order to be fulfilled, and the customer’s contact and shipping information. The FulfillmentRequestEvent is passed to the Sender class (gist).

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate;
public void send(String topic, FulfillmentRequestEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses the Spring Kafka’s JsonSerializer class to serialize the FulfillmentRequestEvent object into a JSON message payload (gist).

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, FulfillmentRequestEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, FulfillmentRequestEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Since message order is not critical messages could be sent to a topic with multiple partitions if the volume of messages required it.

kafka-events-02

The Fulfillment service’s Receiver class consumes the FulfillmentRequestEvent from the Kafka topic and instantiates a Fulfillment object, containing the data passed in the FulfillmentRequestEvent message payload. This includes the order to be fulfilled and the customer’s contact and shipping information (gist).

@Slf4j
@Component
public class Receiver {
@Autowired
private FulfillmentRepository fulfillmentRepository;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.orders-order}")
public void receive(FulfillmentRequestEvent fulfillmentRequestEvent) {
log.info("received payload='{}'", fulfillmentRequestEvent.toString());
latch.countDown();
Fulfillment fulfillment = new Fulfillment();
fulfillment.setId(fulfillmentRequestEvent.getId());
fulfillment.setTimestamp(fulfillmentRequestEvent.getTimestamp());
fulfillment.setName(fulfillmentRequestEvent.getName());
fulfillment.setContact(fulfillmentRequestEvent.getContact());
fulfillment.setAddress(fulfillmentRequestEvent.getAddress());
fulfillment.setOrder(fulfillmentRequestEvent.getOrder());
fulfillmentRepository.save(fulfillment);
}
}
view raw Receiver.java hosted with ❤ by GitHub

The Fulfillment service’s ReceiverConfig class defines the DefaultKafkaConsumerFactory and ConcurrentKafkaListenerContainerFactory, responsible for deserializing the message payload from JSON into a FulfillmentRequestEvent object (gist).

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, FulfillmentRequestEvent> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(FulfillmentRequestEvent.class));
}
@Override
@Bean
public ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, FulfillmentRequestEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Fulfillment Order Status State Change

When the status of the Order in a Fulfillment entity is changed to anything other than ‘Approved’, an OrderStatusChangeEvent message is produced by the Fulfillment service and sent to the fulfillment.order.change Kafka topic. This message is retrieved and consumed by the Orders service. This is how the Orders service tracks all CustomerOrder lifecycle events from the initial ‘Created’ status to the final happy path ‘Received’ status.

kafka-topic-03

The Fulfillment service exposes several endpoints through the FulfillmentController class, which are simulate a change the status of an order. They allow an order status to be changed from ‘Approved’ to ‘Processing’, to ‘Shipped’, to ‘In Transit’, and to ‘Received’. This change is applied to all orders that meet the criteria.

Each of these state changes triggers a change to the Fulfillment document in MongoDB. Each change also generates an Kafka message, containing the OrderStatusChangeEvent in the message payload. This is handled by the Fulfillment service’s Sender class.

Note in this example, these two events are not handled in an atomic transaction. Either the updating the database or the sending of the message could fail independently, which would cause a loss of data consistency. In the real world, we must ensure both these disparate actions succeed or fail as a single transaction, to ensure data consistency (gist).

@Slf4j
public class Sender {
@Autowired
private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;
public void send(String topic, OrderStatusChangeEvent payload) {
log.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
view raw Sender.java hosted with ❤ by GitHub

The configuration of the Sender class is handled by the SenderConfig class. This Spring Kafka producer configuration class uses the Spring Kafka’s JsonSerializer class to serialize the OrderStatusChangeEvent object into a JSON message payload. This class is almost identical to the SenderConfig class in the Orders and Accounts services (gist).

@Configuration
@EnableKafka
public class SenderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}

The Sender class uses a KafkaTemplate to send the message to the Kafka topic, as shown below. Message order is not critical since a timestamp is recorded, which ensures the proper sequence of order status events can be maintained. Messages could be sent to a topic with multiple partitions if the volume of messages required it.

kafka-events-03

The Orders service’s Receiver class is responsible for consuming the OrderStatusChangeEvent message, produced by the Fulfillment service (gist).

@Slf4j
@Component
public class Receiver {
@Autowired
private CustomerOrdersRepository customerOrdersRepository;
@Autowired
private MongoTemplate mongoTemplate;
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
public void receiveCustomerOrder(CustomerOrders customerOrders) {
log.info("received payload='{}'", customerOrders);
latch.countDown();
customerOrdersRepository.save(customerOrders);
}
@KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
log.info("received payload='{}'", orderStatusChangeEvent);
latch.countDown();
Criteria criteria = Criteria.where("orders.guid")
.is(orderStatusChangeEvent.getGuid());
Query query = Query.query(criteria);
Update update = new Update();
update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
mongoTemplate.updateFirst(query, update, "customer.orders");
}
}
view raw Receiver.java hosted with ❤ by GitHub

As explained above, the Orders service is configured differently compared to the Fulfillment service, to receive messages from Kafka. The Orders service needs to receive messages from more than one topic. The ReceiverConfig class deserializes all message using the StringDeserializer. The Orders service’s ReceiverConfig class references the Spring Kafka AbstractKafkaListenerContainerFactory classes setMessageConverter method, which allows for dynamic object type matching (gist).

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Override
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Override
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer()
);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Override
@Bean
public Receiver receiver() {
return new Receiver();
}
}

Each Kafka topic the Orders service consumes messages from is associated with a method in the Receiver class (shown above). This method accepts a specific object type as an input parameter, denoting the object type the message payload needs to be deserialized into. In the case of an OrderStatusChangeEvent message, the receiveOrderStatusChangeEvents method is called to consume a message from the fulfillment.order.change Kafka topic.

Part Two

In Part Two of this post, I will briefly cover how to deploy and run a local development version of the storefront components, using Docker. The storefront’s microservices will be exposed through an API Gateway, Netflix’s Zuul. Service discovery and load balancing will be handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. To provide operational visibility, we will add Yahoo’s Kafka Manager and Mongo Express to our system.

Kafka-Eventual-Cons-Swarm

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , ,

6 Comments

Architecting Cloud-Optimized Apps with AKS (Azure’s Managed Kubernetes), Azure Service Bus, and Cosmos DB

An earlier post, Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ, demonstrated the use of a message-based, event-driven, decoupled architectural approach for communications between microservices, using Spring AMQP and RabbitMQ. This distributed computing model is known as eventual consistency. To paraphrase microservices.io, ‘using an event-driven, eventually consistent approach, each service publishes an event whenever it updates its data. Other services subscribe to events. When an event is received, a service (subscriber) updates its data.

That earlier post illustrated a fairly simple example application, the Voter API, consisting of a set of three Spring Boot microservices backed by MongoDB and RabbitMQ, and fronted by an API Gateway built with HAProxy. All API components were containerized using Docker and designed for use with Docker CE for AWS as the Container-as-a-Service (CaaS) platform.

Optimizing for Kubernetes on Azure

This post will demonstrate how a modern application, such as the Voter API, is optimized for Kubernetes in the Cloud (Kubernetes-as-a-Service), in this case, AKS, Azure’s new public preview of Managed Kubernetes for Azure Container Service. According to Microsoft, the goal of AKS is to simplify the deployment, management, and operations of Kubernetes. I wrote about AKS in detail, in my last post, First Impressions of AKS, Azure’s New Managed Kubernetes Container Service.

In addition to migrating to AKS, the Voter API will take advantage of additional enterprise-grade Azure’s resources, including Azure’s Service Bus and Cosmos DB, replacements for the Voter API’s RabbitMQ and MongoDB. There are several architectural options for the Voter API’s messaging and NoSQL data source requirements when moving to Azure.

  1. Keep Dockerized RabbitMQ and MongoDB – Easy to deploy to Kubernetes, but not easily scalable, highly-available, or manageable. Would require storage optimized Azure VMs for nodes, node affinity, and persistent storage for data.
  2. Replace with Cloud-based Non-Azure Equivalents – Use SaaS-based equivalents, such as CloudAMQP (RabbitMQ-as-a-Service) and MongoDB Atlas, which will provide scalability, high-availability, and manageability.
  3. Replace with Azure Service Bus and Cosmos DB – Provides all the advantages of  SaaS-based equivalents, and additionally as Azure resources, benefits from being in the Azure Cloud alongside AKS.

Source Code

The Kubernetes resource files and deployment scripts used in this post are all available on GitHub. This is the only project you need to clone to reproduce the AKS example in this post.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/azure-aks-sb-cosmosdb-demo.git

The Docker images for the three Spring microservices deployed to AKS, Voter, Candidate, and Election, are available on Docker Hub. Optionally, the source code, including Dockerfiles, for the Voter, Candidate, and Election microservices, as well as the Voter Client are available on GitHub, in the kub-aks branch.

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/candidate-service.git

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/election-service.git

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/voter-service.git

git clone \
  --branch kub-aks --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/voter-client.git

Azure Service Bus

To demonstrate the capabilities of Azure’s Service Bus, the Voter API’s Spring microservice’s source code has been re-written to work with Azure Service Bus instead of RabbitMQ. A future post will explore the microservice’s messaging code. It is more likely that a large application, written specifically for a technology that is easily portable such as RabbitMQ or MongoDB, would likely remain on that technology, even if the application was lifted and shifted to the Cloud or moved between Cloud Service Providers (CSPs). Something important to keep in mind when choosing modern technologies – portability.

Service Bus is Azure’s reliable cloud Messaging-as-a-Service (MaaS). Service Bus is an original Azure resource offering, available for several years. The core components of the Service Bus messaging infrastructure are queues, topics, and subscriptions. According to Microsoft, ‘the primary difference is that topics support publish/subscribe capabilities that can be used for sophisticated content-based routing and delivery logic, including sending to multiple recipients.

Since the three Voter API’s microservices are not required to produce messages for more than one other service consumer, Service Bus queues are sufficient, as opposed to a pub/sub model using Service Bus topics.

Cosmos DB

Cosmos DB, Microsoft’s globally distributed, multi-model database, offers throughput, latency, availability, and consistency guarantees with comprehensive service level agreements (SLAs). Ideal for the Voter API, Cosmos DB supports MongoDB’s data models through the MongoDB API, a MongoDB database service built on top of Cosmos DB. The MongoDB API is compatible with existing MongoDB libraries, drivers, tools, and applications. Therefore, there are no code changes required to convert the Voter API from MongoDB to Cosmos DB. I simply had to change the database connection string.

NGINX Ingress Controller

Although the Voter API’s HAProxy-based API Gateway could be deployed to AKS, it is not optimal for Kubernetes. Instead, the Voter API will use an NGINX-based Ingress Controller. NGINX will serve as an API Gateway, as HAProxy did, previously.

According to NGINX, ‘an Ingress is a Kubernetes resource that lets you configure an HTTP load balancer for your Kubernetes services. Such a load balancer usually exposes your services to clients outside of your Kubernetes cluster.

An Ingress resource requires an Ingress Controller to function. Continuing from NGINX, ‘an Ingress Controller is an application that monitors Ingress resources via the Kubernetes API and updates the configuration of a load balancer in case of any changes. Different load balancers require different Ingress controller implementations. In the case of software load balancers, such as NGINX, an Ingress controller is deployed in a pod along with a load balancer.

There are currently two NGINX-based Ingress Controllers available, one from Kubernetes and one directly from NGINX. Both being equal, for this post, I chose the Kubernetes version, without RBAC (Kubernetes offers a version with and without RBAC). RBAC should always be used for actual cluster security. There are several advantages of using either version of the NGINX Ingress Controller for Kubernetes, including Layer 4 TCP and UDP and Layer 7 HTTP load balancing, reverse proxying, ease of SSL termination, dynamically-configurable path-based rules, and support for multiple hostnames.

Azure Web App

Lastly, the Voter Client application, not really part of the Voter API, but useful for demonstration purposes, will be converted from a containerized application to an Azure Web App. Since it is not part of the Voter API, separating the Client application from AKS makes better architectural sense. Web Apps are a powerful, richly-featured, yet incredibly simple way to host applications and services on Azure. For more information on using Azure Web Apps, read my recent post, Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.

Revised Component Architecture

Below is a simplified component diagram of the new architecture, including Azure Service Bus, Cosmos DB, and the NGINX Ingress Controller. The new architecture looks similar to the previous architecture, but as you will see, it is actually very different.

AKS-r8.png

Process Flow

To understand the role of each API component, let’s look at one of the event-driven, decoupled process flows, the creation of a new election candidate. In the simplified flow diagram below, an API consumer executes an HTTP POST request containing the new candidate object as JSON. The Candidate microservice receives the HTTP request and creates a new document in the Cosmos DB Voter database. A Spring RepositoryEventHandler within the Candidate microservice responds to the document creation and publishes a Create Candidate event message, containing the new candidate object as JSON, to the Azure Service Bus Candidate Queue.

Candidate_Producer

Independently, the Voter microservice is listening to the Candidate Queue. Whenever a new message is produced by the Candidate microservice, the Voter microservice retrieves the message off the queue. The Voter microservice then transforms the new candidate object contained in the incoming message to its own candidate data model and creates a new document in its own Voter database.

Voter_Consumer

The same process flows exist between the Election and the Candidate microservices. The Candidate microservice maintains current elections in its database, which are retrieved from the Election queue.

Data Models

It is useful to understand, the Candidate microservice’s candidate domain model is not necessarily identical to the Voter microservice’s candidate domain model. Each microservice may choose to maintain its own representation of a vote, a candidate, and an election. The Voter service transforms the new candidate object in the incoming message based on its own needs. In this case, the Voter microservice is only interested in a subset of the total fields in the Candidate microservice’s model. This is the beauty of decoupling microservices, their domain models, and their datastores.

Other Events

The versions of the Voter API microservices used for this post only support Election Created events and Candidate Created events. They do not handle Delete or Update events, which would be necessary to be fully functional. For example, if a candidate withdraws from an election, the Voter service would need to be notified so no one places votes for that candidate. This would normally happen through a Candidate Delete or Candidate Update event.

Provisioning Azure Service Bus

First, the Azure Service Bus is provisioned. Provisioning the Service Bus may be accomplished using several different methods, including manually using the Azure Portal or programmatically using Azure Resource Manager (ARM) with PowerShell or Terraform. I chose to provision the Azure Service Bus and the two queues using the Azure Portal for expediency. I chose the Basic Service Bus Tier of service, of which there are three tiers, Basic, Standard, and Premium.

Azure_006_Full_ServiceBus

The application requires two queues, the candidate.queue, and the election.queue.

Azure_008_ServiceBus

Provisioning Cosmos DB

Next, Cosmos DB is provisioned. Like Azure Service Bus, Cosmos DB may be provisioned using several methods, including manually using the Azure Portal, programmatically using Azure Resource Manager (ARM) with PowerShell or Terraform, or using the Azure CLI, which was my choice.

az cosmosdb create \
  --name cosmosdb_instance_name_goes_here \
  --resource-group resource_group_name_goes_here \
  --location "East US=0" \
  --kind MongoDB

The post’s Cosmos DB instance exists within the single East US Region, with no failover. In a real Production environment, you would configure Cosmos DB with multi-region failover. I chose MongoDB as the type of Cosmos DB database account to create. The allowed values are GlobalDocumentDB, MongoDB, Parse. All other settings were left to the default values.

The three Spring microservices each have their own database. You do not have to create the databases in advance of consuming the Voter API. The databases and the database collections will be automatically created when new documents are first inserted by the microservices. Below, the three databases and their collections have been created and populated with documents.

Azure_001_CosmosDB

The GitHub project repository also contains three shell scripts to generate sample vote, candidate, and election documents. The scripts will delete any previous documents from the database collections and generate new sets of sample documents. To use, you will have to update the scripts with your own Voter API URL.

AKS_012_AddVotes2

MongoDB Aggregation Pipeline

Each of the three Spring microservices uses Spring Data MongoDB, which takes advantage of MongoDB’s Aggregation Framework. According to MongoDB, ‘the aggregation framework is modeled on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms the documents into an aggregated result.’ Below is an example of aggregation from the Candidate microservice’s VoterContoller class.

Aggregation aggregation = Aggregation.newAggregation(
    match(Criteria.where("election").is(election)),
    group("candidate").count().as("votes"),
    project("votes").and("candidate").previousOperation(),
    sort(Sort.Direction.DESC, "votes")
);

To use MongoDB’s aggregation framework with Cosmos DB, it is currently necessary to activate the MongoDB Aggregation Pipeline Preview Feature of Cosmos DB. The feature can be activated from the Azure Portal, as shown below.

Azure_002_CosmosDB

Cosmos DB Emulator

Be warned, Cosmos DB can be very expensive, even without database traffic or any Production-grade bells and whistles. Be careful when spinning up instances on Azure for learning purposes, the cost adds up quickly! In less than ten days, while writing this post, my cost was almost US$100 for the Voter API’s Cosmos DB instance.

I strongly recommend downloading the free Azure Cosmos DB Emulator to develop and test applications from your local machine. Although certainly not as convenient, it will save you the cost of developing for Cosmos DB directly on Azure.

With Cosmos DB, you pay for reserved throughput provisioned and data stored in containers (a collection of documents or a table or a graph). Yes, that’s right, Azure charges you per MongoDB collection, not even per database. Azure Cosmos DB’s current pricing model seems less than ideal for microservice architectures, each with their own database instance.

By default the reserved throughput, billed as Request Units (RU) per second or RU/s, is set to 1,000 RU/s per collection. For development and testing, you can reduce each collection to a minimum of 400 RU/s. The Voter API creates five collections at 1,000 RU/s or 5,000 RU/s total. Reducing this to a total of 2,000 RU/s makes Cosmos DB marginally more affordable to explore.

Azure_003_CosmosDB.PNG

Building the AKS Cluster

An existing Azure Resource Group is required for AKS. I chose to use the latest available version of Kubernetes, 1.8.2.

# login to azure
az login \
  --username your_username  \
  --password your_password

# create resource group
az group create \
  --resource-group resource_group_name_goes_here \
  --location eastus

# create aks cluster
az aks create \
  --name cluser_name_goes_here \
  --resource-group resource_group_name_goes_here \
  --ssh-key-value path_to_your_public_key \
  --kubernetes-version 1.8.2

# get credentials to access aks cluster
az aks get-credentials \
  --name cluser_name_goes_here \
  --resource-group resource_group_name_goes_here

# display cluster's worker nodes
kubectl get nodes --output=wide

By default, AKS will provision a three-node Kubernetes cluster using Azure’s Standard D1 v2 Virtual Machines. According to Microsoft, ‘D series VMs are general purpose VM sizes provide balanced CPU-to-memory ratio. They are ideal for testing and development, small to medium databases, and low to medium traffic web servers.’ Azure D1 v2 VM’s are based on Linux OS images, currently Debian 8 (Jessie), with 1 vCPU and 3.5 GB of memory. By default with AKS, each VM receives 30 GiB of Standard HDD attached storage.

AKS_002_CreateCluster
You should always select the type and quantity of the cluster’s VMs and their attached storage, optimized for estimated traffic volumes and the specific workloads you are running. This can be done using the --node-count--node-vm-size, and --node-osdisk-size arguments with the az aks create command.

Deployment

The Voter API resources are deployed to its own Kubernetes Namespace, voter-api. The NGINX Ingress Controller resources are deployed to a different namespace, ingress-nginx. Separate namespaces help organize individual Kubernetes resources and separate different concerns.

Voter API

First, the voter-api namespace is created. Then, five required Kubernetes Secrets are created within the namespace. These secrets all contain sensitive information, such as passwords, that should not be shared. There is one secret for each of the three Cosmos DB database connection strings, one secret for the Azure Service Bus connection string, and one secret for the Let’s Encrypt SSL/TLS certificate and private key, used for secure HTTPS access to the Voter API.

AKS_004_CreateVoterAPI

Secrets

The Voter API’s secrets are used to populate environment variables within the pod’s containers. The environment variables are then available for use within the containers. Below is a snippet of the Voter pods resource file showing how the Cosmos DB and Service Bus connection strings secrets are used to populate environment variables.

env:
  - name: AZURE_SERVICE_BUS_CONNECTION_STRING
    valueFrom:
      secretKeyRef:
        name: azure-service-bus
        key: connection-string
  - name: SPRING_DATA_MONGODB_URI
    valueFrom:
      secretKeyRef:
        name: azure-cosmosdb-voter
        key: connection-string

Shown below, the Cosmos DB and Service Bus connection strings secrets have been injected into the Voter container and made available as environment variables to the microservice’s executable JAR file on start-up. As environment variables, the secrets are visible in plain text. Access to containers should be tightly controlled through Kubernetes RBAC and Azure AD, to ensure sensitive information, such as secrets, remain confidential.

AKS_014_EnvVars

Next, the three Kubernetes ReplicaSet resources, corresponding to the three Spring microservices, are created using Deployment controllers. According to Kubernetes, a Deployment that configures a ReplicaSet is now the recommended way to set up replication. The Deployments specify three replicas of each of the three Spring Services, resulting in a total of nine Kubernetes Pods.

Each pod, by default, will be scheduled on a different node if possible. According to Kubernetes, ‘the scheduler will automatically do a reasonable placement (e.g. spread your pods across nodes, not place the pod on a node with insufficient free resources, etc.).’ Note below how each of the three microservice’s three replicas has been scheduled on a different node in the three-node AKS cluster.

AKS_005B_CreateVoterPods

Next, the three corresponding Kubernetes ClusterIP-type Services are created. And lastly, the Kubernetes Ingress is created. According to Kubernetes, the Ingress resource is an API object that manages external access to the services in a cluster, typically HTTP. Ingress provides load balancing, SSL termination, and name-based virtual hosting.

The Ingress configuration contains the routing rules used with the NGINX Ingress Controller. Shown below are the routing rules for each of the three microservices within the Voter API. Incoming API requests are routed to the appropriate pod and service port by NGINX.

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: voter-ingress
  namespace: voter-api
  annotations:
    ingress.kubernetes.io/ssl-redirect: "true"
spec:
  tls:
  - hosts:
    - api.voter-demo.com
    secretName: api-voter-demo-secret
  rules:
  - http:
      paths:
      - path: /candidate
        backend:
          serviceName: candidate
          servicePort: 8080
      - path: /election
        backend:
          serviceName: election
          servicePort: 8080
      - path: /voter
        backend:
          serviceName: voter
          servicePort: 8080

The screengrab below shows all of the Voter API resources created on AKS.

AKS_005B_CreateVoterAPI

NGINX Ingress Controller

After completing the deployment of the Voter API, the NGINX Ingress Controller is created. It starts with creating the ingress-nginx namespace. Next, the NGINX Ingress Controller is created, consisting of the NGINX Ingress Controller, three Kubernetes ConfigMap resources, and a default back-end application. The Controller and backend each have their own Service resources. Like the Voter API, each has three replicas, for a total of six pods. Together, the Ingress resource and NGINX Ingress Controller manage traffic to the Spring microservices.

The screengrab below shows all of the NGINX Ingress Controller resources created on AKS.

AKS_015_NGINX_Resources.PNG

The  NGINX Ingress Controller Service, shown above, has an external public IP address associated with itself.  This is because that Service is of the type, Load Balancer. External requests to the Voter API will be routed through the NGINX Ingress Controller, on this IP address.

kind: Service
apiVersion: v1
metadata:
  name: ingress-nginx
  namespace: ingress-nginx
  labels:
    app: ingress-nginx
spec:
  externalTrafficPolicy: Local
  type: LoadBalancer
  selector:
    app: ingress-nginx
  ports:
  - name: http
    port: 80
    targetPort: http
  - name: https
    port: 443
    targetPort: https

If you are only using HTTPS, not HTTP, then the references to HTTP and port 80 in the Ingress configuration are unnecessary. The NGINX Ingress Controller’s resources are explained in detail in the GitHub documentation, along with further configuration instructions.

DNS

To provide convenient access to the Voter API and the Voter Client, my domain, voter-demo.com, is associated with the public IP address associated with the Voter API Ingress Controller and with the public IP address associated with the Voter Client Azure Web App. DNS configuration is done through Azure’s DNS Zone resource.

AKS_008_FinalDNS

The two TXT type records might not look as familiar as the SOA, NS, and A type records. The TXT records are required to associate the domain entries with the Voter Client Azure Web App. Browsing to http://www.voter-demo.com or simply http://voter-demo.com brings up the Voter Client.

VoterClientAngular5_small.png

The Client sends and receives data via the Voter API, available securely at https://api.voter-demo.com.

Routing API Requests

With the Pods, Services, Ingress, and NGINX Ingress Controller created and configured, as well as the Azure Layer 4 Load Balancer and DNS Zone, HTTP requests from API consumers are properly and securely routed to the appropriate microservices. In the example below, three back-to-back requests are made to the voter/info API endpoint. HTTP requests are properly routed to one of the three Voter pod replicas using the default round-robin algorithm, as proven by the observing the different hostnames (pod names) and the IP addresses (private pod IPs) in each HTTP response.

AKS_013B_LoadBalancingReplicaSets.PNG

Final Architecture

Shown below is the final Voter API Azure architecture. To simplify the diagram, I have deliberately left out the three microservice’s ClusterIP-type Services, the three default back-end application pods, and the default back-end application’s ClusterIP-type Service. All resources shown below are within the single East US Azure region, except DNS, which is a global resource.

kub-aks-v10-small.png

Shown below is the new Azure Resource Group created by Azure during the AKS provisioning process. The Resource Group contains the various resources required to support the AKS cluster, NGINX Ingress Controller, and the Voter API. Necessary Azure resources were automatically provisioned when I provisioned AKS and when I created the new Voter API and NGINX resources.

AKS_006_FinalRG

In addition to the Resource Group above, the original Resource Group contains the AKS Container Service resource itself, Service Bus, Cosmos DB, and the DNS Zone resource.

AKS_007_FinalRG

The Voter Client Web App, consisting of the Azure App Service and App Service plan resource, is located in a third, separate Resource Group, not shown here.

Cleaning Up AKS

A nice feature of AKS, running a az aks delete command will delete all the Azure resources created as part of provisioning AKS, the API, and the Ingress Controller. You will have to delete the Cosmos DB, Service Bus, and DNS Zone resources, separately.

az aks delete \
  --name cluser_name_goes_here \
  --resource-group resource_group_name_goes_here

Conclusion

Taking advantage of Kubernetes with AKS, and the array of Azure’s enterprise-grade resources, the Voter API was shifted from a simple Docker architecture to a production-ready solution. The Voter API is now easier to manage, horizontally scalable, fault-tolerant, and marginally more secure. It is capable of reliably supporting dozens more microservices, with multiple replicas. The Voter API will handle a high volume of data transactions and event messages.

There is much more that needs to be done to productionalize the Voter API on AKS, including:

  • Add multi-region failover of Cosmos DB
  • Upgrade to Service Bus Standard or Premium Tier
  • Optimized Azure VMs and storage for anticipated traffic volumes and application-specific workloads
  • Implement Kubernetes RBAC
  • Add Monitoring, logging, and alerting with Envoy or similar
  • Secure end-to-end TLS communications with Itsio or similar
  • Secure the API with OAuth and Azure AD
  • Automate everything with DevOps – AKS provisioning, testing code, creating resources, updating microservices, and managing data

All opinions in this post are my own, and not necessarily the views of my current or past employers or their clients.

, , , , , , , , , , , , , ,

4 Comments

Eventual Consistency: Decoupling Microservices with Spring AMQP and RabbitMQ

RabbitMQEnventCons.png

Introduction

In a recent post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ, we moved away from synchronous REST HTTP for inter-process communications (IPC) toward message-based IPC. Moving to asynchronous message-based communications allows us to decouple services from one another. It makes it easier to build, test, and release our individual services. In that post, we did not achieve fully asynchronous communications. Although, we did achieve a higher level of service decoupling using message-based Remote Procedure Call (RPC) IPC.

In this post, we will fully decouple our services using the distributed computing model of eventual consistency. More specifically, we will use a message-based, event-driven, loosely-coupled, eventually consistent architectural approach for communications between services.

What is eventual consistency? One of the best definitions of eventual consistency I have read was posted on microservices.io. To paraphrase, ‘using an event-driven, eventually consistent approach, each service publishes an event whenever it updates its data. Other services subscribe to events. When an event is received, a service updates its data.

Example of Eventual Consistency

Imagine, Service A, the Customer service, inserts a new customer record into its database. Based on that ‘customer created’ event, Service A publishes a message containing the new customer object, serialized to JSON, to the lightweight, persistent, New Customer message queue.

Service B, the Customer Onboarding service, a subscriber to the New Customer queue, consumes and deserializes Service A’s message. Service B may or may not perform a data transformation of the Customer object to its own Customer data model. Service B then inserts the new customer record into its own database.

In the above example, it can be said that the customer records in Service B’s database are eventually consistent with the customer records in Service A’s database. Service A makes a change and publishes a message in response to the event. Service B consumes the message and makes the same change. Eventually (likely within milliseconds), Service B’s customer records are consistent with Service A’s customer records.

Why Eventual Consistency?

So what does this apparent added complexity and duplication of data buy us? Consider the advantages. Service B, the Onboarding service, requires no knowledge of, or a dependency on, Service A, the Customer service. Still, Service B has a current record of all the customers that Service A maintains. Instead of making repeated and potentially costly RESTful HTTP calls or RPC message-based calls to or from Service A to Service B for new customers, Service B queries its database for a list of customers.

The value of eventual consistency increases factorially as you scale a distributed system. Imagine dozens of distinct microservices, many requiring data from other microservices. Further, imagine multiple instances of each of those services all running in parallel. Decoupling services from one another, through asynchronous forms of IPC, messaging, and event-driven eventual consistency greatly simplifies the software development lifecycle and operations.

Demonstration

In this post, we could use a few different architectural patterns to demonstrate message passing with RabbitMQ and Spring AMQP. They including Work Queues, Publish/Subscribe, Routing, or Topics. To keep things as simple as possible, we will have a single Producer, publish messages to a single durable and persistent message queue. We will have a single Subscriber, a Consumer, consume the messages from that queue. We focus on a single type of event message.

Sample Code

To demonstrate Spring AMQP-based messaging with RabbitMQ, we will use a reference set of three Spring Boot microservices. The Election ServiceCandidate Service, and Voter Service are all backed by MongoDB. The services and MongoDB, along with RabbitMQ and Voter API Gateway, are all part of the Voter API.

The Voter API Gateway, based on HAProxy, serves as a common entry point to all three services, as well as serving as a reverse proxy and load balancer. The API Gateway provides round-robin load-balanced access to multiple instances of each service.

Voter_API_Architecture

All the source code found this post’s example is available on GitHub, within a few different project repositories. The Voter Service repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Election Service repository, Candidate Service repository, and Voter API Gateway repository are also available on GitHub. There is also a new AngularJS/Node.js Web Client, to demonstrate how to use the Voter API.

For this post, you only need to clone the Voter Service repository.

Deploying Voter API

All components, including the Spring Boot services, MongoDB, RabbitMQ, API Gateway, and the Web Client, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub. The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.

To clone and deploy the components locally, including the Spring Boot services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.


git clone –depth 1 –branch rabbitmq \
https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh

If everything was deployed successfully, you should observe six running Docker containers, similar to the output, below.


CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
32d73282ff3d garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 8 seconds ago Up 5 seconds 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
1ece438c5da4 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8097->8080/tcp voterstack_candidate_1
30391faa3422 garystafford/voter-service:rabbitmq "java -Dspring.pro…" 10 seconds ago Up 7 seconds 0.0.0.0:8099->8080/tcp voterstack_voter_1
35063ccfe706 garystafford/election-service:rabbitmq "java -Dspring.pro…" 12 seconds ago Up 10 seconds 0.0.0.0:8095->8080/tcp voterstack_election_1
23eae86967a2 rabbitmq:management-alpine "docker-entrypoint…" 14 seconds ago Up 11 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
7e77ddecddbb mongo:latest "docker-entrypoint…" 24 seconds ago Up 21 seconds 0.0.0.0:27017->27017/tcp voterstack_mongodb_1

Using Voter API

The Voter Service, Election Service, and Candidate Service GitHub repositories each contain README files, which detail all the API endpoints each service exposes, and how to call them.

In addition to casting votes for candidates, the Voter service can simulate election results. Calling the /simulation endpoint, and indicating the desired election, the Voter service will randomly generate a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. The Voter service depends on the Candidate service to obtain a list of candidates.

The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.

The Election service manages elections, their polling dates, and the type of election (federal, state, or local). Like the other services, the Election service also has a /simulation endpoint, which will create a list of sample elections. The Election service will not be discussed in this post’s demonstration. We will examine communications between the Candidate and Voter services, only.

REST HTTP Endpoint

As you recall from our previous post, Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ, the Voter service exposes multiple, almost identical endpoints. Each endpoint uses a different means of IPC to retrieve candidates and generates random votes.

Calling the /voter/simulation/http/{election} endpoint and providing a specific election, prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is done using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.

The Candidate service receives the HTTP request. The Candidate service responds to the Voter service with a list of candidates in JSON format. The Voter service receives the response payload containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters  database.

Message-based RPC Endpoint

Similarly, calling the /voter/simulation/rpc/{election} endpoint and providing a specific election, prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client) produces a request message and places in RabbitMQ’s voter.rpc.requests queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service; it only depends on a response to its request message. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.

The request message is consumed by the Candidate service (the server), who is listening to that queue. In response, the Candidate service produces a message containing the list of candidates serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client) through RabbitMQ. This is done using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message, sent by the Voter Service.

The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to candidate objects. The Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous example, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters database.

New Endpoint

Calling the new /voter/simulation/db/{election} endpoint and providing a specific election, prompts the Voter service to query its own MongoDB database for a list of candidates.

But wait, where did the candidates come from? The Voter service didn’t call the Candidate service? The answer is message-based eventual consistency. Whenever a new candidate is created, using a REST HTTP POST request to the Candidate service’s /candidate/candidates endpoint, a Spring Data Rest Repository Event Handler responds. Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to a durable and persistent RabbitMQ queue.

The Voter service is listening to that queue. The Voter service consumes messages off the queue, deserializes the candidate object, and saves it to its own voters database, to the candidate collection. For this example, we are saving the incoming candidate object as is, with no transformations. The candidate object model for both services is identical.

When /voter/simulation/db/{election} endpoint is called, the Voter service queries its voters database for a list of candidates. They Voter service then proceeds to generate a random number of votes for each candidate in the list. Finally, identical to the previous two examples, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters  database.

Message_Queue_Diagram_Final3B

Exploring the Code

We will not review the REST HTTP or RPC IPC code in this post. It was covered in detail, in the previous post. Instead, we will explore the new code required for eventual consistency.

Spring Dependencies

To use AMQP with RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp. Below is a snippet from the Candidate service’s build.gradle file, showing project dependencies. The Voter service’s dependencies are identical.


dependencies {
compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
compile group: 'org.webjars', name: 'hal-browser'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}

view raw

build.gradle

hosted with ❤ by GitHub

AMQP Configuration

Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration annotation on our configuration classes. Below is the abridged configuration class for the Voter service.


package com.voterapi.voter.configuration;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class VoterConfig {
@Bean
public Queue candidateQueue() {
return new Queue("candidates.queue");
}
}

And here, the abridged configuration class for the Candidate service.


package com.voterapi.candidate.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CandidateConfig {
@Bean
public Queue candidateQueue() {
return new Queue("candidates.queue");
}
}

Event Handler

With our dependencies and configuration in place, we will define the CandidateEventHandler class. This class is annotated with the Spring Data Rest @RepositoryEventHandler and Spring’s @Component. The @Component annotation ensures the event handler is registered.

The class contains the handleCandidateSave method, which is annotated with the Spring Data Rest @HandleAfterCreate. The event handler acts on the Candidate object, which is the first parameter in the method signature.

Responding to the candidate created event, the event handler publishes a message, containing a serialized JSON representation of the new candidate object, to the candidates.queue queue. This was the queue we configured earlier.


package com.voterapi.candidate.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.voterapi.candidate.domain.Candidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.rest.core.annotation.HandleAfterCreate;
import org.springframework.data.rest.core.annotation.RepositoryEventHandler;
import org.springframework.stereotype.Component;
@Component
@RepositoryEventHandler
public class CandidateEventHandler {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private RabbitTemplate rabbitTemplate;
private Queue candidateQueue;
@Autowired
public CandidateEventHandler(RabbitTemplate rabbitTemplate, Queue candidateQueue) {
this.rabbitTemplate = rabbitTemplate;
this.candidateQueue = candidateQueue;
}
@HandleAfterCreate
public void handleCandidateSave(Candidate candidate) {
sendMessage(candidate);
}
private void sendMessage(Candidate candidate) {
rabbitTemplate.convertAndSend(
candidateQueue.getName(), serializeToJson(candidate));
}
private String serializeToJson(Candidate candidate) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(candidate);
} catch (JsonProcessingException e) {
logger.info(String.valueOf(e));
}
logger.debug("Serialized message payload: {}", jsonInString);
return jsonInString;
}
}

Consuming Messages

Next, we let’s switch to the Voter service’s CandidateListService class. Below is an abridged version of the class with two new methods. First, the getCandidateMessage method listens to the candidates.queue queue. This was the queue we configured earlier. The method is annotated with theSpring AMQP Rabbit @RabbitListener annotation.

The getCandidateMessage retrieves the new candidate object from the message, deserializes the message’s JSON payload, maps it to the candidate object model and saves it to the Voter service’s database.

The second method, getCandidatesQueueDb, retrieves the candidates from the Voter service’s database. The method makes use of the Spring Data MongoDB Aggregation package to return a list of candidates from MongoDB.


/**
* Consumes a new candidate message, deserializes, and save to MongoDB
* @param candidateMessage
*/
@RabbitListener(queues = "#{candidateQueue.name}")
public void getCandidateMessage(String candidateMessage) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
TypeReference<Candidate> mapType = new TypeReference<Candidate>() {};
Candidate candidate = null;
try {
candidate = objectMapper.readValue(candidateMessage, mapType);
} catch (IOException e) {
logger.info(String.valueOf(e));
}
candidateRepository.save(candidate);
logger.debug("Candidate {} saved to MongoDB", candidate.toString());
}
/**
* Retrieves candidates from MongoDB and transforms to voter view
* @param election
* @return List of candidates
*/
public List<CandidateVoterView> getCandidatesQueueDb(String election) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("election").is(election)),
project("firstName", "lastName", "politicalParty", "election")
.andExpression("concat(firstName,' ', lastName)")
.as("fullName"),
sort(Sort.Direction.ASC, "lastName")
);
AggregationResults<CandidateVoterView> groupResults
= mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
return groupResults.getMappedResults();
}

RabbitMQ Management Console

The easiest way to observe what is happening with the messages is using the RabbitMQ Management Console. To access the console, point your web browser to localhost, on port 15672. The default login credentials for the console are guest/guest. As you successfully produce and consume messages with RabbitMQ, you should see activity on the Overview tab.

RabbitMQ_EC_Durable3.png

Recall we said the queue, in this example, was durable. That means messages will survive the RabbitMQ broker stopping and starting. In the below view of the RabbitMQ Management Console, note the six messages persisted in memory. The Candidate service produced the messages in response to six new candidates being created. However, the Voter service was not running, and therefore, could not consume the messages. In addition, the RabbitMQ server was restarted, after receiving the candidate messages. The messages were persisted and still present in the queue after the successful reboot of RabbitMQ.

RabbitMQ_EC_Durable

Once RabbitMQ and the Voter service instance were back online, the Voter service successfully consumed the six waiting messages from the queue.

RabbitMQ_EC_Durable2.png

Service Logs

In addition to using the RabbitMQ Management Console, we may obverse communications between the two services by looking at the Voter and Candidate service’s logs. I have grabbed a snippet of both service’s logs and added a few comments to show where different processes are being executed.

First the Candidate service logs. We observe a REST HTTP POST request containing a new candidate. We then observe the creation of the new candidate object in the Candidate service’s database, followed by the event handler publishing a message on the queue. Finally, we observe the response is returned in reply to the initial REST HTTP POST request.


# REST HTTP POST Request received
2017-05-11 22:43:46.667 DEBUG 18702 — [nio-8097-exec-5] o.a.coyote.http11.Http11InputBuffer : Received [POST /candidate/candidates HTTP/1.1
Host: localhost:8097
User-Agent: HTTPie/0.9.8
Accept-Encoding: gzip, deflate
Accept: application/json, */*
Connection: keep-alive
Content-Type: application/json
Content-Length: 127
{"firstName": "Hillary", "lastName": "Clinton", "politicalParty": "Democratic Party", "election": "2016 Presidential Election"}]
2017-05-11 22:43:46.667 DEBUG 18702 — [nio-8097-exec-5] o.a.c.authenticator.AuthenticatorBase : Security checking request POST /candidate/candidates
# Inserting new Candidate into database
2017-05-11 22:43:46.674 DEBUG 18702 — [nio-8097-exec-5] o.s.data.mongodb.core.MongoTemplate : Inserting DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election] in collection: candidate
2017-05-11 22:43:46.674 DEBUG 18702 — [nio-8097-exec-5] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[candidates]
2017-05-11 22:43:46.674 DEBUG 18702 — [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Inserting 1 documents into namespace candidates.candidate on connection [connectionId{localValue:2, serverValue:147}] to server localhost:27017
2017-05-11 22:43:46.677 DEBUG 18702 — [nio-8097-exec-5] org.mongodb.driver.protocol.insert : Insert completed
# Publishing message on queue
2017-05-11 22:43:46.678 DEBUG 18702 — [nio-8097-exec-5] o.s.d.r.c.e.AnnotatedEventHandlerInvoker : Invoking AfterCreateEvent handler for Hillary Clinton (Democratic Party).
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] c.v.c.service.CandidateEventHandler : Serialized message payload: {"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@1bfb15f5 Shared Rabbit Connection: SimpleConnection@37d1ba14 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59422]
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message on exchange [], routingKey = [candidates.queue]
# Response to HTTP POST
2017-05-11 22:43:46.679 DEBUG 18702 — [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'persistentEntities'
2017-05-11 22:43:46.681 DEBUG 18702 — [nio-8097-exec-5] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'org.springframework.boot.actuate.autoconfigure.EndpointWebMvcHypermediaManagementContextConfiguration$ActuatorEndpointLinksAdvice'
2017-05-11 22:43:46.682 DEBUG 18702 — [nio-8097-exec-5] s.d.r.w.j.PersistentEntityJackson2Module : Serializing PersistentEntity org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity@1a4d1ab7.
2017-05-11 22:43:46.683 DEBUG 18702 — [nio-8097-exec-5] o.s.w.s.m.m.a.HttpEntityMethodProcessor : Written [Resource { content: Hillary Clinton (Democratic Party), links: [<http://localhost:8097/candidate/candidates/591521621162e1490eb0d537&gt;;rel="self", <http://localhost:8097/candidate/candidates/591521621162e1490eb0d537{?projection}>;rel="candidate"] }] as "application/json" using [org.springframework.data.rest.webmvc.config.RepositoryRestMvcConfiguration$ResourceSupportHttpMessageConverter@27329d2a]
2017-05-11 22:43:46.683 DEBUG 18702 — [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
2017-05-11 22:43:46.683 DEBUG 18702 — [nio-8097-exec-5] o.s.web.servlet.DispatcherServlet : Successfully completed request

Now the Voter service logs. At the exact same second as the message and the response sent by the Candidate service, the Voter service consumes the message off the queue. The Voter service then deserializes the new candidate object and inserts it into its database.


# Retrieving message from queue
2017-05-11 22:43:46.242 DEBUG 19001 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Retrieving delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.684 DEBUG 19001 — [pool-1-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for Consumer@78910096: tags=[{amq.ctag-WCLRWmQ6WRkGgxg-enVslA=candidates.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@386143c0 Shared Rabbit Connection: SimpleConnection@2d187d86 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59586], acknowledgeMode=AUTO local queue size=0
2017-05-11 22:43:46.685 DEBUG 19001 — [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'{"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=candidates.queue, receivedDelay=null, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, consumerQueue=candidates.queue])
2017-05-11 22:43:46.686 DEBUG 19001 — [cTaskExecutor-1] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload={"id":"591521621162e1490eb0d537","firstName":"Hillary","lastName":"Clinton","politicalParty":"Democratic Party","election":"2016 Presidential Election","fullName":"Hillary Clinton"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=candidates.queue, amqp_contentEncoding=UTF-8, amqp_deliveryTag=6, amqp_consumerQueue=candidates.queue, amqp_redelivered=false, id=608b990a-919b-52c1-fb64-4af4be03b306, amqp_consumerTag=amq.ctag-WCLRWmQ6WRkGgxg-enVslA, contentType=text/plain, timestamp=1494557026686}]]
# Inserting new Candidate into database
2017-05-11 22:43:46.687 DEBUG 19001 — [cTaskExecutor-1] o.s.data.mongodb.core.MongoTemplate : Saving DBObject containing fields: [_class, _id, firstName, lastName, politicalParty, election]
2017-05-11 22:43:46.687 DEBUG 19001 — [cTaskExecutor-1] o.s.data.mongodb.core.MongoDbUtils : Getting Mongo Database name=[voters]
2017-05-11 22:43:46.688 DEBUG 19001 — [cTaskExecutor-1] org.mongodb.driver.protocol.update : Updating documents in namespace voters.candidate on connection [connectionId{localValue:2, serverValue:151}] to server localhost:27017
2017-05-11 22:43:46.703 DEBUG 19001 — [cTaskExecutor-1] org.mongodb.driver.protocol.update : Update completed
2017-05-11 22:43:46.703 DEBUG 19001 — [cTaskExecutor-1] c.v.voter.service.CandidateListService : Candidate Hillary Clinton (Democratic Party) saved to MongoDB

MongoDB

Using the mongo Shell, we can observe six new 2016 Presidential Election candidates in the Candidate service’s database.


> show dbs
candidates 0.000GB
voters 0.000GB
> use candidates
switched to db candidates
> show collections
candidate
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.candidate.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }

Now, looking at the Voter service’s database, we should find the same six 2016 Presidential Election candidates. Note the Object IDs are the same between the two service’s document sets, as are the rest of the fields (first name, last name, political party, and election). However, the class field is different between the two service’s records.


> show dbs
candidates 0.000GB
voters 0.000GB
> use voters
> db.candidate.find({})
{ "_id" : ObjectId("5915220e1162e14b2a42e65e"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Donald", "lastName" : "Trump", "politicalParty" : "Republican Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("5915220f1162e14b2a42e65f"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Chris", "lastName" : "Keniston", "politicalParty" : "Veterans Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e660"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Jill", "lastName" : "Stein", "politicalParty" : "Green Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522101162e14b2a42e661"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Gary", "lastName" : "Johnson", "politicalParty" : "Libertarian Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e662"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Darrell", "lastName" : "Castle", "politicalParty" : "Constitution Party", "election" : "2016 Presidential Election" }
{ "_id" : ObjectId("591522111162e14b2a42e663"), "_class" : "com.voterapi.voter.domain.Candidate", "firstName" : "Hillary", "lastName" : "Clinton", "politicalParty" : "Democratic Party", "election" : "2016 Presidential Election" }

Production Considerations

The post demonstrated a simple example of message-based, event-driven eventual consistency. In an actual Production environment, there are a few things that must be considered.

  • We only addressed a ‘candidate created’ event. We would also have to code for other types of events, such as a ‘candidate deleted’ event and a ‘candidate updated’ event.
  • If a candidate is added, deleted, then re-added, are the events published and consumed in the right order? What about with multiple instances of the Voter service running? Does this pattern guarantee event ordering?
  • How should the Candidate service react on startup if RabbitMQ is not available
  • What if RabbitMQ fails after the Candidate services have started?
  • How should the Candidate service react if a new candidate record is added to the database, but a ‘candidate created’ event message cannot be published to RabbitMQ? The two actions are not wrapped in a single transaction.
  • In all of the above scenarios, what response should be returned to the API end user?

Conclusion

In this post, using eventual consistency, we successfully decoupled our two microservices and achieved asynchronous inter-process communications. Adopting a message-based, event-driven, loosely-coupled architecture, wherever possible, in combination with REST HTTP when it makes sense, will improve the overall manageability and scalability of a microservices-based platform.

References

All opinions in this post are my own and not necessarily the views of my current employer or their clients.

, , , , , , , , , , ,

5 Comments

Decoupling Microservices using Message-based RPC IPC, with Spring, RabbitMQ, and AMPQ

RabbitMQ_Screen_3

Introduction

There has been a considerable growth in modern, highly scalable, distributed application platforms, built around fine-grained RESTful microservices. Microservices generally use lightweight protocols to communicate with each other, such as HTTP, TCP, UDP, WebSockets, MQTT, and AMQP. Microservices commonly communicate with each other directly using REST-based HTTP, or indirectly, using messaging brokers.

There are several well-known, production-tested messaging queues, such as Apache Kafka, Apache ActiveMQAmazon Simple Queue Service (SQS), and Pivotal’s RabbitMQ. According to Pivotal, of these messaging brokers, RabbitMQ is the most widely deployed open source message broker.

RabbitMQ supports multiple messaging protocols. RabbitMQ’s primary protocol, the Advanced Message Queuing Protocol (AMQP), is an open standard wire-level protocol and semantic framework for high-performance enterprise messaging. According to Spring, ‘AMQP has exchanges, routes, and queues. Messages are first published to exchanges. Routes define on which queue(s) to pipe the message. Consumers subscribing to that queue then receive a copy of the message.

Pivotal’s Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. The project’s libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. The project provides a ‘template’ (RabbitTemplate) as a high-level abstraction for sending and receiving messages.

In this post, we will explore how to start moving Spring Boot Java services away from using synchronous REST HTTP for inter-process communications (IPC), and toward message-based IPC. Moving from synchronous IPC to messaging queues and asynchronous IPC decouples services from one another, allowing us to more easily build, test, and release individual microservices.

Message-Based RPC IPC

Decoupling services using asynchronous IPC is considered optimal by many enterprise software architects when developing modern distributed platforms. However, sometimes it is not easy or possible to get away from synchronous communications. Rightly or wrongly, often times services are architected, such that one service needs to retrieve data from another service or services, in order to process its own requests. It can be said, that service has a direct dependency on the other services. Many would argue, services, especially RESTful microservices, should not be coupled in this way.

There are several ways to break direct service-to-service dependencies using asynchronous IPC. We might implement request/async response REST HTTP-based IPC. We could also use publish/subscribe or publish/async response messaging queue-based IPC. These are all described by NGINX, in their article, Building Microservices: Inter-Process Communication in a Microservices Architecture; a must-read for anyone working with microservices. We might also implement an architecture which supports eventual consistency, eliminating the need for one service to obtain data from another service.

So what if we cannot implement asynchronous methods to break direct service dependencies, but we want to move toward message-based IPC? One answer is message-based Remote Procedure Call (RPC) IPC. I realize the mention of RPC might send cold shivers down the spine of many seasoned architected. Traditional RPC has several challenges, many which have been overcome with more modern architectural patterns.

According to Wikipedia, ‘in distributed computing, a remote procedure call (RPC) is when a computer program causes a procedure (subroutine) to execute in another address space (commonly on another computer on a shared network), which is coded as if it were a normal (local) procedure call, without the programmer explicitly coding the details for the remote interaction.

Although still a form of RPC and not asynchronous, it is possible to replace REST HTTP IPC with message-based RPC IPC. Using message-based RPC, services have no direct dependencies on other services. A service only depends on a response to a message request it makes to that queue. The services are now decoupled from one another. The requestor service (the client) has no direct knowledge of the respondent service (the server).

RPC with RabbitMQ and AMQP

RabbitMQ has an excellent set of six tutorials, which cover the basics of creating messaging applications, applying different architectural patterns, using RabbitMQ, in several different programming languages. The sixth and final tutorial covers using RabbitMQ for RPC-based IPC, with the request/reply architectural pattern.

Pivotal recently added Spring AMPQ implementations to each RabbitMQ tutorial, based on their Spring AMQP project. If you recall, the Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions.

This post’s RPC IPC example is closely based on the architectural pattern found in the Spring AMQP RabbitMQ tutorial.

Sample Code

To demonstrate Spring AMQP-based RPC IPC messaging with RabbitMQ, we will use a pair of simple Spring Boot microservices. These services, the Voter and Candidate services, have been used in several previous posts, and for training and testing DevOps engineers. Both services are backed by MongoDB. The services and MongoDB, along with RabbitMQ, are all part of the Voter API project. The Voter API project also contains an HAProxy-based API Gateway, which provides indirect, load-balanced access to the two services.

All code necessary to build this post’s example is available on GitHub, within three projects. The Voter Service project repository contains the Voter service source code, along with the scripts and Docker Compose files required to deploy the project. The Candidate Service project repository and the Voter API Gateway project repository are also available on GitHub. For this post, you need only clone the Voter Service project repository.

Deploying Voter API

All components, including the two Spring services, MongoDB, RabbitMQ, and the API Gateway, are individually deployed using Docker. Each component is publicly available as a Docker Image, on Docker Hub.

The Voter Service repository contains scripts to deploy the entire set of Dockerized components, locally. The repository also contains optional scripts to provision a Docker Swarm, using Docker’s newer swarm mode, and deploy the components. We will only deploy the services locally for this post.

To clone and deploy the components locally, including the two Spring services, MongoDB, RabbitMQ, and the API Gateway, execute the following commands. If this is your first time running the commands, it may take a few minutes for your system to download all the required Docker Images from Docker Hub.


git clone –depth 1 –branch rabbitmq \
https://github.com/garystafford/voter-service.git
cd voter-service/scripts-services
sh ./stack_deploy_local.sh

If everything was deployed successfully, you should see the following output. You should observe five running Docker containers.


? docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8ef4866984c3 garystafford/voter-api-gateway:rabbitmq "/docker-entrypoin…" 25 hours ago Up 25 hours 0.0.0.0:8080->8080/tcp voterstack_voter-api-gateway_1
cc28d084ab17 garystafford/candidate-service:rabbitmq "java -Dspring.pro…" 25 hours ago Up 25 hours 0.0.0.0:8097->8080/tcp voterstack_candidate_1
e4c22258b77b garystafford/voter-service:rabbitmq "java -Dspring.pro…" 25 hours ago Up 25 hours 0.0.0.0:8099->8080/tcp voterstack_voter_1
fdb4b9f58a53 rabbitmq:management-alpine "docker-entrypoint…" 25 hours ago Up 25 hours 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp voterstack_rabbitmq_1
1678227b143c mongo:latest "docker-entrypoint…" 25 hours ago Up 25 hours 0.0.0.0:27017->27017/tcp voterstack_mongodb_1

Using Voter API

The Voter Service and Candidate Service GitHub repositories both contain README files, which detail all the API endpoints each service exposes, and how to call them.

In addition to casting votes for candidates, the Voter service has the ability to simulate election results. By calling a /simulation endpoint, and indicating the desired election, the Voter service will randomly generate a number of votes for each candidate in that election. This will save us the burden of casting votes for this demonstration. However, the Voter service has no knowledge of elections or candidates. To obtain a list of candidates, the Voter service depends on the Candidate service.

The Candidate service manages electoral candidates, their political affiliation, and the election in which they are running. Like the Voter service, the Candidate service also has a /simulation endpoint. The service will create a list of candidates based on the 2012 and 2016 US Presidential Elections. The simulation capability of the service saves us the burden of inputting candidates for this demonstration.

REST HTTP Endpoint

The Voter service exposes two almost identical endpoints. Both endpoints generate random votes. However, below the covers, the two endpoints are very different. Calling the /voter/simulation/http/{election} endpoint, prompts the Voter service to request a list of candidates from the Candidate service, based on the election parameter you input. This request is done using synchronous REST HTTP. The Voter service uses the HTTP GET method to request the data from the Candidate service. The Voter service then waits for a response.

The HTTP request is received by the Candidate service. The Candidate service responds to the Voter service with a list of candidates, in JSON format. The Voter service receives the response containing the list of candidates. The Voter service then proceeds to generate a random number of votes for each candidate. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters  database.

Message Queue Diagram 1D

Message-based RPC Endpoint

Similarly, calling the /voter/simulation/rpc/{election} endpoint with a specific election prompts the Voter service to request the same list of candidates. However, this time, the Voter service (the client), produces a request message and places in RabbitMQ’s voter.rpc.requests queue. The Voter service then waits for a response. The Voter service has no direct dependency on the Candidate service. It only depends on a response to its message request. In this way, it is still a form of synchronous IPC, but the Voter service is now decoupled from the Candidate service.

The request message is consumed by the Candidate service (the server), who is listening to that queue. In response, the Candidate service produces a message containing the list of candidates, serialized to JSON. The Candidate service (the server) sends a response back to the Voter service (the client), through RabbitMQ. This is done using the Direct reply-to feature of RabbitMQ or using a unique response queue, specified in the reply-to header of the request message, sent by the Voter Service.

According to RabbitMQ, ‘the direct reply-to feature allows RPC clients to receive replies directly from their RPC server, without going through a reply queue. (“Directly” here still means going through AMQP and the RabbitMQ server; there is no separate network connection between RPC client and RPC server.)

According to Spring, ‘starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request). Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues. When no replyQueue is provided (or it is set with the name amq.rabbitmq.reply-to), the RabbitTemplate will automatically detect whether Direct reply-to is supported and use it, or fall back to using a temporary reply queue. When using Direct reply-to, a reply-listener is not required and should not be configured.’ We are using the latest versions of both RabbitMQ and Spring AMQP, which should support Direct reply-to.

The Voter service receives the message containing the list of candidates. The Voter service deserializes the JSON payload to Candidate objects and proceeds to generate a random number of votes for each candidate in the list. Finally, each new vote object (MongoDB document) is written back to the vote collection in the Voter service’s voters  database.

Message Queue Diagram 2D

Exploring the RPC Code

We will not examine the REST HTTP IPC code in this post. Instead, we will explore the RPC code. You are welcome to download the source code and explore the REST HTTP code pattern; it uses some advanced features of Spring Boot and Spring Data.

Spring Dependencies

In order to use RabbitMQ, we need to add a project dependency on org.springframework.boot.spring-boot-starter-amqp. Below is a snippet from the Candidate service’s build.gradle file, showing project dependencies. The Voter service’s dependencies are identical.


dependencies {
compile group: 'org.springframework.boot', name: 'spring-boot-actuator-docs'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-rest'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-hateoas'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-logging'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-web'
compile group: 'org.webjars', name: 'hal-browser'
testCompile group: 'org.springframework.boot', name: 'spring-boot-starter-test'
}

view raw

build.gradle

hosted with ❤ by GitHub

AMQP Configuration

Next, we need to add a small amount of RabbitMQ AMQP configuration to both services. We accomplish this by using Spring’s @Configuration annotation on our configuration classes. Below is the configuration class for the Voter service.


package com.voterapi.voter.configuration;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class VoterConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("voter.rpc");
}
}

And here, the configuration class for the Candidate service.


package com.voterapi.candidate.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CandidateConfig {
@Bean
public Queue queue() {
return new Queue("voter.rpc.requests");
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("voter.rpc");
}
@Bean
public Binding binding(DirectExchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("rpc");
}
}

Candidate Service Code

With the dependencies and configuration in place, we define the method in the Voter service, which will request the candidates from the Candidate service, using RabbitMQ. Below is an abridged version of the Voter service’s CandidateListService class, containing the getCandidatesMessageRpc method. This method calls the rabbitTemplate.convertSendAndReceive method (see line 5, below).


public List<CandidateVoterView> getCandidatesMessageRpc(String election) {
logger.debug("Sending RPC request message for list of candidates…");
String requestMessage = election;
String candidates = (String) rabbitTemplate.convertSendAndReceive(
directExchange.getName(), "rpc", requestMessage);
TypeReference<Map<String, List<CandidateVoterView>>> mapType =
new TypeReference<Map<String, List<CandidateVoterView>>>() {};
ObjectMapper objectMapper = new ObjectMapper();
Map<String, List<CandidateVoterView>> candidatesMap = null;
try {
candidatesMap = objectMapper.readValue(candidates, mapType);
} catch (IOException e) {
logger.info(String.valueOf(e));
}
List<CandidateVoterView> candidatesList = candidatesMap.get("candidates");
logger.debug("List of {} candidates received…", candidatesList.size());
return candidatesList;
}

Voter Service Code

Next, we define a method in the Candidate service, which will process the Voter service’s request. Below is an abridged version of the CandidateController class, containing the getCandidatesMessageRpc method. This method is decorated with Spring’s @RabbitListener annotation (see line 1, below). This annotation marks c to be the target of a Rabbit message listener on the voter.rpc.requests queue.

Also shown, are the getCandidatesMessageRpc method’s two helper methods, getByElection and serializeToJson. These methods query MongoDB for the list of candidates and serialize the list to JSON.


@RabbitListener(queues = "voter.rpc.requests")
private String getCandidatesMessageRpc(String requestMessage) {
logger.debug("Request message: {}", requestMessage);
logger.debug("Sending RPC response message with list of candidates…");
List<CandidateVoterView> candidates = getByElection(requestMessage);
return serializeToJson(candidates);
}
private List<CandidateVoterView> getByElection(String election) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("election").is(election)),
project("firstName", "lastName", "politicalParty", "election")
.andExpression("concat(firstName,' ', lastName)")
.as("fullName"),
sort(Sort.Direction.ASC, "lastName")
);
AggregationResults<CandidateVoterView> groupResults
= mongoTemplate.aggregate(aggregation, Candidate.class, CandidateVoterView.class);
return groupResults.getMappedResults();
}
private String serializeToJson(List<CandidateVoterView> candidates) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
final Map<String, List<CandidateVoterView>> dataMap = new HashMap<>();
dataMap.put("candidates", candidates);
try {
jsonInString = mapper.writeValueAsString(dataMap);
} catch (JsonProcessingException e) {
logger.info(String.valueOf(e));
}
logger.debug(jsonInString);
return jsonInString;
}

Demonstration

To demonstrate both the synchronous REST HTTP IPC code and the Spring AMQP-based RPC IPC code, we will make a few REST HTTP calls to the Voter API Gateway. For convenience, I have provided a shell script, demostrate_ipc.sh, which executes all the API calls necessary. I have added sleep commands to slow the output to the terminal down a bit, for easier analysis. The script requires HTTPie, a great time saver when working with RESTful services.

The demostrate_ipc.sh script does three things. First, it calls the Candidate service to generate a group of sample candidates. Next, the script calls the Voter service to simulate votes, using synchronous REST HTTP. Lastly, the script repeats the voter simulation, this time using message-based RPC IPC. All API calls are done through the Voter API Gateway on port 8080. To understand the API calls, examine the script, below.


#!/bin/sh
# Demostrate API calls for REST HTTP IPC and RPC IPC via API Gateway
# Requires HTTPie
# Requires all services are running
set -e
HOST=${1:-localhost:8080}
API_GATEWAY="http://${HOST}"
ELECTION="2016%20Presidential%20Election"
echo "Simulating candidates…"
http ${API_GATEWAY}/candidate/simulation && sleep 2
http ${API_GATEWAY}/candidate/candidates/summary/${ELECTION} && sleep 2
echo "Simulating voting using REST HTTP IPC…"
http ${API_GATEWAY}/voter/simulation/http/${ELECTION} && sleep 2
http ${API_GATEWAY}/voter/results && sleep 4
http ${API_GATEWAY}/voter/winners && sleep 2
echo "Simulating voting using message-based RPC IPC…"
http ${API_GATEWAY}/voter/simulation/rpc/${ELECTION} && sleep 2
http ${API_GATEWAY}/voter/results && sleep 4
http ${API_GATEWAY}/voter/winners && sleep 2
echo "Script completed…"

Below is the list of candidates for the 2016 Presidential Election, generated by the Candidate service. The JSON payload was retrieved using the Voter service’s /voter/candidates/rpc/{election} endpoint. This endpoint uses the same RPC IPC method as the Voter service’s /voter/simulation/rpc/{election} endpoint.


HTTP/1.1 200
Access-Control-Allow-Credentials: true
Access-Control-Allow-Headers: Content-Type, Accept, X-Requested-With, remember-me
Access-Control-Allow-Methods: POST, GET, OPTIONS, DELETE
Access-Control-Max-Age: 3600
Content-Type: application/json;charset=UTF-8
Date: Sun, 07 May 2017 19:10:22 GMT
Transfer-Encoding: chunked
X-Application-Context: Voter Service:docker-local:8099
{
"candidates": [
{
"election": "2016 Presidential Election",
"fullName": "Darrell Castle",
"politicalParty": "Constitution Party"
},
{
"election": "2016 Presidential Election",
"fullName": "Hillary Clinton",
"politicalParty": "Democratic Party"
},
{
"election": "2016 Presidential Election",
"fullName": "Gary Johnson",
"politicalParty": "Libertarian Party"
},
{
"election": "2016 Presidential Election",
"fullName": "Chris Keniston",
"politicalParty": "Veterans Party"
},
{
"election": "2016 Presidential Election",
"fullName": "Jill Stein",
"politicalParty": "Green Party"
},
{
"election": "2016 Presidential Election",
"fullName": "Donald Trump",
"politicalParty": "Republican Party"
}
]
}

Based on the list of candidates, below are the simulated election results. This JSON payload was retrieved using the Voter service’s /voter/results endpoint.