Posts Tagged Protobuf
IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas
Posted by Gary A. Stafford in Big Data, Cloud, GCP, Python, Serverless, Software Development on May 21, 2019
Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.
Introduction
Most of the dominant Cloud providers offer integrated IoT (Internet of Things) and IIoT (Industrial IoT) services. Amazon has AWS IoT, Microsoft Azure has multiple offerings including IoT Central, IBM's offerings include IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers the Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly performant, scalable technology stacks. They are capable of scaling to tens of thousands of IoT devices or more and handling massive amounts of streaming telemetry.
In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.
Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIoT industry-standard data and transport protocols, such as MQTT (Message Queue Telemetry Transport) or CoAP (Constrained Application Protocol), over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).
Demonstration
In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

Prototype IoT Devices used in this Demonstration
We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.
Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.
For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.
For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.
Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.
Source Code
All source code is available on GitHub. Use the following command to clone the project.
git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git
You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.
Technologies
Protocol Buffers
According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML.
Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application's language on your .proto file to generate data access classes.
Google Cloud Functions
According to Google, Cloud Functions is Google's event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high availability, fault tolerance, no servers to provision, manage, patch, or update, paying only while your code runs, and easy connection to and extension of other cloud services. Cloud Functions natively support multiple event types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.
Google Cloud Pub/Sub
According to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.
MongoDB Atlas
MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.
MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.
Cost Effectiveness of Cloud Functions
At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google's pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system comprises tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.
However, with thousands of devices or more, each transmitting data multiple times per minute, you could quickly outgrow the cost-effectiveness of Cloud Functions. In that case, you might look to the Google Cloud IoT platform. Alternately, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly-released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.
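As a rough, hedged sanity check of the invocation-count portion of that cost, ignoring the memory, compute time, and egress charges the pricing model also includes, consider a hypothetical fleet of 1,000 devices, each transmitting every 15 seconds:

DEVICES = 1_000                # hypothetical fleet size
MESSAGES_PER_MINUTE = 4        # one message every 15 seconds
FREE_INVOCATIONS = 2_000_000   # free tier, per month
PRICE_PER_MILLION = 0.40       # USD, per additional million invocations

invocations_per_month = DEVICES * MESSAGES_PER_MINUTE * 60 * 24 * 30
billable = max(0, invocations_per_month - FREE_INVOCATIONS)
cost = (billable / 1_000_000) * PRICE_PER_MILLION

print(f'{invocations_per_month:,} invocations/month, '
      f'invocation cost alone: ${cost:,.2f}')  # roughly 172.8M invocations, about USD $68

Even at that scale, the invocation charge alone is modest; as noted above, the total cost also depends heavily on memory usage, compute time, and outbound data transfer.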
Sensor Scripts
For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.
JSON over HTTPS
Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.
import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

    try:
        # payload is already a serialized JSON string, so send it as the raw request body
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())
Telemetry Frequency
Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to being transmitted only every 15 seconds. In my opinion, to reduce system complexity, potential latency, back-pressure, and cost, you should produce telemetry data only at the frequency your requirements dictate.
JSON Web Tokens
For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry standard RFC 7519 method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, all signed with a secret key known to each Cloud Function, in order to verify the IoT device's identity. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT's payload data.
{ "sub": "IoT Protobuf Serverless Demo", "id": "iot-demo-key", "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ", "iat": 1557407124, "exp": 1564664724 }
For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions use PyJWT, a Python library that allows you to encode and decode JWTs. The PyJWT library allows the Function to decode and validate the JWT (Bearer Token) from the incoming request's Authorization header. The JWT is stored as an environment variable. Deployment instructions are included in the GitHub project.
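Below is a minimal sketch of that encode-and-validate round trip using PyJWT; the secret and password shown here are placeholders, not the project's actual values:

import time

import jwt  # PyJWT

SECRET_KEY = 'replace-with-your-secret-key'  # placeholder; known only to the Cloud Functions

# create a token containing the id, password, and expiration (exp) claims
token = jwt.encode({
    'sub': 'IoT Protobuf Serverless Demo',
    'id': 'iot-demo-key',
    'password': 'replace-with-your-password',
    'iat': int(time.time()),
    'exp': int(time.time()) + (90 * 24 * 60 * 60)  # expires in roughly 90 days
}, SECRET_KEY, algorithm='HS256')

# the Cloud Function later decodes and validates the claims using the same secret
claims = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
print(claims['id'])  # iot-demo-key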
JSON Payload
Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.
{ "device": "rp829c7e0e", "type": "DHT22", "timestamp": 1557585090.476025, "data": { "temperature": 17.100000381469727, "humidity": 68.0999984741211 } }
Protocol Buffers
For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object, within each message type.
It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternately, you could choose Google's Protobuf Well-Known Types, using a Timestamp to store the timestamp, as sketched below.
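A brief sketch of that alternative, using the timestamp_pb2 module included with the Python protobuf package:

from google.protobuf.timestamp_pb2 import Timestamp

# capture the current time as a Well-Known Timestamp (seconds plus nanos)
ts = Timestamp()
ts.GetCurrentTime()

print(ts.seconds, ts.nanos)
print(ts.ToJsonString())  # RFC 3339 string representation

Adopting it would mean declaring the timestamp field in the schema below as google.protobuf.Timestamp rather than a double.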
syntax = "proto3"; package sensors; // DHT22 message SensorDHT { string device = 1; string type = 2; double timestamp = 3; DataDHT data = 4; } message DataDHT { double temperature = 1; double humidity = 2; } // Onyehn_PIR message SensorPIR { string device = 1; string type = 2; double timestamp = 3; DataPIR data = 4; } message DataPIR { bool motion = 1; } // Anmbest_MD46N message SensorDLI { string device = 1; string type = 2; double timestamp = 3; DataDLI data = 4; } message DataDLI { bool light = 1; }
Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file, sensors_pb2.py.
protoc --python_out=. sensors.proto
Protocol Buffers over HTTPS
Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request's Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf's SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()
            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())
Protobuf Binary Payload
To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.
message = sensorDHT.SerializeToString()

binary_file_output = open("./data_binary.txt", "wb")
file_byte_array = bytearray(message)
binary_file_output.write(file_byte_array)
Then, using the hexdump command, we can view a representation of the binary data file.
> hexdump -C data_binary.txt 00000000 0a 08 38 32 39 63 37 65 30 65 12 05 44 48 54 32 |..829c7e0e..DHT2| 00000010 32 1d 05 a0 b9 4e 22 0a 0d ec 51 b2 41 15 cd cc |2....N"...Q.A...| 00000020 38 42 |8B| 00000022
The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (32% the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.
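If you would rather script that test than use Postman, a minimal sketch using the Python requests library, assuming the data_binary.txt file written above and the same GCF_DHT_URL and JWT environment variables used by the sensor scripts, might look like this:

import os

import requests

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')

# read the serialized SensorDHT message written to disk earlier
with open('./data_binary.txt', 'rb') as binary_file:
    payload = binary_file.read()

# POST the raw bytes, exactly as the Protobuf sensor script does
response = requests.post(URL, data=payload, headers={
    'Content-Type': 'application/x-protobuf',
    'Authorization': JWT
})
print(response.status_code)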
Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.
message = sensorDHT.SerializeToString()
print(message)
The resulting binary string resembles the following.
b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'
The binary string length of the serialized message, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 111 bytes, as compared to the JSON payload size of 148 bytes (75% the size).
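To reproduce this comparison yourself, you can measure both encodings of the same message in a few lines; a sketch, assuming the sensor_dht message instance built in the earlier script (exact byte counts will vary with the device ID and sensor readings):

import json

# size of the serialized Protocol Buffers message
pb_size = len(sensor_dht.SerializeToString())

# size of the equivalent compact (non-pretty-printed) JSON payload
json_payload = json.dumps({
    'device': sensor_dht.device,
    'type': sensor_dht.type,
    'timestamp': sensor_dht.timestamp,
    'data': {'temperature': sensor_dht.data.temperature,
             'humidity': sensor_dht.data.humidity}
})
json_size = len(json_payload.encode('utf-8'))

print(f'Protobuf: {pb_size} bytes, JSON: {json_size} bytes '
      f'({pb_size / json_size:.0%} of the JSON size)')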
Validate Protobuf Payload
To validate that the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.
message = sensorDHT.SerializeToString()

message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))
The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.
{ "device": "rp829c7e0e", "type": "DHT22", "timestamp": 1557585090.476025, "data": { "temperature": 17.100000381469727, "humidity": 68.0999984741211 } }
Google Cloud Functions
There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.
Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.
JSON Message Processing
Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.
import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]
    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(str(message), 'utf-8'))
The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.
# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml
Using a .gcloudignore file, the gcloud functions deploy CLI command deploys three files: the cloud function (main.py), the required Python packages file (requirements.txt), and the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.
Protobuf Message Processing
Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.
As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which in this case is deployed along with the Cloud Function. It is used to parse a serialized message into its original Protobuf SensorDHT message type.
import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

import sensors_pb2

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]
    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))
Cloud Pub/Sub Functions
In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub Triggers. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to the iot-data-demo Cloud Pub/Sub Topic.
Sending Telemetry to MongoDB Atlas
The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be compared to binary interchange formats, like Protocol Buffers. BSON is more schema-less than Protocol Buffers, which can give it an advantage in flexibility but also a slight disadvantage in space efficiency (BSON has overhead for field names within the serialized data).
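To see that field-name overhead concretely, you can encode the sample DHT document shown earlier with the bson package that ships with PyMongo and compare the result to a compact JSON encoding; a minimal sketch:

import json

import bson  # installed as part of the PyMongo distribution

doc = {
    'device': 'rp829c7e0e',
    'type': 'DHT22',
    'timestamp': 1557585090.476025,
    'data': {'temperature': 17.100000381469727, 'humidity': 68.0999984741211}
}

bson_size = len(bson.BSON.encode(doc))            # BSON repeats field names inside each document
json_size = len(json.dumps(doc).encode('utf-8'))  # compact JSON encoding for comparison
print(f'BSON: {bson_size} bytes, JSON: {json_size} bytes')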
The function uses PyMongo to connect to MongoDB Atlas. According to its website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.
import base64
import json
import logging
import os

import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)
The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.
MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.
Although Cloud Pub/Sub to Atlas function execution times are longer in duration than the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster into the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.
Aggregating Data with MongoDB Compass
MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.
When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and return computed results. This feature saves a ton of time filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.
Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.
Below, the exported aggregation pipeline code from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device over a 72-hour period.
DEVICE_1 = 'rp59adf374'

pipeline = [
    {
        '$match': {
            'type': 'DHT22',
            'device': DEVICE_1,
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1,
            'temperature': '$data.temperature',
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]

aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))
MongoDB Atlas Performance
In this demonstration, from Python3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.
Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.
GCP Observability
There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.
For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.
For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile average for monitoring. The 95th percentile asserts that 95% of the time, the observed values are below this amount, and the remaining 5% of the time, they are above it.
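For example, the 95th percentile of a set of execution times can be computed directly with NumPy; a small sketch using made-up sample values:

import numpy as np

# hypothetical function execution times, in milliseconds
execution_times_ms = [9, 11, 12, 13, 14, 15, 16, 18, 22, 95]

p95 = np.percentile(execution_times_ms, 95)
print(f'95th percentile: {p95:.1f} ms')  # 95% of the observed values fall at or below this value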
Data Analysis using Jupyter Notebooks
According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The widespread use of Jupyter Notebooks has grown significantly, as Big Data, AI, and ML have all experienced explosive growth.
PyCharm
JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.
Below, we see the example Analytics Notebook, included in the demonstration's project, displayed in PyCharm 19.1.2 (Professional Edition). Working effectively with Notebooks in PyCharm really requires a full-size monitor. Working on a laptop with PyCharm's crowded Notebook UI is workable, but certainly not as effective as on a larger screen.
Jupyter Notebook Server
Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.
From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Pandas DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.
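A minimal sketch of that pattern, assuming a .env file containing the same MongoDB connection variables used by the Cloud Functions:

import os

import pymongo
from dotenv import load_dotenv

# load key=value pairs from a local .env file into the process environment
load_dotenv()

MONGODB_CONN = os.getenv('MONGODB_CONN')
MONGODB_DB = os.getenv('MONGODB_DB')
MONGODB_COL = os.getenv('MONGODB_COL')

client = pymongo.MongoClient(MONGODB_CONN)
iot_data = client[MONGODB_DB][MONGODB_COL]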
We can then analyze the data using a number of common frameworks, including Pandas, Matplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the causal effect of one environment variable on another, such as the impact of light on temperature or humidity.
Below, we can use histograms to visualize temperature frequencies for intervals, over time, for a given device location.
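A small plotting sketch along those lines, assuming the df1 DataFrame loaded by the aggregation pipeline shown earlier:

import matplotlib.pyplot as plt
import pandas as pd

# convert epoch timestamps to a datetime index for plotting
df1['datetime'] = pd.to_datetime(df1['timestamp'], unit='s')
df1 = df1.set_index('datetime')

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

# time series of temperature over the 72-hour window
df1['temperature'].plot(ax=ax1, title='Temperature over Time')
ax1.set_ylabel('Temperature')

# histogram of temperature frequencies over the same period
df1['temperature'].plot.hist(ax=ax2, bins=20, title='Temperature Frequency')
ax2.set_xlabel('Temperature')

plt.tight_layout()
plt.show()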
Machine Learning using Jupyter Notebooks
In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry. Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.
Using scikit-learn, we can build models to predict, for example, which IoT device generated a specific temperature and humidity reading, or to predict the temperature and humidity given the time of day, device location, and external environment variables, or to discover anomalies in the sensor telemetry.
Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.
The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.
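A condensed sketch of one such classifier, assuming a DataFrame df of readings from multiple devices, with temperature and humidity columns as features and the device ID as the label (the project's Notebook covers these models in more detail):

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# features: the sensor readings; label: which IoT device produced them
X = df[['temperature', 'humidity']]
y = df['device']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

predictions = model.predict(X_test)
print('Accuracy:', accuracy_score(y_test, predictions))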
Having data from multiple sensors, we are able to enrich the ML models by adding additional categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.
Conclusion
Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Image: everythingpossible © 123RF.com
Istio Observability with Go, gRPC, and Protocol Buffers-based Microservices on Google Kubernetes Engine (GKE)
Posted by Gary A. Stafford in Bash Scripting, Build Automation, Client-Side Development, Cloud, Continuous Delivery, DevOps, Enterprise Software Development, GCP, Go, JavaScript, Kubernetes, Software Development on April 17, 2019
In the last two posts, Kubernetes-based Microservice Observability with Istio Service Mesh and Azure Kubernetes Service (AKS) Observability with Istio Service Mesh, we explored the observability tools which are included with Istio Service Mesh. These tools currently include Prometheus and Grafana for metric collection, monitoring, and alerting, Jaeger for distributed tracing, and Kiali for Istio service-mesh-based microservice visualization and monitoring. Combined with cloud platform-native monitoring and logging services, such as Stackdriver on GCP, CloudWatch on AWS, and Azure Monitor logs on Azure, we have a complete observability solution for modern, distributed, Cloud-based applications.
In this post, we will examine the use of Istio’s observability tools to monitor Go-based microservices that use Protocol Buffers (aka Protobuf) over gRPC (gRPC Remote Procedure Calls) and HTTP/2 for client-server communications, as opposed to the more traditional, REST-based JSON (JavaScript Object Notation) over HTTP (Hypertext Transfer Protocol). We will see how Kubernetes, Istio, Envoy, and the observability tools work seamlessly with gRPC, just as they do with JSON over HTTP, on Google Kubernetes Engine (GKE).
Technologies
gRPC
According to the gRPC project, gRPC, a CNCF incubating project, is a modern, high-performance, open-source and universal remote procedure call (RPC) framework that can run anywhere. It enables client and server applications to communicate transparently and makes it easier to build connected systems. Google, the original developer of gRPC, has used the underlying technologies and concepts in gRPC for years. The current implementation is used in several Google cloud products and Google externally facing APIs. It is also being used by Square, Netflix, CoreOS, Docker, CockroachDB, Cisco, Juniper Networks and many other organizations.
Protocol Buffers
By default, gRPC uses Protocol Buffers. According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML. Once you have defined your messages, you run the protocol buffer compiler for your application's language on your .proto file to generate data access classes.
Protocol buffers currently support generated code in Java, Python, Objective-C, C++, Dart, Go, Ruby, and C#. For this post, we have compiled for Go. You can read more about the binary wire format of Protobuf on Google's Developers Portal.
Envoy Proxy
According to the Istio project, Istio uses an extended version of the Envoy proxy. Envoy is deployed as a sidecar to a relevant service in the same Kubernetes pod. Envoy, created by Lyft, is a high-performance proxy developed in C++ to mediate all inbound and outbound traffic for all services in the service mesh. Istio leverages Envoy’s many built-in features, including dynamic service discovery, load balancing, TLS termination, HTTP/2 and gRPC proxies, circuit-breakers, health checks, staged rollouts, fault injection, and rich metrics.
According to the post by Harvey Tuch of Google, Evolving a Protocol Buffer canonical API, Envoy proxy adopted Protocol Buffers, specifically proto3, as the canonical specification for version 2 of Lyft's gRPC-first API.
Reference Microservices Platform
In the last two posts, we explored Istio's observability tools, using a RESTful microservices-based API platform written in Go and using JSON over HTTP for service-to-service communications. The API platform was comprised of eight Go-based microservices and one sample Angular 7, TypeScript-based front-end web client. The various services depend on MongoDB, and on RabbitMQ for event queue-based communications. Below is the JSON over HTTP-based platform architecture.
Below, the current Angular 7-based web client interface.
Converting to gRPC and Protocol Buffers
For this post, I have modified the eight Go microservices to use gRPC and Protocol Buffers, Google’s data interchange format. Specifically, the services use version 3 release (aka proto3) of Protocol Buffers. With gRPC, a gRPC client calls a gRPC server. Some of the platform’s services are gRPC servers, others are gRPC clients, while some act as both client and server, such as Service A, B, and E. The revised architecture is shown below.
gRPC Gateway
Assuming, for the sake of this demonstration, that most consumers of the API would still expect to communicate using a RESTful JSON over HTTP API, I have added a gRPC Gateway reverse proxy to the platform. The gRPC Gateway is a gRPC-to-JSON reverse proxy, a common architectural pattern, which proxies communications between the JSON over HTTP-based clients and the gRPC-based microservices. A diagram from the grpc-gateway GitHub project site effectively demonstrates how the reverse proxy works.
Image courtesy: https://github.com/grpc-ecosystem/grpc-gateway
In the revised platform architecture diagram above, note the addition of the reverse proxy, which replaces Service A at the edge of the API. The proxy sits between the Angular-based Web UI and Service A. Also, note the communication method between services is now Protobuf over gRPC instead of JSON over HTTP. The use of Envoy Proxy (via Istio) is unchanged, as are the MongoDB Atlas-based databases and the CloudAMQP RabbitMQ-based queue, which remain external to the Kubernetes cluster.
Alternatives to gRPC Gateway
As an alternative to the gRPC Gateway reverse proxy, we could convert the TypeScript-based Angular UI client to gRPC and Protocol Buffers, and continue to communicate directly with Service A as the edge service. However, this would limit other consumers of the API to rely on gRPC as opposed to JSON over HTTP, unless we also chose to expose two different endpoints, gRPC, and JSON over HTTP, another common pattern.
Demonstration
In this post’s demonstration, we will repeat the exact same installation process, outlined in the previous post, Kubernetes-based Microservice Observability with Istio Service Mesh. We will deploy the revised gRPC-based platform to GKE on GCP. You could just as easily follow Azure Kubernetes Service (AKS) Observability with Istio Service Mesh, and deploy the platform to AKS.
Source Code
All source code for this post is available on GitHub, contained in three projects. The Go-based microservices source code, all Kubernetes resources, and all deployment scripts are located in the k8s-istio-observe-backend project repository, in the new grpc branch.
git clone \
  --branch grpc --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/k8s-istio-observe-backend.git
The Angular-based web client source code is located in the k8s-istio-observe-frontend repository on the new grpc branch. The source protocol buffers .proto file and the generated code, using the protocol buffers compiler, are located in the new pb-greeting project repository. You do not need to clone either of these projects for this post's demonstration.
All Docker images for the services, UI, and the reverse proxy are located on Docker Hub.
Code Changes
This post is not specifically about writing Go for gRPC and Protobuf. However, to better understand the observability requirements and capabilities of these technologies, compared to JSON over HTTP, it is helpful to review some of the source code.
Service A
First, compare the source code for Service A, shown below, to the original code in the previous post. The service’s code is almost completely re-written. I relied on several references for writing the code, including, Tracing gRPC with Istio, written by Neeraj Poddar of Aspen Mesh and Distributed Tracing Infrastructure with Jaeger on Kubernetes, by Masroor Hasan.
Specifically, note the following code changes to Service A:
- Import of the pb-greeting protobuf package;
- Local Greeting struct replaced with pb.Greeting struct;
- All services are now hosted on port 50051;
- The HTTP server and all API resource handler functions are removed;
- Headers, used for distributed tracing with Jaeger, have moved from HTTP request object to metadata passed in the gRPC context object;
- Service A is coded as a gRPC server, which is called by the gRPC Gateway reverse proxy (gRPC client) via the Greeting function;
- The primary PingHandler function, which returns the service's Greeting, is replaced by the pb-greeting protobuf package's Greeting function;
- Service A is coded as a gRPC client, calling both Service B and Service C using the CallGrpcService function;
- CORS handling is offloaded to Istio;
- Logging methods are unchanged;
Source code for revised gRPC-based Service A (gist):
// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: Service A – gRPC/Protobuf

package main

import (
    "context"
    "github.com/banzaicloud/logrus-runtime-formatter"
    "github.com/google/uuid"
    "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
    ot "github.com/opentracing/opentracing-go"
    log "github.com/sirupsen/logrus"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "net"
    "os"
    "time"

    pb "github.com/garystafford/pb-greeting"
)

const (
    port = ":50051"
)

type greetingServiceServer struct {
}

var (
    greetings []*pb.Greeting
)

func (s *greetingServiceServer) Greeting(ctx context.Context, req *pb.GreetingRequest) (*pb.GreetingResponse, error) {
    greetings = nil

    tmpGreeting := pb.Greeting{
        Id:      uuid.New().String(),
        Service: "Service-A",
        Message: "Hello, from Service-A!",
        Created: time.Now().Local().String(),
    }

    greetings = append(greetings, &tmpGreeting)

    CallGrpcService(ctx, "service-b:50051")
    CallGrpcService(ctx, "service-c:50051")

    return &pb.GreetingResponse{
        Greeting: greetings,
    }, nil
}

func CallGrpcService(ctx context.Context, address string) {
    conn, err := createGRPCConn(ctx, address)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    headersIn, _ := metadata.FromIncomingContext(ctx)
    log.Infof("headersIn: %s", headersIn)

    client := pb.NewGreetingServiceClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    ctx = metadata.NewOutgoingContext(context.Background(), headersIn)
    defer cancel()

    req := pb.GreetingRequest{}
    greeting, err := client.Greeting(ctx, &req)
    log.Info(greeting.GetGreeting())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    for _, greeting := range greeting.GetGreeting() {
        greetings = append(greetings, greeting)
    }
}

func createGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
    //https://aspenmesh.io/2018/04/tracing-grpc-with-istio/
    var opts []grpc.DialOption
    opts = append(opts, grpc.WithStreamInterceptor(
        grpc_opentracing.StreamClientInterceptor(
            grpc_opentracing.WithTracer(ot.GlobalTracer()))))
    opts = append(opts, grpc.WithUnaryInterceptor(
        grpc_opentracing.UnaryClientInterceptor(
            grpc_opentracing.WithTracer(ot.GlobalTracer()))))
    opts = append(opts, grpc.WithInsecure())
    conn, err := grpc.DialContext(ctx, addr, opts...)
    if err != nil {
        log.Fatalf("Failed to connect to application addr: ", err)
        return nil, err
    }
    return conn, nil
}

func getEnv(key, fallback string) string {
    if value, ok := os.LookupEnv(key); ok {
        return value
    }
    return fallback
}

func init() {
    formatter := runtime.Formatter{ChildFormatter: &log.JSONFormatter{}}
    formatter.Line = true
    log.SetFormatter(&formatter)
    log.SetOutput(os.Stdout)
    level, err := log.ParseLevel(getEnv("LOG_LEVEL", "info"))
    if err != nil {
        log.Error(err)
    }
    log.SetLevel(level)
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreetingServiceServer(s, &greetingServiceServer{})
    log.Fatal(s.Serve(lis))
}
Greeting Protocol Buffers
Shown below is the greeting source protocol buffers .proto file. The greeting response struct, originally defined in the services, remains largely unchanged (gist). The UI client responses will look identical.
syntax = "proto3"; | |
package greeting; | |
import "google/api/annotations.proto"; | |
message Greeting { | |
string id = 1; | |
string service = 2; | |
string message = 3; | |
string created = 4; | |
} | |
message GreetingRequest { | |
} | |
message GreetingResponse { | |
repeated Greeting greeting = 1; | |
} | |
service GreetingService { | |
rpc Greeting (GreetingRequest) returns (GreetingResponse) { | |
option (google.api.http) = { | |
get: "/api/v1/greeting" | |
}; | |
} | |
} |
When compiled with protoc, the Go-based protocol compiler plugin, the original 27 lines of source code swell to almost 270 lines of generated data access classes that are easier to use programmatically.
# Generate gRPC stub (.pb.go)
protoc -I /usr/local/include -I. \
  -I ${GOPATH}/src \
  -I ${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
  --go_out=plugins=grpc:. \
  greeting.proto

# Generate reverse-proxy (.pb.gw.go)
protoc -I /usr/local/include -I. \
  -I ${GOPATH}/src \
  -I ${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
  --grpc-gateway_out=logtostderr=true:. \
  greeting.proto

# Generate swagger definitions (.swagger.json)
protoc -I /usr/local/include -I. \
  -I ${GOPATH}/src \
  -I ${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
  --swagger_out=logtostderr=true:. \
  greeting.proto
Below is a small snippet of that compiled code, for reference. The compiled code is included in the pb-greeting project on GitHub and imported into each microservice and the reverse proxy (gist). We also compile a separate version for the reverse proxy to implement.
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: greeting.proto

package greeting

import (
    context "context"
    fmt "fmt"
    proto "github.com/golang/protobuf/proto"
    _ "google.golang.org/genproto/googleapis/api/annotations"
    grpc "google.golang.org/grpc"
    codes "google.golang.org/grpc/codes"
    status "google.golang.org/grpc/status"
    math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type Greeting struct {
    Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
    Service              string   `protobuf:"bytes,2,opt,name=service,proto3" json:"service,omitempty"`
    Message              string   `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
    Created              string   `protobuf:"bytes,4,opt,name=created,proto3" json:"created,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

func (m *Greeting) Reset()         { *m = Greeting{} }
func (m *Greeting) String() string { return proto.CompactTextString(m) }
func (*Greeting) ProtoMessage()    {}
func (*Greeting) Descriptor() ([]byte, []int) {
    return fileDescriptor_6acac03ccd168a87, []int{0}
}

func (m *Greeting) XXX_Unmarshal(b []byte) error {
    return xxx_messageInfo_Greeting.Unmarshal(m, b)
}

func (m *Greeting) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    return xxx_messageInfo_Greeting.Marshal(b, m, deterministic)
Using Swagger, we can view the greeting protocol buffers' single RESTful API resource, exposed with an HTTP GET method. I use the Docker-based version of Swagger UI for viewing protoc-generated swagger definitions.
docker run -p 8080:8080 -d --name swagger-ui \
  -e SWAGGER_JSON=/tmp/greeting.swagger.json \
  -v ${GOPATH}/src/pb-greeting:/tmp swaggerapi/swagger-ui
The Angular UI makes an HTTP GET request to the /api/v1/greeting resource, which is transformed to gRPC and proxied to Service A, where it is handled by the Greeting function.
gRPC Gateway Reverse Proxy
As explained earlier, the gRPC Gateway reverse proxy service is completely new. Specifically, note the following code features in the gist below:
- Import of the pb-greeting protobuf package;
- The proxy is hosted on port 80;
- Request headers, used for distributed tracing with Jaeger, are collected from the incoming HTTP request and passed to Service A in the gRPC context;
- The proxy is coded as a gRPC client, which calls Service A;
- Logging is largely unchanged;
The source code for the Reverse Proxy (gist):
// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// purpose: gRPC Gateway / Reverse Proxy
// reference: https://github.com/grpc-ecosystem/grpc-gateway

package main

import (
    "context"
    "flag"
    lrf "github.com/banzaicloud/logrus-runtime-formatter"
    gw "github.com/garystafford/pb-greeting"
    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    log "github.com/sirupsen/logrus"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "net/http"
    "os"
)

func injectHeadersIntoMetadata(ctx context.Context, req *http.Request) metadata.MD {
    //https://aspenmesh.io/2018/04/tracing-grpc-with-istio/
    var (
        otHeaders = []string{
            "x-request-id",
            "x-b3-traceid",
            "x-b3-spanid",
            "x-b3-parentspanid",
            "x-b3-sampled",
            "x-b3-flags",
            "x-ot-span-context"}
    )
    var pairs []string
    for _, h := range otHeaders {
        if v := req.Header.Get(h); len(v) > 0 {
            pairs = append(pairs, h, v)
        }
    }
    return metadata.Pairs(pairs...)
}

type annotator func(context.Context, *http.Request) metadata.MD

func chainGrpcAnnotators(annotators ...annotator) annotator {
    return func(c context.Context, r *http.Request) metadata.MD {
        var mds []metadata.MD
        for _, a := range annotators {
            mds = append(mds, a(c, r))
        }
        return metadata.Join(mds...)
    }
}

func run() error {
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    annotators := []annotator{injectHeadersIntoMetadata}

    mux := runtime.NewServeMux(
        runtime.WithMetadata(chainGrpcAnnotators(annotators...)),
    )
    opts := []grpc.DialOption{grpc.WithInsecure()}
    err := gw.RegisterGreetingServiceHandlerFromEndpoint(ctx, mux, "service-a:50051", opts)
    if err != nil {
        return err
    }

    return http.ListenAndServe(":80", mux)
}

func getEnv(key, fallback string) string {
    if value, ok := os.LookupEnv(key); ok {
        return value
    }
    return fallback
}

func init() {
    formatter := lrf.Formatter{ChildFormatter: &log.JSONFormatter{}}
    formatter.Line = true
    log.SetFormatter(&formatter)
    log.SetOutput(os.Stdout)
    level, err := log.ParseLevel(getEnv("LOG_LEVEL", "info"))
    if err != nil {
        log.Error(err)
    }
    log.SetLevel(level)
}

func main() {
    flag.Parse()
    if err := run(); err != nil {
        log.Fatal(err)
    }
}
Below, in the Stackdriver logs, we see an example of a set of HTTP request headers in the JSON payload, which are propagated upstream to gRPC-based Go services from the gRPC Gateway's reverse proxy. Header propagation ensures the request produces a complete distributed trace across the entire service call chain.
Istio VirtualService and CORS
According to feedback in the project’s GitHub Issues, the gRPC Gateway does not directly support Cross-Origin Resource Sharing (CORS) policy. In my own experience, the gRPC Gateway cannot handle OPTIONS HTTP method requests, which must be issued by the Angular 7 web UI. Therefore, I have offloaded CORS responsibility to Istio, using the VirtualService resource’s CorsPolicy configuration. This makes CORS much easier to manage than coding CORS configuration into service code (gist):
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: service-rev-proxy
spec:
  hosts:
  - api.dev.example-api.com
  gateways:
  - demo-gateway
  http:
  - match:
    - uri:
        prefix: /
    route:
    - destination:
        port:
          number: 80
        host: service-rev-proxy.dev.svc.cluster.local
      weight: 100
    corsPolicy:
      allowOrigin:
      - "*"
      allowMethods:
      - OPTIONS
      - GET
      allowCredentials: true
      allowHeaders:
      - "*"
Set-up and Installation
To deploy the microservices platform to GKE, follow the detailed instructions in part one of the post, Kubernetes-based Microservice Observability with Istio Service Mesh: Part 1, or Azure Kubernetes Service (AKS) Observability with Istio Service Mesh for AKS.
- Create the external MongoDB Atlas database and CloudAMQP RabbitMQ clusters;
- Modify the Kubernetes resource files and bash scripts for your own environments;
- Create the managed GKE or AKS cluster on GCP or Azure;
- Configure and deploy Istio to the managed Kubernetes cluster, using Helm;
- Create DNS records for the platform’s exposed resources;
- Deploy the Go-based microservices, gRPC Gateway reverse proxy, Angular UI, and associated resources to the Kubernetes cluster;
- Test and troubleshoot the platform deployment;
- Observe the results.
The Three Pillars
As introduced in the first post, logs, metrics, and traces are often known as the three pillars of observability. These are the external outputs of the system, which we may observe. As modern distributed systems grow ever more complex, the ability to observe those systems demands equally modern tooling that was designed with this level of complexity in mind. Traditional logging and monitoring systems often struggle with today’s hybrid and multi-cloud, polyglot language-based, event-driven, container-based and serverless, infinitely-scalable, ephemeral-compute platforms.
Tools like Istio Service Mesh attempt to solve the observability challenge by offering native integrations with several best-of-breed, open-source telemetry tools. Istio’s integrations include Jaeger for distributed tracing, Kiali for Istio service mesh-based microservice visualization and monitoring, and Prometheus and Grafana for metric collection, monitoring, and alerting. Combined with cloud platform-native monitoring and logging services, such as Stackdriver for GKE, CloudWatch for Amazon’s EKS, or Azure Monitor logs for AKS, we have a complete observability solution for modern, distributed, Cloud-based applications.
Pillar 1: Logging
Moving from JSON over HTTP to gRPC does not require any changes to the logging configuration of the Go-based service code or Kubernetes resources.
Stackdriver with Logrus
As detailed in part two of the last post, Kubernetes-based Microservice Observability with Istio Service Mesh, our logging strategy for the eight Go-based microservices and the reverse proxy continues to be the use of Logrus, the popular structured logger for Go, and Banzai Cloud’s logrus-runtime-formatter.
If you recall, the Banzai formatter automatically tags log messages with runtime and stack information, including the function name and line number, which is extremely helpful when troubleshooting. We are also using Logrus’ JSON formatter. Below, in the Stackdriver console, note how each log entry’s JSON payload contains the log level, the function name, the line on which the log entry originated, and the message.
Below, we see the details of a specific log entry’s JSON payload. In this case, we can see the request headers propagated from the downstream service.
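For illustration only, a hypothetical helper along these lines could pull the propagated headers from the incoming gRPC metadata and attach them as structured Logrus fields; the package name, function name, and choice of headers below are my own assumptions, not code taken from the project.

package service

import (
    "context"

    log "github.com/sirupsen/logrus"
    "google.golang.org/grpc/metadata"
)

// logWithRequestHeaders is a hypothetical helper, not part of the project,
// showing how propagated headers could be read from the incoming gRPC
// metadata and logged as structured Logrus fields.
func logWithRequestHeaders(ctx context.Context, msg string) {
    fields := log.Fields{}
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        for _, h := range []string{"x-request-id", "x-b3-traceid", "x-b3-spanid"} {
            if v := md.Get(h); len(v) > 0 {
                fields[h] = v[0]
            }
        }
    }
    log.WithFields(fields).Info(msg)
}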
Pillar 2: Metrics
Moving from JSON over HTTP to gRPC does not require any changes to the metrics configuration of the Go-based service code or Kubernetes resources.
Prometheus
Prometheus is a completely open-source and community-driven systems monitoring and alerting toolkit, originally built at SoundCloud circa 2012. Interestingly, Prometheus joined the Cloud Native Computing Foundation (CNCF) in 2016 as the second hosted project, after Kubernetes.
Grafana
Grafana describes itself as the leading open-source software for time-series analytics. According to Grafana Labs, Grafana allows you to query, visualize, alert on, and understand your metrics no matter where they are stored. You can easily create, explore, and share visually rich, data-driven dashboards. Grafana also lets users visually define alert rules for their most important metrics; Grafana will continuously evaluate those rules and can send notifications.
According to Istio, the Grafana add-on is a pre-configured instance of Grafana. The Grafana Docker base image has been modified to start with both a Prometheus data source and the Istio Dashboard installed. Below, we see two of the pre-configured dashboards, the Istio Mesh Dashboard and the Istio Performance Dashboard.
Pillar 3: Traces
Moving from JSON over HTTP to gRPC did require a complete re-write of the tracing logic in the service code. In fact, I spent the majority of my time ensuring the correct headers were propagated from the Istio Ingress Gateway to the gRPC Gateway reverse proxy, to Service A in the gRPC context, and upstream to all the dependent, gRPC-based services. I am sure there are a number of possible optimizations in my current code regarding the correct handling of traces and how this information is propagated across the service call stack.
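As a simplified sketch of that propagation, a service could copy the Istio/B3 tracing headers from the incoming gRPC metadata onto the outgoing context it uses to call the next upstream service. This is my own approximation, not the project's exact implementation; the package name and function name are illustrative.

package service

import (
    "context"

    "google.golang.org/grpc/metadata"
)

// propagateTracingHeaders is a simplified sketch, not the project's exact code.
// It copies the Istio/B3 tracing headers found on the incoming gRPC request to
// the outgoing context, so Jaeger can stitch the upstream call into the same trace.
func propagateTracingHeaders(ctx context.Context) context.Context {
    tracingHeaders := []string{
        "x-request-id",
        "x-b3-traceid",
        "x-b3-spanid",
        "x-b3-parentspanid",
        "x-b3-sampled",
        "x-b3-flags",
        "x-ot-span-context",
    }
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return ctx
    }
    var pairs []string
    for _, h := range tracingHeaders {
        if v := md.Get(h); len(v) > 0 {
            pairs = append(pairs, h, v[0])
        }
    }
    return metadata.AppendToOutgoingContext(ctx, pairs...)
}

The returned context would then be passed to the generated gRPC client when invoking the next service, allowing Jaeger to associate the upstream call with the same trace.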
Jaeger
According to their website, Jaeger, inspired by Dapper and OpenZipkin, is a distributed tracing system released as open source by Uber Technologies. It is used for monitoring and troubleshooting microservices-based distributed systems, including distributed context propagation, distributed transaction monitoring, root cause analysis, service dependency analysis, and performance and latency optimization. The Jaeger website contains an excellent overview of Jaeger’s architecture and general tracing-related terminology.
Below we see the Jaeger UI Traces View. In it, we see a series of traces generated by hey, a modern load generator and benchmarking tool, and a worthy replacement for Apache Bench (ab). Unlike ab, hey supports HTTP/2. The use of hey was detailed in the previous post.
A trace, as you might recall, is an execution path through the system and can be thought of as a directed acyclic graph (DAG) of spans. If you have worked with systems like Apache Spark, you are probably already familiar with DAGs.
Below we see the Jaeger UI Trace Detail View. The example trace contains 16 spans, which encompass nine components: seven of the eight Go-based services, the reverse proxy, and the Istio Ingress Gateway. The trace and each of its spans have timings. The root span in the trace is the Istio Ingress Gateway. In this demo, traces do not span the RabbitMQ message queues. This means you would not see a trace that includes the decoupled, message-based communications between Service D and Service F, via RabbitMQ.
Within the Jaeger UI Trace Detail View, you also have the ability to drill into a single span, which contains additional metadata. Metadata includes the URL being called, HTTP method, response status, and several other headers.
Microservice Observability
Moving from JSON over HTTP to gRPC does not require any changes to the Kiali configuration of the Go-based service code or Kubernetes resources.
Kiali
According to their website, Kiali provides answers to the questions: What are the microservices in my Istio service mesh, and how are they connected? Kiali works with Istio, in OpenShift or Kubernetes, to visualize the service mesh topology, to provide visibility into features like circuit breakers, request rates and more. It offers insights about the mesh components at different levels, from abstract Applications to Services and Workloads.
The Graph View in the Kiali UI is a visual representation of the components running in the Istio service mesh. Below, filtering on the cluster’s dev Namespace, we should observe that Kiali has mapped all components in the platform, along with rich metadata, such as their version and communication protocols.
Using Kiali, we can confirm our service-to-service IPC protocol is now gRPC instead of the previous HTTP.
Conclusion
Although converting from JSON over HTTP to protocol buffers with gRPC required major code changes to the services, it did not impact the high-level observability we have of those services using the tools provided by Istio, including Prometheus, Grafana, Jaeger, and Kiali.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.