IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas

Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

Introduction

Most of the dominant Cloud providers offer integrated IoT (Internet of Things) and IIoT (Industrial IoT) services. Amazon has AWS IoT, Microsoft Azure has multiple offerings including IoT Central, IBM's offerings include IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers the Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly performant, scalable technology stacks. They are capable of scaling to tens of thousands of IoT devices or more and massive amounts of streaming telemetry.

In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.

Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIoT industry-standard data and transport protocols, such as MQTT (Message Queuing Telemetry Transport), which typically runs over TCP, or CoAP (Constrained Application Protocol), which runs over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).

Demonstration

In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

iot_3.jpg

Prototype IoT Devices used in this Demonstration

We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.

Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.

JSON IoT Basic Icons.png

For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.

Demo IoT Diagram Icons.png

Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.

Complex IoT Diagram Icons.png

Source Code

All source code is available on GitHub. Use the following command to clone the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git

You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.

screen_shot_2019-05-21_at_12_55_25_pm

Technologies

Protocol Buffers

According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Google describes Protocol Buffers as 3 to 10 times smaller and 20 to 100 times faster than XML.

Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes.

Google Cloud Functions

Cloud-Functions.png

According to Google, Cloud Functions is Google’s event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high availability, and fault tolerance; no servers to provision, manage, patch, or update; paying only while your code runs; and easy connection to, and extension of, other cloud services. Cloud Functions natively support multiple event types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.js.

Google Cloud Pub/Sub

According to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.

MongoDB Atlas

MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

Cost Effectiveness of Cloud Functions

At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google’s pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system is comprised of tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.

However, with thousands of devices or more, each transmitting data multiple times per minute, you could quickly outgrow the cost-effectiveness of Google Cloud Functions. In that case, you might look to Google’s Cloud IoT platform. Alternatively, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.
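To get a rough feel for where that tipping point sits, the invocation pricing quoted above can be turned into a back-of-the-envelope calculation. The sketch below is only an estimate under stated assumptions: a 30-day month, one HTTP Function invocation per reading, and invocation charges only (memory, compute time, and outbound data transfer are ignored, as is the second, Pub/Sub-triggered function). The function names are hypothetical.

```python
FREE_INVOCATIONS = 2_000_000   # free invocations per month, per the pricing above
PRICE_PER_MILLION = 0.40       # USD per additional million invocations


def monthly_invocations(devices, interval_seconds, days=30):
    """Readings sent per month if every device posts once per interval."""
    per_device = (24 * 60 * 60 // interval_seconds) * days
    return devices * per_device


def invocation_cost(invocations):
    """Invocation charge only; memory, compute time, and egress are ignored."""
    billable = max(0, invocations - FREE_INVOCATIONS)
    return billable / 1_000_000 * PRICE_PER_MILLION


for devices in (10, 100, 1000):
    total = monthly_invocations(devices, interval_seconds=15)
    print(devices, total, round(invocation_cost(total), 2))
```

At a 15-second interval, a single device generates 172,800 invocations in a 30-day month, so roughly eleven devices fit within the free invocation tier before per-invocation charges begin.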

Sensor Scripts

For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.

JSON over HTTPS

Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.

import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

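    try:
        # payload is already a JSON string, so send it as the raw request body;
        # passing it via json= would JSON-encode it a second time
        requests.post(URL, data=payload, headers=headers)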
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Telemetry Frequency

Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to only being transmitted every 15 seconds. To reduce system complexity, potential latency, back-pressure, and cost, in my opinion, you should only produce telemetry data at the frequency your requirements dictate.

JSON Web Tokens

For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry-standard (RFC 7519) method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, all signed with a secret key, which is known to each Cloud Function, in order to verify the IoT device’s identity. Note that the payload of a signed JWT is only Base64URL-encoded, not encrypted; the signature protects its integrity rather than its confidentiality. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT’s payload data.

{
  "sub": "IoT Protobuf Serverless Demo",
  "id": "iot-demo-key",
  "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ",
  "iat": 1557407124,
  "exp": 1564664724
}

For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions use PyJWT, a Python library for encoding and decoding JWTs, to decode and validate the JWT (Bearer Token) from the incoming request’s Authorization header. The JWT is stored as an environment variable. Deployment instructions are included in the GitHub project.

screen_shot_2019-05-09_at_5_13_28_pm
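To make the signing step concrete, here is a minimal, standard-library-only sketch of how an HS256 token is assembled and checked. It is not how the demonstration's Functions validate tokens (they use PyJWT); HS256 is assumed because it is the jwt.io default, the helper names and the secret are hypothetical, and the sketch deliberately skips the exp check that a real validator performs.

```python
import base64
import hashlib
import hmac
import json


def b64url(data):
    """Base64URL-encode bytes without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b'=')


def b64url_decode(data):
    """Decode unpadded Base64URL bytes by restoring the padding first."""
    return base64.urlsafe_b64decode(data + b'=' * (-len(data) % 4))


def sign_hs256(claims, secret):
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = b'.'.join(
        b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, claims))
    signature = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return signing_input + b'.' + b64url(signature)


def verify_hs256(token, secret):
    """Return the claims if the signature matches, else None (exp is not checked)."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(b'.')
    except ValueError:
        return None
    expected = hmac.new(secret, header_b64 + b'.' + payload_b64,
                        hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        return None
    return json.loads(b64url_decode(payload_b64))


claims = {'sub': 'IoT Protobuf Serverless Demo', 'id': 'iot-demo-key',
          'iat': 1557407124, 'exp': 1564664724}
token = sign_hs256(claims, b'hypothetical-secret')
print(verify_hs256(token, b'hypothetical-secret') == claims)  # True
print(verify_hs256(token, b'wrong-secret'))                   # None
```

Because the payload segment can be decoded without the secret, anything placed in the claims should be treated as readable by whoever holds the token.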

JSON Payload

Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}
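The 148-byte figure corresponds to the message as serialized by json.dumps with its default separators, rather than to the pretty-printed form shown above. The short check below reproduces that number and shows what compact separators alone would save; it is a sketch that simply rebuilds the sample payload as a Python dict.

```python
import json

payload = {
    'device': 'rp829c7e0e',
    'type': 'DHT22',
    'timestamp': 1557585090.476025,
    'data': {'temperature': 17.100000381469727,
             'humidity': 68.0999984741211},
}

# Default separators are ', ' and ': '
default_body = json.dumps(payload).encode('utf-8')
# Compact separators drop the space after each comma and colon
compact_body = json.dumps(payload, separators=(',', ':')).encode('utf-8')

print(len(default_body))  # 148
print(len(compact_body))  # 138
```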

Protocol Buffers

For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object, within each message type.

It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternatively, you could use the Timestamp message from Google’s Protobuf Well-Known Types (google.protobuf.Timestamp) to store the timestamp.
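The precision point is easy to demonstrate with the sample timestamp used in this post. The sketch below uses Python's struct module as a stand-in for the Protobuf double (64-bit) and float (32-bit) scalar types; it does not use the Protobuf library itself.

```python
import struct

timestamp = 1557585090.476025

# Round-trip the value through 64-bit and 32-bit IEEE 754 encodings
as_double = struct.unpack('<d', struct.pack('<d', timestamp))[0]
as_float = struct.unpack('<f', struct.pack('<f', timestamp))[0]

print(as_double == timestamp)  # True
print(as_float)                # 1557585152.0
print(as_float - timestamp)    # about 61.5 seconds of error
```

At this magnitude, adjacent 32-bit floats are 128 apart, so a float timestamp cannot distinguish readings taken 15 seconds apart, whereas a double preserves the sub-second value.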

syntax = "proto3";

package sensors;

// DHT22
message SensorDHT {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDHT data = 4;
}

message DataDHT {
    double temperature = 1;
    double humidity = 2;
}

// Onyehn_PIR
message SensorPIR {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataPIR data = 4;
}

message DataPIR {
    bool motion = 1;
}

// Anmbest_MD46N
message SensorDLI {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDLI data = 4;
}

message DataDLI {
    bool light = 1;
}

Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file, sensors_pb2.py.

protoc --python_out=. sensors.proto

Protocol Buffers over HTTPS

Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request’s Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf’s SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.

import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests
import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()

            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Protobuf Binary Payload

To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.

message = sensorDHT.SerializeToString()

# SerializeToString() already returns bytes; the context manager
# closes the file even if the write fails
with open("./data_binary.txt", "wb") as binary_file_output:
    binary_file_output.write(message)

Then, using the hexdump command, we can view a representation of the binary data file.

> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022
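To read a dump like this, it helps to walk the Protobuf wire format by hand: each field starts with a key byte holding the field number and a wire type, followed by the value. The decoder below is a minimal, standard-library-only sketch rather than a replacement for the generated sensors_pb2 classes. The function names are hypothetical, and it assumes that wire type 1 holds a double and wire type 5 holds a float, which the wire format alone cannot confirm. The bytes are copied from the hexdump above, which uses 32-bit (wire type 5) values for its numeric fields.

```python
import struct

RAW = bytes.fromhex(
    '0a 08 38 32 39 63 37 65 30 65 12 05 44 48 54 32 '
    '32 1d 05 a0 b9 4e 22 0a 0d ec 51 b2 41 15 cd cc '
    '38 42')


def read_varint(buf, pos):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_fields(buf):
    """Return a list of (field_number, wire_type, value) tuples."""
    pos, fields = 0, []
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:                      # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 1:                    # 64-bit, assumed double
            value = struct.unpack_from('<d', buf, pos)[0]
            pos += 8
        elif wire_type == 2:                    # length-delimited
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        elif wire_type == 5:                    # 32-bit, assumed float
            value = struct.unpack_from('<f', buf, pos)[0]
            pos += 4
        else:
            raise ValueError('unsupported wire type %d' % wire_type)
        fields.append((number, wire_type, value))
    return fields


fields = decode_fields(RAW)
for number, wire_type, value in fields:
    print(number, wire_type, value)

# Field 4 is the nested data message, so decode its bytes the same way
nested = decode_fields(fields[3][2])
print(nested)
```

Fields 1 and 2 decode to the device and type strings, field 3 to the timestamp, and field 4 to the nested message carrying the temperature and humidity readings.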

The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (about 32% of the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.

screen_shot_2019-05-14_at_7_00_39_am.png

Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.

message = sensorDHT.SerializeToString()
print(message)

The resulting binary string resembles the following.

b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'

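The printed representation above is 111 characters long, but that count includes the escape sequences (such as \x12) that Python uses to display non-printable bytes. The serialized message itself, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 48 bytes, matching the file size on disk, as compared to the JSON payload size of 148 bytes (about 32% of the size).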
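When comparing sizes, it is worth separating the number of bytes in a serialized message from the number of characters Python prints for it. The check below uses the byte string shown above verbatim; no Protobuf library is needed.

```python
message = (b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12'
           b'\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@')

# Bytes actually transmitted in the request body
print(len(message))            # 48

# Characters in the printed form, excluding the leading b and the two quotes
print(len(repr(message)) - 3)  # 111
```

Each non-printable byte costs four characters on screen (for example \x12) but only one byte on the wire, which is where the difference between 111 and 48 comes from.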

Validate Protobuf Payload

To validate the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.

from google.protobuf.json_format import MessageToJson

message = sensorDHT.SerializeToString()
message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))

The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Google Cloud Functions

There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.

Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.

screen-shot-2019-05-13-at-8_34_49-pm

JSON Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.

import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'Content-Type': 'application/json'})

    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'Content-Type': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'Content-Type': 'application/json'})


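def validate_token(request):
    # Expect an Authorization header of the form "Bearer <token>"
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    parts = auth_header.split(" ")
    if len(parts) != 2 or not parts[1]:
        return False
    auth_token = parts[1]

    try:
        # HS256 is assumed here (the jwt.io default); newer PyJWT releases
        # require the accepted algorithms to be listed explicitly
        payload = jwt.decode(auth_token, SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which is a subclass
        return False
    return payload.get('id') == ID and payload.get('password') == PASSWORD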


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, 
                      data=bytes(str(message), 'utf-8'))

The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.

# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml

Using a .gcloudignore file, the gcloud functions deploy CLI command deploys three files: the cloud function (main.py), the required Python packages file (requirements.txt), and the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.

Protobuf Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.

As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which in this case is deployed alongside main.py as part of the Cloud Function. It is used to parse a serialized message into its original Protobuf SensorDHT message type.

import logging
import os

import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'Content-Type': 'application/json'})

    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'Content-Type': 'application/json'})

    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'Content-Type': 'application/json'})


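def validate_token(request):
    # Expect an Authorization header of the form "Bearer <token>"
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    parts = auth_header.split(" ")
    if len(parts) != 2 or not parts[1]:
        return False
    auth_token = parts[1]

    try:
        # HS256 is assumed here (the jwt.io default); newer PyJWT releases
        # require the accepted algorithms to be listed explicitly
        payload = jwt.decode(auth_token, SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which is a subclass
        return False
    return payload.get('id') == ID and payload.get('password') == PASSWORD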


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))

Cloud Pub/Sub Functions

In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub Triggers. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to the iot-data-demo Cloud Pub/Sub Topic.

screen_shot_2019-05-09_at_2_41_17_pm

Sending Telemetry to MongoDB Atlas

The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be compared to binary interchange formats, like Protocol Buffers. BSON is more schema-less than Protocol Buffers, which can give it an advantage in flexibility but also a slight disadvantage in space efficiency (BSON has overhead for field names within the serialized data).

The function uses PyMongo to connect to MongoDB Atlas. According to its website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.

import ast
import base64
import json
import logging
import os
import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    try:
        # Protobuf-derived messages arrive as valid JSON
        message = json.loads(message)
    except ValueError:
        # JSON-derived messages may arrive as the str() of a Python dict
        # (single quotes, True/False/None), so parse them as a literal
        message = ast.literal_eval(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)
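The reason any cleanup is needed becomes clear when comparing what str() and json.dumps() produce for the same reading. The sketch below simulates a Pub/Sub event with the standard library only; the make_event and read_event helpers and the sample PIR reading are hypothetical, and the only assumption carried over from the function above is that the message body arrives base64-encoded under the event's data key.

```python
import base64
import json

reading = {'device': 'rp829c7e0e', 'type': 'PIR', 'data': {'motion': True}}

as_repr = str(reading)         # Python literal: single quotes, True
as_json = json.dumps(reading)  # valid JSON: double quotes, true


def make_event(text):
    """Simulate the event dict passed to a Pub/Sub-triggered function."""
    return {'data': base64.b64encode(text.encode('utf-8'))}


def read_event(event):
    """Decode the base64 body and parse it strictly as JSON."""
    return json.loads(base64.b64decode(event['data']).decode('utf-8'))


print(as_repr)
print(as_json)
print(read_event(make_event(as_json)) == reading)  # True

try:
    read_event(make_event(as_repr))
except ValueError as error:
    print('not valid JSON:', error)
```

Publishing with json.dumps() upstream would therefore let the subscriber parse every message with json.loads() alone.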

The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.

screen_shot_2019-05-09_at_6_17_18_pm

MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.

screen_shot_2019-05-10_at_10_08_14_pm

Although the Cloud Pub/Sub to Atlas function execution times are longer than those of the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster in the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.

screen_shot_2019-05-10_at_10_16_49_pm

Aggregating Data with MongoDB Compass

MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.

screen_shot_2019-05-14_at_5_19_17_pm

screen_shot_2019-05-21_at_1_17_40_pm.png

screen_shot_2019-05-21_at_1_15_09_pm

When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and return computed results. This feature saves a ton of time when filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.

screen_shot_2019-05-14_at_5_22_58_pm

Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.

screen_shot_2019-05-14_at_5_23_39_pm

Below, the exported aggregation pipeline code from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device over the 48-hour period bounded by the two epoch timestamps in the $match stage.

DEVICE_1 = 'rp59adf374'
pipeline = [
    {
        '$match': {
            'type': 'DHT22', 
            'device': DEVICE_1, 
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1, 
            'temperature': '$data.temperature', 
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]
aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))
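The epoch bounds in the $match stage are easier to audit when they are derived from calendar dates rather than typed as literals. The helpers below are a hypothetical sketch (they are not part of the exported Compass code) and assume the stored timestamps are UTC epoch seconds, as produced by time.time() in the sensor scripts.

```python
from datetime import datetime, timezone


def epoch_seconds(year, month, day):
    """Midnight UTC on the given date, as whole epoch seconds."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())


def match_stage(sensor_type, device, start, end):
    """Build a $match stage equivalent in shape to the one in the pipeline above."""
    return {'$match': {'type': sensor_type,
                       'device': device,
                       'timestamp': {'$gt': start, '$lt': end}}}


start = epoch_seconds(2019, 5, 12)
end = epoch_seconds(2019, 5, 14)

print(start, end)            # 1557619200 1557792000
print((end - start) / 3600)  # 48.0
print(match_stage('DHT22', 'rp59adf374', start, end))
```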

MongoDB Atlas Performance

In this demonstration, from Python3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.

Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.

screen-shot-2019-05-13-at-9_09_52-pm

GCP Observability

There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.

For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.

For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile for monitoring. The 95th percentile indicates that 95% of the time, the observed values are at or below this amount, and the remaining 5% of the time, the observed values are above it.

screen_shot_2019-05-10_at_10_13_37_pm
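For readers less familiar with percentiles, the following sketch computes a 95th percentile with the simple nearest-rank method. It is not how Stackdriver aggregates its metrics, and the execution times are made-up sample values chosen to include one slow outlier.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: the smallest sample with at least
    pct percent of all samples at or below it."""
    if not values:
        raise ValueError('values must not be empty')
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# Hypothetical function execution times in milliseconds
execution_ms = [9, 10, 10, 11, 12, 12, 13, 14, 15, 18,
                11, 10, 9, 13, 12, 14, 10, 11, 12, 240]

print(percentile(execution_ms, 95))           # 18
print(sum(execution_ms) / len(execution_ms))  # 23.3
```

The single 240 ms outlier pulls the mean above every typical reading, while the 95th percentile still reflects what most executions experience.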

Data Analysis using Jupyter Notebooks

According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The use of Jupyter Notebooks has grown significantly, as Big Data, AI, and ML have all experienced explosive growth.

PyCharm

JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.

screen_shot_2019-05-10_at_10_38_42_am

Below, we see the example Analytics Notebook, included in the demonstration’s project, displayed in PyCharm 2019.1.2 (Professional Edition). Working effectively with Notebooks in PyCharm really requires a full-size monitor. PyCharm’s crowded Notebook UI is workable on a laptop, but certainly not as effective as on a larger monitor.

screen_shot_2019-05-20_at_2_23_46_pm.png

Jupyter Notebook Server

Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.

From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Pandas DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.

screen_shot_2019-05-19_at_9_46_32_am.png

We can then analyze the data using a number of common frameworks, including Pandas, Matplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the causal effect of one environment variable on another, such as the impact of light on temperature or humidity.

screen_shot_2019-05-23_at_5_25_44_pm

Below, we can use histograms to visualize temperature frequencies for intervals, over time, for a given device location.
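The binning behind such a histogram can be sketched with the standard library alone. The readings below are hypothetical, and the Notebook itself presumably relies on Pandas or Matplotlib for the actual plot; this only illustrates how readings are counted per interval.

```python
import math
from collections import Counter


def temperature_histogram(readings, bin_width=1):
    """Count readings per interval; each key is the lower bound of its bin."""
    counts = Counter(math.floor(value / bin_width) * bin_width for value in readings)
    return dict(sorted(counts.items()))


# Hypothetical temperature readings from one device location.
readings = [20.5, 21.3, 21.9, 22.4, 23.0, 23.7, 24.1, 21.0]

for lower_bound, count in temperature_histogram(readings).items():
    # Print a simple text bar for each one-degree interval.
    print(f"{lower_bound}-{lower_bound + 1}: {'#' * count}")
```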

screen-shot-2019-05-13-at-9_13_35-pm

Machine Learning using Jupyter Notebooks

In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry. Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.

screen_shot_2019-05-19_at_12_25_45_pm

screen_shot_2019-05-19_at_12_26_45_pm

Using scikit-learn, we can build models to predict such things as which IoT device generated a specific temperature and humidity reading, or the temperature and humidity, given the time of day, device location, and external environment variables, or discover anomalies in the sensor telemetry.

Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.

screen_shot_2019-05-19_at_12_27_39_pm

The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.

screen_shot_2019-05-19_at_12_30_38_pm

Having data from multiple sensors, we are able to enrich the ML models by adding additional categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.

screen_shot_2019-05-19_at_12_29_25_pm

Conclusion

Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Image: everythingpossible © 123RF.com


Apache Solr: Because your Database is not a Search Engine

In this post, we will examine what sets Apache Solr aside as a search engine, from conventional databases like MongoDB. We will explore the similarities and differences between Solr and MongoDB by analyzing a series of comparative queries. We will then delve into some of Solr’s more advanced search capabilities.

Copyright: Dejan Bozic (123RF)

Why Search?

The ability to search for information is a basic requirement of many applications. Architects and Developers who limit themselves to traditional databases often attempt to meet search requirements by creating unnecessarily complex SQL query-based solutions. They force end-users to search in unnatural or highly-structured ways or provide results that lack a sense of relevancy. End-users are not Database Administrators; they do not understand the nuances of SQL. They simply want relevant responses to their inquiries.

In a scenario where data consumers are arbitrarily searching for relevant information within a distinct domain, implementing a search-optimized, Lucene-based platform, such as Elasticsearch or Apache Solr, for reads, is often an effective solution.

Separating database reads from writes is not uncommon. I’ve worked on many projects where the requirements suggested an architecture in which one type of data storage technology should be implemented to optimize for writes, while a different type or types of data storage technologies should be implemented to optimize for reads. Architectures in which this is common include the following.

  • CQRS (Command Query Responsibility Segregation) and Event Sourcing;
  • Reporting, Data Analytics, and Big Data;
  • ML (Machine Learning) and AI (Artificial Intelligence);
  • Real-Time and Streaming Data (such as IoT);
  • Search: Content, Document, Knowledge;

In this post, we will examine the search capabilities of Apache Solr. We will compare and contrast Solr’s search capabilities to those of MongoDB, the leading NoSQL database. We will consider the differences between querying for data and searching for information.

Apache Lucene

According to Apache, the Apache Lucene project develops open-source search software, including the following sub-projects: Lucene Core, Solr, and PyLucene. The Lucene Core sub-project provides Java-based indexing and search technology, as well as spellchecking, hit highlighting, and advanced analysis/tokenization capabilities.

Apache Lucene 7.7.0 and Apache Solr 7.7.0 were just released in February 2019 and used for all the post’s examples.

Apache Solr

According to Apache, Apache Solr is the popular, blazing fast, open source, enterprise search platform built on Apache Lucene. Solr powers the search and navigation features of many of the world’s largest internet sites.

Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Referred to as SolrCloud, and backed by Apache Zookeeper, these capabilities provide distributed, sharded, and replicated indexing and search capabilities.

According to Wikipedia, Solr was created at CNET Networks in 2004, donated to the Apache Software Foundation in 2006, and graduated from their incubator in 2007. Solr version 1.3 was released in 2008. In 2010, the Lucene and Solr projects merged; Solr became a Lucene subproject. With Apache Solr 7.7.0 just released, Solr has well over ten years of development and enterprise adoption behind it.

MongoDB

The leading NoSQL database, MongoDB, describes itself as a document database with the scalability and flexibility that you want with the querying and indexing that users need. Mongo features include ad hoc queries, indexing, and real-time aggregation, which provide powerful ways to access and analyze your data.

Released less than a year ago, MongoDB 4.0 added multi-document ACID transactions, data type conversions, non-blocking secondary replica reads, SHA-2 authentication, MongoDB Compass aggregation pipeline builder, Kubernetes integration, and the MongoDB Stitch serverless platform. MongoDB 4.0.6 was just released in February 2019 and used for all the post’s examples.

Comparing Search Features

Solr and MongoDB appear to have many search-related features in common.

  • Both Solr and MongoDB are document-based data stores;
  • Both Solr and MongoDB use a non-relational data model;
  • Both feature advanced querying and indexing capabilities;
  • Solr implements Lucene-based search capabilities; MongoDB has text-based search capabilities;
  • Solr scores the relevance of search results using the Lucene scoring algorithm; MongoDB has the capability of ranking text search results using the $meta operator;
  • Solr is able to selectively boost the relative importance of search fields and specific values in a field when calculating scores; MongoDB has the capability of boosting the relative importance of fields used in a text search using text indexes;
  • Both Solr and MongoDB are capable of implementing stop words, stemming, and tokenization;

Demonstration

Source Code Examples

All examples shown in this post are available as a series of Python 3 scripts, contained in an open-source project on GitHub, searching-solr-vs-mongodb. The project contains the script, query_mongo.py, which uses the Python driver for MongoDB, pymongo, to execute all the MongoDB queries in this post. The project also contains the script, query_solr.py, which uses the lightweight Python wrapper for Apache Solr, pysolr, to execute all the Solr searches in this post. Both packages, along with ancillary packages, may be installed with pip.

pip3 install pysolr pymongo requests

MongoDB and Solr Instances

To follow along, you will need your own MongoDB and Solr instances. Both are easily stood up locally with Docker, using the official MongoDB and Solr Docker Hub images. Example docker run commands are shown below.

The second command, the Solr command, also creates a new Solr core. The command also bind-mounts the ‘conf’ directory, within the local project, into the container. This will give us the ability to modify our index’s configuration and to store that configuration in source control. All data is ephemeral; with these particular commands, neither container persists data outside the container.

docker run --name mongo -p 27017:27017 -d mongo:latest

docker run --name solr -d \
  -p 8983:8983 \
  -v $PWD/conf:/conf \
  solr:latest \
  solr-create -c movies -d /conf

screen_shot_2019-02-24_at_1_33_33_am

Environment Variables

The source code expects two environment variables, which contain the connection information for MongoDB and Solr. You will need to replace the values below with your own connection strings if they are different than the examples below, used for Docker.

export SOLR_URL="http://localhost:8983/solr"
export MONOGDB_CONN="mongodb://localhost:27017/movies"
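
As a rough sketch of how a script might read these two variables, the following uses only the standard library. The helper name and the fallback defaults are assumptions for illustration and are not necessarily what the project's scripts do.

```python
import os


def load_connection_settings(environ=os.environ):
    """Read the Solr URL and MongoDB connection string from environment variables."""
    # Variable names match the export commands above. The defaults are the
    # assumed local Docker values, used only when a variable is unset.
    solr_url = environ.get("SOLR_URL", "http://localhost:8983/solr")
    mongodb_conn = environ.get("MONOGDB_CONN", "mongodb://localhost:27017/movies")
    return solr_url, mongodb_conn


solr_url, mongodb_conn = load_connection_settings()
print(solr_url, mongodb_conn)
```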

screen_shot_2019-02-23_at_8_57_13_am

Importing Movies to MongoDB

For this post, we will be using a publicly available movie dataset from MongoDB. A copy of the dataset is available in the project, as well as on MongoDB’s website, Setup and Import the Data.

Assuming you have an instance of MongoDB accessible and have set the two environment variables above, import the specially-formatted JSON file for MongoDB, movieDetails_mongo.json, directly into the movies database’s movieDetails collection, using the following mongoimport command.

mongoimport \
  --uri $MONOGDB_CONN \
  --collection "movieDetails" \
  --drop --file "data/movieDetails_mongo.json"

screen_shot_2019-02-23_at_8_57_42_am

Below is a view of the movies database’s movieDetails collection, running in the Docker container, as shown in the MongoDB Compass application.

Screen Shot 2019-02-25 at 7.45.40 AM.png

Indexing Movies to Solr

Assuming you have an instance of Solr accessible and have set the two environment variables above, import the contents of the JSON file, movieDetails.json, by running the Python script, solr_index_movies.py, using the following command.

python3 ./solr_index_movies.py

The command executes a series of HTTP calls to Solr’s exposed RESTful API.
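
To give a sense of what one of those HTTP calls might look like, here is a minimal sketch that builds, but does not send, a JSON update request using only the standard library. The function name and the trimmed-down document are hypothetical; the project's script may use different calls or the pysolr client instead.

```python
import json
import urllib.request


def build_update_request(solr_url, core, documents):
    """Build (but do not send) a POST request that indexes documents and commits."""
    url = f"{solr_url}/{core}/update?commit=true"
    body = json.dumps(documents).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical, trimmed-down movie document for illustration.
docs = [{"id": "1", "title": "The Wild Bunch", "genres": ["Action", "Adventure", "Western"]}]
request = build_update_request("http://localhost:8983/solr", "movies", docs)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```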

screen_shot_2019-02-23_at_10_28_56_pm.png

Below is a view of the Solr Administration User Interface, running within the Docker container, and showing the new movies core. After running the script, we should have 2,250 movie documents indexed.

screen_shot_2019-02-23_at_9_15_42_am

The Solr Admin UI offers a number of useful tools for examining indexes, reviewing schemas and field types, and creating, analyzing, and debugging Solr queries. Below we see the Query UI with the results of a query displayed.

screen_shot_2019-02-25_at_7_34_22_am

Tuning the Solr Index

The movies index uses a default schema, which was created when the movie documents were indexed. To optimize our query results, we will want to make a few adjustments to the default movies schema. First, we want to ensure that our Solr searches consider the pluralization of words. For example, when we search for the search term ‘Adventure’, we want Solr to also return documents containing terms like adventure, adventures, adventure’s, adventuring, and adventurer, but not misadventure. This is known as Stemming, or reducing words to their word stem. An example is shown below in the Solr Analysis UI.

screen_shot_2019-03-01_at_7_08_37_am.png

The fields that we want to search, such as title, plot, and genres, were all indexed by default as the ‘text_general’ Solr field type. The ‘text_general’ field type does not implement stemming when indexing or querying. We need to switch the title, plot, and genres fields to the ‘text_en’ (English text) field type. The ‘text_en’ field type implements multiple indexing and querying filters, including the PorterStemFilterFactory filter, which removes common endings from words. Similar filters include the English Minimal Stem Filter and the English Possessive Filter.

Additionally, the MultiValued field property is set to true by default for these fields in Solr. Since the title and plot fields, amongst others, were only intended to hold a single text value, as opposed to an array of values, we will switch the MultiValued field property to false. This helps with sorting and filtering, and the correct deserialization of documents.

The solr_index_movies.py script will change the title, plot, and genres fields from ‘text_general’ to ‘text_en’ and change the title and plot fields from multi-valued to single-valued. Since we have changed the index’s schema, the script will re-import all the documents after making the schema changes.

To get a better sense of what the schema changes look like, let’s look at the equivalent cURL command to change the schema. This gives you a better sense of the field-level modifications we are making.
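
As a hedged illustration of the field-level modifications, the sketch below builds a payload for Solr's Schema API using its replace-field command. This is my own approximation of the changes described above, not the exact command used by the project; the function name is hypothetical, and any field properties not listed would fall back to the field type's defaults.

```python
import json


def build_schema_changes():
    """Build a Solr Schema API payload that mirrors the changes described above."""
    return {
        "replace-field": [
            # title and plot: English text, single-valued
            {"name": "title", "type": "text_en", "multiValued": False},
            {"name": "plot", "type": "text_en", "multiValued": False},
            # genres: English text, still multi-valued (holds an array of genres)
            {"name": "genres", "type": "text_en", "multiValued": True},
        ]
    }


payload = json.dumps(build_schema_changes(), indent=2)
print(payload)
# The payload would be POSTed as JSON to <SOLR_URL>/movies/schema.
```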

You can use the Schema UI to view the results, as shown below. Note the new field types for title, plot, and genres. Also, note the index and query analyzers, including the PorterStemFilterFactory, used by the ‘text_en’ field type.

screen_shot_2019-02-23_at_10_07_42_pm

Comparative Queries

To demonstrate the similarities and the differences between Solr and MongoDB, we will examine a series of comparative queries, followed by a series of Solr-only searches. Again, all queries and output shown are included in the project’s two Python scripts.

Query 1a: All Documents

To start, we will perform a simple query for all the movie documents in the MongoDB collection, followed by the Solr index. With MongoDB, we use the find method. With Solr, we will use the Standard Query Parser, commonly known as the ‘lucene’ query parser, and the q (query) parameter. The result of the queries should be identical, with all 2,250 documents returned.

MongoDB:

Parameters
----------
query: {}
  
Results
----------
document count: 2250

Solr:

Parameters
----------
q: *:*
kwargs: {}
  
Results
----------
document count: 2250

Query 1b: Count Only

We can alter our first query to limit our response to only the count of documents for a given query in MongoDB; no documents will be returned. Since our query is empty, we will get back a count of all documents in the MongoDB database’s collection.

db.movieDetails.count()

Similarly, in Solr, we can set the rows parameter to zero to return only the document count. For brevity, we can also omit the Solr response header using the omitHeader parameter.

Parameters
----------
q: *:*
kwargs: {
  'omitHeader': 'true', 
  'rows': '0'
}

Results
----------
document count: 2250
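
For reference, the same count-only search can be expressed as a plain HTTP request to the core's select handler. The sketch below only builds the URL, using the standard library; the helper name is an assumption, and the project's scripts use pysolr rather than raw URLs.

```python
from urllib.parse import urlencode


def build_select_url(solr_url, core, query, **params):
    """Build the URL for a request to a Solr core's /select handler."""
    return f"{solr_url}/{core}/select?" + urlencode({"q": query, **params})


url = build_select_url("http://localhost:8983/solr", "movies", "*:*",
                       rows=0, omitHeader="true")
print(url)
```

The special characters in the match-all query are percent-encoded in the resulting URL, which is one reason a client library is more convenient for anything beyond quick experiments.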

Query 2: Exact Search

Next, we will perform a query for the exact movie title, ‘Star Wars: Episode V – The Empire Strikes Back’ in MongoDB, then Solr. Again, the results of the queries should be identical, with one document returned, matching the title.

screen_shot_2019-02-25_at_7_41_07_am

MongoDB:

Parameters
----------
query: {'title': 'Star Wars: Episode V - The Empire Strikes Back'}
projection: {'_id': 0, 'title': 1}
  
Results
----------
document count: 1
{'title': 'Star Wars: Episode V - The Empire Strikes Back'}

The quotes around the title are key for Solr to view the query as a single phrase as opposed to a series of search terms.

Solr:

Parameters
----------
q: title:"Star Wars: Episode V - The Empire Strikes Back"
kwargs: {
  'defType': 'lucene', 
  'fl': 'title, score'
}
  
Results
----------
document count: 1
{'title': 'Star Wars: Episode V - The Empire Strikes Back', 'score': 29.41}

Note the use of 'defType': 'lucene' is optional. The standard Lucene query parser is the default parser used by Solr. I am merely showing this parameter to improve the reader’s understanding. Later, we will use other query parsers.

Query 3: Search Phrase

Next, we will perform a query for the phrase ‘star wars’. With MongoDB, we will use the $regex and $options Evaluation Query Operators. The results from both the MongoDB and Solr queries should be identical, with the six Star Wars movies returned.

MongoDB:

Parameters
----------
query: {
  'title': {'$regex': '\\bstar wars\\b', 
  '$options': 'i'}
}
projection: {'_id': 0, 'title': 1}
  
Results
----------
document count: 6
{'title': 'Star Wars: Episode I - The Phantom Menace'}
{'title': 'Star Wars: Episode II - Attack of the Clones'}
{'title': 'Star Wars: Episode III - Revenge of the Sith'}
{'title': 'Star Wars: Episode IV - A New Hope'}
{'title': 'Star Wars: Episode V - The Empire Strikes Back'}
{'title': 'Star Wars: Episode VI - Return of the Jedi'}
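
To see what the regular expression and the case-insensitive option are doing, here is a small sketch using Python's re module. MongoDB uses a PCRE-style engine, so behavior may differ in edge cases, and the titles below are illustrative rather than taken from the dataset.

```python
import re

# Python equivalent of {'$regex': '\\bstar wars\\b', '$options': 'i'}
pattern = re.compile(r"\bstar wars\b", re.IGNORECASE)

# Illustrative titles; not necessarily all present in the dataset.
titles = [
    "Star Wars: Episode IV - A New Hope",
    "STAR WARS: Episode V - The Empire Strikes Back",
    "Star Trek",
    "Superstar Warship",
]

# The word boundaries (\b) prevent matches inside longer words.
matches = [title for title in titles if pattern.search(title)]
print(matches)
```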

With Solr, wrapping the phrase ‘star wars’ in quotes ensures Solr will treat the query string as an exact phrase, not individual search terms. Solr results are scored, but scores are almost all identical since all six movies contain the exact phrase.

Solr:

Parameters
----------
q: "star wars"
kwargs: {
  'defType': 'lucene', 
  'df': 'title', 
  'fl': 'title, score'
}
  
Results
----------
document count: 6
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'score': 8.21}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'score': 8.21}
{'title': 'Star Wars: Episode IV - A New Hope', 'score': 8.21}
{'title': 'Star Wars: Episode I - The Phantom Menace', 'score': 8.21}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'score': 8.21}
{'title': 'Star Wars: Episode V - The Empire Strikes Back', 'score': 7.55}

Here is the actual Lucene query (q) Solr will run.

title:"star war"

Query 4: Search Terms

Next, we will perform a query for all movies whose title contains either the search terms ‘star’ or ‘wars’, as opposed to the phrase, ‘star wars’. The Solr web console has a very powerful Analysis tool. Using the Analysis tool, we can examine how each filter (abbreviated in the far left column, below), associated with a particular field type, will impact the matching capabilities of Solr. To use the Analysis tool, place your search term(s) or phrase on the right side, an indexed field value on the left, and choose a field or field type from the dropdown.

Below, we see how the search terms ‘Star’ and ‘Wars’ (shown below right) would match a series of variations on the two words (shown below left) if the fields being searched are of the field type, ‘text_en’, as discussed earlier. For example, a query for ‘Star’ would match ‘star’, ‘stars’, ‘star’s’, ‘starring’, ‘starred’, and ‘star-shaped’, but not ‘shaped’, ‘superstars’, or ‘started’.

screen_shot_2019-02-23_at_9_21_37_am

Below, we see similar results for the search term, ‘Wars’.

screen_shot_2019-02-23_at_9_27_12_am

MongoDB Text Search

To accomplish the query with MongoDB, we will use MongoDB’s $text Evaluation Query Operator. The MongoDB $text operator is able to perform a text search across multiple fields indexed with a text index. MongoDB’s text indexes support text search queries on string content. The text indexes can include any field whose value is a string or an array of string elements, such as the movieDetails collection’s genres field. Although not as powerful as Solr’s search capabilities, MongoDB’s text search may address many basic search requirements without the need to augment the architecture with a search engine.

For our next query, we will rely on a text index on the title field. When the Python script runs, it creates the following three indexes on the collection, including the title text index.
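
A minimal sketch of creating just the title text index with PyMongo might look like the following. The function name and index name are assumptions, and the script's other indexes are not reproduced here. The function accepts any PyMongo collection object, so it can be defined without a running MongoDB instance.

```python
def create_title_text_index(collection):
    """Create a text index on the title field of a PyMongo collection."""
    # "text" is the index type PyMongo exposes as pymongo.TEXT.
    # The index name is an assumption chosen for this sketch.
    return collection.create_index(
        [("title", "text")],
        name="title_text_index",
        default_language="english",
    )


# Usage, assuming PyMongo is installed and MongoDB is reachable:
#   from pymongo import MongoClient
#   collection = MongoClient("mongodb://localhost:27017")["movies"]["movieDetails"]
#   create_title_text_index(collection)
```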

With the text index in place, the result of the queries should be identical, with 18 documents returned. Both the MongoDB and Solr resultsets are scored, but each is scored differently, using a different algorithm.

MongoDB:

Parameters
----------
query: {
  '$text': {'$search': 'star wars', '$language': 'en', '$caseSensitive': False}, 
  'countries': 'USA'
}
projection: {'score': {'$meta': 'textScore'}, '_id': 0, 'title': 1}
sort: [('score', {'$meta': 'textScore'})]
  
Results
----------
document count: 18
{'title': 'Star Wars: Episode I - The Phantom Menace', 'score': 1.2}
{'title': 'Star Wars: Episode IV - A New Hope', 'score': 1.17}
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'score': 1.17}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'score': 1.17}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'score': 1.17}

Solr:

Parameters
----------
q: star wars
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'df': 'title', 
  'fl': 'title, score', 
  'rows': '5'
}
  
Results
----------
document count: 18
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'score': 8.21}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'score': 8.21}
{'title': 'Star Wars: Episode IV - A New Hope', 'score': 8.21}
{'title': 'Star Wars: Episode I - The Phantom Menace', 'score': 8.21}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'score': 8.21}

Here is the actual Lucene query (q) Solr will run. The countries filter is applied afterward.

title:star title:war

Find me a Good Western

Query 5a: Multiple Search Terms

Next, we will perform a query for movies, produced in the USA, with the search terms ‘western’, ‘action’, or ‘adventure’ in the movie genres field. The genres field may hold multiple genre values. Although this is a simple query, we can start to see the advantages of Solr’s Lucene scoring capability to provide a way to measure the relevancy of individual results.

Even limited to the USA-based movies, this genres query returns a large number of results, 244 documents. With MongoDB, we have no sense of which documents are more relevant than others. Compared to the Solr results, MongoDB got a few in the top five results, but not the most relevant, based on matching all or most of the genres.

MongoDB:

Parameters
----------
query: {
  'genres': {'$in': ['Adventure', 'Action', 'Western']}, 
  'countries': 'USA'
}
projection: {'_id': 0, 'genres': 1, 'title': 1}
  
Results
----------
document count: 244
{'title': 'Wild Wild West', 'genres': ['Action', 'Western', 'Comedy']}
{'title': 'A Million Ways to Die in the West', 'genres': ['Comedy', 'Western']}
{'title': 'An American Tail: Fievel Goes West', 'genres': ['Animation', 'Adventure', 'Family']}
{'title': 'Once Upon a Time in the West', 'genres': ['Western']}
{'title': 'How the West Was Won', 'genres': ['Western']}

However, with Solr’s scoring, we see the first (top) result, ‘The Wild Bunch’, has a score of 7.18. Its genres are exactly ‘western’, ‘action’, and ‘adventure’. The last (bottom) result, ‘S.S. Doomtrooper’, has a score of 1.47. The most relevant result scored nearly 5x (488%) the score of the least relevant result. If you were searching for a western action adventure movie, it is pretty apparent the top Solr result, ‘The Wild Bunch’, is a much better choice than the bottom result, ‘S.S. Doomtrooper’. In fact, as shown below, all five top-scoring Solr results look pretty promising based on their score, genres, and title.

Solr:

Parameters
----------
q: adventure action western
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'df': 'genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 244
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 7.18}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 6.26}
{'title': 'The Big Trail', 'genres': ['Adventure', 'Western', 'Romance'], 'score': 5.46}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 5.26}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 5.26}

Here is the actual Lucene query (q) Solr will run. The countries filter is applied afterward.

genres:adventur genres:action genres:western

Query 5b: Required Search Term

There are nearly endless options that can be used with Solr to influence Solr’s results. For example, we could perform the same Solr query above, but this time require that the word ‘western’ is in the genres field, by using the plus symbol (+) boolean operator. The top five results and scores are the same, but the total number of relevant results has decreased from 244 to just 24. That means 220 of the previous results contained ‘action’ and/or ‘adventure’, but not ‘western’. The opposite is also true: using the minus symbol (-) boolean operator will ensure the results do not contain a particular word or phrase.

Solr:

Parameters
----------
q: adventure action +western
kwargs: { 
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'df': 'genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
} 
  
Results
----------
document count: 24
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 7.18}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 6.26}
{'title': 'The Big Trail', 'genres': ['Adventure', 'Western', 'Romance'], 'score': 5.46}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 5.26}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 5.26}

Here is the actual Lucene query (q) Solr will run. The countries filter is applied afterward.

(genres:adventur genres:action) +genres:western
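
The required (+) and prohibited (-) operators used in Query 5b can be sketched as a small query-string builder. The helper below is hypothetical and deliberately simple: it does not escape special characters or quote phrases, which a real application would need to handle.

```python
def build_lucene_query(optional=(), required=(), prohibited=()):
    """Compose a Lucene query string from optional, required (+), and prohibited (-) terms."""
    parts = list(optional)
    parts += [f"+{term}" for term in required]    # term must be present
    parts += [f"-{term}" for term in prohibited]  # term must not be present
    return " ".join(parts)


# Same q value as Query 5b: 'western' is required.
print(build_lucene_query(optional=["adventure", "action"], required=["western"]))
# The opposite case: exclude anything containing 'western'.
print(build_lucene_query(optional=["adventure", "action"], prohibited=["western"]))
```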

Query 6a: eDisMax Query

For our next query, we will compare Solr’s eDisMax query parser to MongoDB’s text search capabilities.

Solr Extended DisMax

According to Solr, the DisMax query parser is designed to process simple phrases and to search for individual terms across several fields using different weighting (boosts) based on the significance of each field. Additional options enable users to influence the score based on rules specific to each use case (independent of user input). Solr’s Extended DisMax (eDisMax) query parser is an improved version of the DisMax query parser.

In my opinion, in addition to the Lucene-based scoring, the ability to easily search across multiple fields and selectively boost results with the DisMax and eDisMax query parsers is what starts to differentiate querying data in a database, from searching for relevant results with a search engine.

Multi-Field Text Index

For our next query, the Python script will drop the previous MongoDB text index on the title field and create a new compound text index, which will incorporate the title, plot, and genres fields.

Below, we see the new compound text index in the MongoDB Compass application’s Indexes tab.

screen_shot_2019-02-23_at_11_39_06_am

We will perform a query for movies, produced in the USA, with the search terms ‘western’, ‘action’, or ‘adventure’ in the movie title, plot, or genres fields. The results of the queries should be identical, with 259 documents returned. Both the MongoDB and Solr resultsets are scored, but again, the scores and ordering of results are not identical. Of the top ten results from each query, six movies appear in both.

MongoDB:

Parameters
----------
query: {
  '$text': {'$search': 'western action adventure', '$language': 'en', '$caseSensitive': False}, 
  'countries': 'USA'
}
projection: {'score': {'$meta': 'textScore'}, '_id': 0, 'title': 1}
  
Results
----------
document count: 259
{'title': 'Zathura: A Space Adventure', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 3.3}
{'title': 'The Extraordinary Adventures of Adèle Blanc-Sec', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 3.24}
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 3.2}
{'title': 'The Adventures of Tintin', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 2.85}
{'title': 'Adventures in Babysitting', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 2.85}

Solr:

Parameters
----------
q: western action adventure
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 259
{'title': 'The Secret Life of Walter Mitty', 'genres': ['Adventure', 'Comedy', 'Drama'], 'score': 7.67}
{'title': 'Western Union', 'genres': ['History', 'Western'], 'score': 7.39}
{'title': 'The Adventures of Tintin', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 7.36}
{'title': 'Adventures in Babysitting', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 7.36}
{'title': 'The Poseidon Adventure', 'genres': ['Action', 'Adventure', 'Drama'], 'score': 7.36}

Here is the actual Lucene query Solr will run.

+(
  (title:adventur | plot:adventur | genres:adventur) 
  (title:action | plot:action | genres:action) 
  (title:western | plot:western | genres:western)
)

Query 6b: Boosting Fields

If you really wanted a ‘western action adventure’ movie as opposed to a ‘western’, ‘action’, or ‘adventure’ movie, then neither Solr nor MongoDB probably satisfied you completely with its first five search results. Boosting or weighting fields can often provide more relevant search results if the correct fields are boosted, and the amount of positive or negative boost is appropriate.

MongoDB’s text indexes also allow for weighting individual fields. The weight of an indexed field denotes the significance of the field relative to the other indexed fields and directly impacts the text search score. Weighting fields is the equivalent of boosting fields with Solr. Below, we see a modification applied to our previous text index in which the title field is given twice the weight of the plot field (1.0 is default) and the genres field is given twice the weight of the title field. The Python script also applies this index for you.
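
A hedged sketch of such a weighted index with PyMongo follows. The function name, index name, and constant are assumptions for illustration; only the relative weights (plot 1, title 2, genres 4) come from the description above. As with any PyMongo collection call, this requires a live collection object to do real work.

```python
# Field weights described above: plot keeps the default of 1,
# title is twice plot, and genres is twice title.
TEXT_INDEX_WEIGHTS = {"plot": 1, "title": 2, "genres": 4}


def create_weighted_text_index(collection, weights=TEXT_INDEX_WEIGHTS):
    """Create a compound text index with per-field weights on a PyMongo collection."""
    keys = [(field, "text") for field in weights]  # "text" is pymongo.TEXT
    return collection.create_index(
        keys,
        name="weighted_text_index",  # assumed name for this sketch
        weights=dict(weights),
        default_language="english",
    )
```

Since MongoDB allows only one text index per collection, any existing text index would need to be dropped first, for example with the collection's drop_index method.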

Likewise, Solr is also capable of boosting fields for both the DisMax and eDisMax query parsers. For our next query, we will repeat the previous query, but boost fields in the eDisMax’s qf (Query Fields) parameter to match the boost in the MongoDB weighted text index, shown above.
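
The qf parameter is just a space-separated list of fields, each optionally followed by a caret and a boost. As a small sketch, the hypothetical helper below formats a dictionary of boosts into that string, which makes it easier to keep the Solr boosts aligned with the MongoDB weights.

```python
def build_qf(boosts):
    """Format field boosts as an eDisMax qf string, e.g. 'plot title^2.0'."""
    parts = []
    for field, boost in boosts.items():
        # A boost of 1.0 is the default, so it can be left off.
        parts.append(field if boost == 1.0 else f"{field}^{float(boost)}")
    return " ".join(parts)


qf = build_qf({"plot": 1.0, "title": 2.0, "genres": 4.0})
print(qf)
```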

The results of the queries should be identical, with 259 documents returned. MongoDB and Solr’s results are scored and ordered differently. However, compared to the previous, un-weighted/boosted MongoDB and Solr query results above, the relative scores are higher, the order of movies returned is different, and most importantly, the Solr results seem more relevant to the original search intent.

For example, with Solr, take the movie ‘The Secret Life of Walter Mitty’, which previously scored highest at 7.58. In the boosted search results, the movie ‘The Wild Bunch’ is now ranked first with a score of 28.71, and ‘The Secret Life of Walter Mitty’ is no longer even in the top 50 Solr results. By comparison, other movies, like ‘The Adventures of Tintin’ and ‘Adventures in Babysitting’, barely moved in position, even though their scores changed proportionally.

MongoDB:

Parameters
----------
query: {
  '$text': {'$search': 'western action adventure', '$language': 'en', '$caseSensitive': False}, 
  'countries': 'USA'
}
projection: {'score': {'$meta': 'textScore'}, '_id': 0, 'title': 1}
  
Results
----------
document count: 259
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 12.8}
{'title': 'Zathura: A Space Adventure', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 10.27}
{'title': 'The Extraordinary Adventures of Adèle Blanc-Sec', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 10.14}
{'title': 'The Adventures of Tintin', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 9.9}
{'title': 'Adventures in Babysitting', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 9.9}

Solr:

Parameters
----------
q: western action adventure
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title^2.0 genres^4.0', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 259
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 28.71}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 25.05}
{'title': 'The Big Trail', 'genres': ['Adventure', 'Western', 'Romance'], 'score': 21.84}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 21.05}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 21.05}

Here is the actual Lucene query Solr will run.

+(
  ((title:adventur)^2.0 | plot:adventur | (genres:adventur)^4.0) 
  ((title:action)^2.0 | plot:action | (genres:action)^4.0) 
  ((title:western)^2.0 | plot:western | (genres:western)^4.0)
)

Query 6c: eDisMax Boosted with Required/Prohibited Terms

We can use both the plus (+) and minus (-) boolean operators to obtain more relevant search results. Let’s repeat the last Solr boosted query, but this time, also require any results to contain the search term ‘western’, and prohibit the results from containing the search term ‘romance’. I would consider the new search results, based on these modifications to the Solr query, to be more relevant to the original intent of the search than the previous results. For example, the movie ‘The Big Trail’, a romantic western adventure movie according to its genres, is no longer included in the result set.

Solr:

Parameters
----------
q: adventure action +western -romance
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title^2.0 genres^4.0',
  'fl': 'title, genres, score', 
  'rows': '5'
}

Results
----------
document count: 25
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 28.71}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 25.05}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 21.05}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 21.05}
{'title': 'Cowboy', 'genres': ['Western'], 'score': 21.05}

Here is the actual Lucene query Solr will run.

+(
  ((title:adventur)^2.0 | plot:adventur | (genres:adventur)^4.0) 
  ((title:action)^2.0 | plot:action | (genres:action)^4.0) 
  +((title:western)^2.0 | plot:western | (genres:western)^4.0) 
  -((title:romanc)^2.0 | plot:romanc | (genres:romanc)^4.0)
)
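The effect of the required and prohibited operators can be illustrated with a toy matcher. This is a simplified sketch, not Solr's implementation: it ignores stemming, fields, and scoring, and the `matches` function is hypothetical. The genre sets are taken from the results shown earlier.

```python
def matches(query, doc_terms):
    """Toy boolean matching: '+term' is required, '-term' is prohibited, bare terms are optional."""
    required, prohibited, optional = set(), set(), set()
    for token in query.lower().split():
        if token.startswith("+"):
            required.add(token[1:])
        elif token.startswith("-"):
            prohibited.add(token[1:])
        else:
            optional.add(token)
    if prohibited & doc_terms:
        return False  # any prohibited term excludes the document
    if required:
        return required <= doc_terms  # every required term must be present
    return bool(optional & doc_terms)


query = "adventure action +western -romance"
wild_bunch = {"action", "adventure", "western"}
big_trail = {"adventure", "western", "romance"}
print(matches(query, wild_bunch))  # True
print(matches(query, big_trail))   # False
```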

Query 7a: The Movie Dilemma

Frequently, end-users interact with a search engine, such as Google, through a search box. We type something into a search box and get back a list of relevant results. By now, most of us have learned how to phrase our Google search to get optimal results. However, the reality is, people can and will type just about anything into a search box.

To try to improve the average search results for end-users, search engineers will often tune query parameters, such as boosting the importance of certain search fields over others, adjusting fuzzy search parameters, or ignoring irrelevant words in the search phrases by adding them to the stop words. The default English stop words in Solr include words like ‘a’, ‘an’, ‘and’, ‘are’, ‘as’, ‘at’, ‘be’, ‘but’, ‘by’, ‘for’, and so forth.

Take, for example, the word ‘movie’. Someone searching for a movie to watch, using a search box, might enter the phrase ‘A cowboys movie’. The term ‘A’ is ignored as a stop word. This leaves the search terms ‘cowboys’ and ‘movie’ to be searched for in the title, plot, and genres fields. As we see in the top ten results below, most appear to be about cowboys, having the word ‘cowboy’ in their title or plot. Then there is the result ‘TV: The Movie’. This does not appear to be a movie about cowboys. The word ‘cowboy’ is not in the title, plot, or genres, yet here it is in third place, since it contains the word ‘movie’ in the title, plot, and/or genres.

Similarly, the top movie result, ‘Cowboy Bebop: The Movie’, is probably no more relevant than the second, third, or fourth place movies. However, ‘Cowboy Bebop: The Movie’ scored significantly higher than even the number two result (11.24 vs. 7.31). This is because the movie’s title contains both search terms, ‘cowboy’ and ‘movie’, even though the word ‘movie’ is irrelevant to the original intent of the search phrase.

Solr:

Parameters
----------
q: A cowboys movie
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, genres, score', 
  'rows': '10'
}
  
Results
----------
document count: 23
{'title': 'Cowboy Bebop: The Movie', 'genres': ['Animation', 'Action', 'Crime'], 'score': 11.24}
{'title': 'Cowboy', 'genres': ['Western'], 'score': 7.31}
{'title': 'TV: The Movie', 'genres': ['Comedy'], 'score': 6.42}
{'title': 'Space Cowboys', 'genres': ['Action', 'Adventure', 'Thriller'], 'score': 6.33}
{'title': 'Midnight Cowboy', 'genres': ['Drama'], 'score': 6.33}
{'title': 'Drugstore Cowboy', 'genres': ['Crime', 'Drama'], 'score': 6.33}
{'title': 'Urban Cowboy', 'genres': ['Drama', 'Romance', 'Western'], 'score': 6.33}
{'title': 'The Cowboy Way', 'genres': ['Action', 'Comedy', 'Drama'], 'score': 6.33}
{'title': 'The Cowboy and the Lady', 'genres': ['Comedy', 'Drama', 'Romance'], 'score': 6.33}
{'title': 'Toy Story', 'genres': ['Animation', 'Adventure', 'Comedy'], 'score': 5.65}

Here is the actual Lucene query Solr will run.

+(
  (title:cowboi | plot:cowboi | genres:cowboi) 
  (title:movi | plot:movi | genres:movi)
)

Query 7b: Stop Words

To solve the movie dilemma, we might consider adding the word ‘movie’ to the stop words, since the word ‘movie’ seems to be irrelevant to the search phrase ‘A cowboys movie’, or to a movie search engine in general. However, this choice will negatively impact other searches. There are 12 movie titles containing the word ‘movie’. If you were searching for ‘The Lego Movie’, ignoring the word ‘movie’ as a stop word would reduce the accuracy and relevance of your search results. As shown below, only one of the two Lego movies is returned: the one whose title does not contain the word ‘movie’.

Solr:

Parameters
----------
q: The Lego Movie
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 1
{'title': 'Lego DC Comics Super Heroes: Justice League vs. Bizarro League', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 3.63}

Here is the actual Lucene query Solr will run. Note that neither the term ‘the’ nor the term ‘movie’ is part of the search.

+(title:lego | plot:lego | genres:lego)
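The effect of an extended stop word list can be sketched in a few lines of Python. This is an illustration only, with a hypothetical `remaining_terms` helper and a small subset of the default English stop words; unlike Solr's analyzer chain, it performs no stemming.

```python
import re

# A few of Solr's default English stop words
STOP_WORDS = {"a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "the"}


def remaining_terms(phrase, extra_stop_words=()):
    """Lowercase and tokenize the phrase, then drop stop words."""
    stop_words = STOP_WORDS | set(extra_stop_words)
    tokens = re.findall(r"[a-z0-9]+", phrase.lower())
    return [token for token in tokens if token not in stop_words]


print(remaining_terms("The Lego Movie"))              # ['lego', 'movie']
print(remaining_terms("The Lego Movie", ["movie"]))   # ['lego']
print(remaining_terms("A cowboys movie", ["movie"]))  # ['cowboys']
```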

Query 7c: Negative Boost

A second method to solve the movie dilemma might be to negatively boost the word ‘movie’ when it appears in the title field, thus reducing its relevance. Negatively boosting fields, or more precisely, negatively boosting a specific field value, is possible with both the DisMax and eDisMax query parsers. We can assign a negative boost to the word ‘movie’ when it appears in the title field by using the bq (Boost Query) parameter. According to the Solr documentation, the bq parameter specifies an additional, optional query clause that will be added to the user’s main query to influence the score. As developers, we could programmatically append the negatively boosted term(s) to the query without directly altering the user’s original search phrase. Again, like stop words, boosting may also negatively impact other searches.

After some experimentation, we will try a boost value of -2. We still get 23 results; however, the top ten results now appear to be more relevant, based on the intent of our search phrase. The movies ‘Cowboy Bebop: The Movie’ and ‘TV: The Movie’ are no longer present in the top ten results because their scores were lowered. You can repeat this process for other search terms like ‘movie’, positively or negatively boosting their scores, to improve the relevancy of the results.
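Appending the boost programmatically might look like the following sketch. The `with_boost_query` helper is hypothetical; it only builds the parameter dictionary and leaves the user's search phrase untouched.

```python
def with_boost_query(params, field, term, boost):
    """Return a copy of the Solr params with a bq clause for one field value."""
    updated = dict(params)
    updated["bq"] = f"{field}:{term}^{boost}"
    return updated


base_params = {
    "defType": "edismax",
    "fq": "countries: USA",
    "qf": "plot title genres",
    "fl": "title, genres, score",
    "rows": "10",
}
user_phrase = "A cowboys movie"  # sent to Solr unchanged as the q parameter
params = with_boost_query(base_params, "title", "movie", -2.0)
print(params["bq"])  # title:movie^-2.0
```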

Solr:

Parameters
----------
q: A cowboys movie
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'bq': 'title:movie^-2.0', 
  'fl': 'title, genres, score', 
  'rows': '10'
}
  
Results
----------
document count: 23
{'title': 'Cowboy', 'genres': ['Western'], 'score': 7.31}
{'title': 'Space Cowboys', 'genres': ['Action', 'Adventure', 'Thriller'], 'score': 6.33}
{'title': 'Midnight Cowboy', 'genres': ['Drama'], 'score': 6.33}
{'title': 'Drugstore Cowboy', 'genres': ['Crime', 'Drama'], 'score': 6.33}
{'title': 'Urban Cowboy', 'genres': ['Drama', 'Romance', 'Western'], 'score': 6.33}
{'title': 'The Cowboy Way', 'genres': ['Action', 'Comedy', 'Drama'], 'score': 6.33}
{'title': 'The Cowboy and the Lady', 'genres': ['Comedy', 'Drama', 'Romance'], 'score': 6.33}
{'title': 'Toy Story', 'genres': ['Animation', 'Adventure', 'Comedy'], 'score': 5.65}
{'title': "Ride 'Em Cowboy", 'genres': ['Comedy', 'Western', 'Musical'], 'score': 5.58}
{'title': "G.M. Whiting's Enemy", 'genres': ['Mystery'], 'score': 5.32}

Here is the actual Lucene query Solr will run, accounting for the negative boost.

+((title:cowboi | plot:cowboi | genres:cowboi) 
  (title:movi | plot:movi | genres:movi)) 
(title:movi)^-2.0

Function Query

Solr’s Lucene scoring is effective, but what if we wanted to use an additional, subjective measure of relevance to enrich our search results? If we examine the movies index schema, we will see there are quite a few rating-related fields that provide a sense of the movie’s quality, as judged by viewers and organizations. For example, there is a nested ‘tomato’ object, containing a number of qualitative data fields, such as the tomato rating and a tomato user rating. Tomato refers to Rotten Tomatoes. According to their site, Rotten Tomatoes provides the world’s most trusted recommendation resources for quality entertainment. Additionally, the movies index schema includes similar ‘awards’ and ‘imdb’ objects and a ‘metacritic’ rating. Here is a snippet of data from the movie, ‘Butch Cassidy and the Sundance Kid’, showing many of those qualitative fields.

  "imdb": {
    "id": "tt0064115",
    "rating": 8.1,
    "votes": 142642
  },
  "tomato": {
    "meter": 89,
    "image": "certified",
    "rating": 8.2,
    "reviews": 46,
    "fresh": 41,
    "consensus": "With its iconic pairing of Paul Newman and Robert Redford, jaunty screenplay and Burt Bacharach score, Butch Cassidy and the Sundance Kid has gone down as among the defining moments in late-'60s American cinema.",
    "userMeter": 93,
    "userRating": 4,
    "userReviews": 70088
  },
  "metacritic": 58,
  "awards": {
    "wins": 16,
    "nominations": 14,
    "text": "Won 4 Oscars. Another 16 wins & 14 nominations."
  }

According to Apache, Lucene scoring is a combination of the Vector Space Model (VSM) of Information Retrieval and the Boolean model. Lucene allows influencing search results by ‘boosting’. Using Solr’s Function Query, we can apply a document-level multiplicative boost function, which will alter the scores of the query’s search results.

Query 8: Boost Function

If you remember, in our previous example we queried for ‘adventure action +western -romance’. If we run it again, without the boosted fields, we get back 25 documents, of which ‘Western Union’ ranks highest, with a score of 7.39.

Solr:

Parameters
----------
q: adventure action +western -romance
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, awards.wins, score', 
  'rows': '5'
}

Results
----------
document count: 25
{'title': 'Western Union', 'awards.wins': [0.0], 'score': 7.39}
{'title': 'The Wild Bunch', 'awards.wins': [5.0], 'score': 7.18}
{'title': 'Western Spaghetti', 'awards.wins': [2.0], 'score': 6.64}
{'title': 'Crossfire Trail', 'awards.wins': [1.0], 'score': 6.26}
{'title': 'Butch Cassidy and the Sundance Kid', 'awards.wins': [16.0], 'score': 6.23}

Here is the actual Lucene query Solr will run.

+(
  (title:adventur | plot:adventur | genres:adventur) 
  (title:action | plot:action | genres:action) 
  +(title:western | plot:western | genres:western) 
  -(title:romanc | plot:romanc | genres:romanc)
)

Now, we will apply a boost function. In the function below, I have arbitrarily taken the number of awards won by each movie and divided it in half. The function is applied to the eDisMax’s boost parameter.

div(field(awards.wins,min),2)

This function has a multiplicative effect on the Lucene scoring of the documents in the result set, by boosting scores in proportion to the number of awards each movie has won. We now get movies that are a blend of both relevant results, based on our search phrase, as well as those movies that are highly acclaimed.
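The multiplicative effect can be reproduced approximately with the un-boosted scores and award counts listed above. This sketch covers only those five movies (the full result set contains 25), and because the input scores are rounded to two decimals, the products differ slightly from the values Solr reports.

```python
# (title, un-boosted Lucene score, awards.wins) from the un-boosted query results
results = [
    ("Western Union", 7.39, 0),
    ("The Wild Bunch", 7.18, 5),
    ("Western Spaghetti", 6.64, 2),
    ("Crossfire Trail", 6.26, 1),
    ("Butch Cassidy and the Sundance Kid", 6.23, 16),
]


def boosted(score, wins):
    """Mirror div(field(awards.wins,min),2) applied as a multiplicative boost."""
    return score * (wins / 2)


reranked = sorted(
    ((title, round(boosted(score, wins), 2)) for title, score, wins in results),
    key=lambda item: item[1],
    reverse=True,
)
for title, score in reranked:
    print(title, score)
```

A movie with no awards, such as ‘Western Union’, ends up with a boosted score of zero, which is why it falls so far down the ranking.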

The impact of the multiplicative boost function is immediately apparent with the top result, ‘Butch Cassidy and the Sundance Kid’. This widely acclaimed movie climbed from fifth place in the previous search to first place, using the boost formula. The movie won an amazing 16 awards, 16 more than the previous first-place movie, ‘Western Union’, which won no awards and dropped all the way down to 17th place in the boosted results. A movie that wasn’t even in the top five results, ‘Wild Wild West’, is now in second place, having received ten awards.

The impact of the boost function is most apparent in the scores. Previously, the score delta between the first and fifth positions in the results was 1.16. The delta between the first and second positions was a mere 0.21. Now, with the boost function applied, the range of scoring and, consequently, the two deltas increased significantly: 36.78 compared to 1.16, and 23.83 compared to 0.21.

Solr:

Parameters
----------
q: adventure action +western -romance
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, awards.wins, score', 
  'boost': 'div(field(awards.wins,min),2)', 
  'rows': '5'
}

Results
----------
document count: 25
{'title': 'Butch Cassidy and the Sundance Kid', 'awards.wins': [16.0], 'score': 49.86}
{'title': 'Wild Wild West', 'awards.wins': [10.0], 'score': 26.03}
{'title': 'How the West Was Won', 'awards.wins': [7.0], 'score': 18.42}
{'title': 'The Wild Bunch', 'awards.wins': [5.0], 'score': 17.95}
{'title': 'All Quiet on the Western Front', 'awards.wins': [5.0], 'score': 13.08}

Here is the actual Lucene query Solr will run, using the boost function.

FunctionScoreQuery(+((title:adventur | plot:adventur | 
  genres:adventur) (title:action | plot:action | genres:action) 
  +(title:western | plot:western | genres:western) 
  -(title:romanc | plot:romanc | genres:romanc)), 
  scored by boost(div(double(awards.wins,MIN),const(2))))

Solr’s Function Query offers a large number of mathematical functions, which can be combined into complex formulas. For example, we could also take the square root of the sum of the IMDB rating and the number of awards won.

sqrt(sum(field(imdb.rating,min),field(awards.wins,min)))
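As a worked example, the same formula can be evaluated in Python for ‘Butch Cassidy and the Sundance Kid’, using the imdb.rating and awards.wins values from the data snippet shown earlier. The function name is hypothetical.

```python
import math


def rating_and_wins_boost(imdb_rating, award_wins):
    """Mirror sqrt(sum(field(imdb.rating,min),field(awards.wins,min)))."""
    return math.sqrt(imdb_rating + award_wins)


# 'Butch Cassidy and the Sundance Kid': imdb.rating 8.1, awards.wins 16
print(round(rating_and_wins_boost(8.1, 16), 2))  # 4.91
```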

More Like This, Please

You’ve found a good movie, and now you want more movies just like that one. Maybe you want more movies by a particular director, or starring a certain actor, or based on the same theme. Solr has a solution for this, the More Like This Query Parser. The MLTQParser, for short, enables retrieving documents that are similar to a given document. It uses Lucene’s existing MoreLikeThis logic. To use the parser, you provide the unique Solr ID of the document you want to find more like and the field(s) to use for the comparison. For example:

q: {!mlt qf="genres"}da54520e-a013-4ea3-9698-230ed02c8cf0

Query 9a: MLT Genres

In the first MLTQParser query, we will select the movie ‘Star Wars: Episode I – The Phantom Menace’. We will look for more movies, produced in the USA, that are similar to ‘Star Wars: Episode I – The Phantom Menace’, by looking for similarities in the genres field. The MLTQParser’s qf parameter specifies the fields to use for similarity. We will require a mintf (Minimum Term Frequency) of 1. This is the frequency below which search terms will be ignored in the source document. We will also require a mindf (Minimum Document Frequency) of 1. Words that do not occur in at least this many documents will be ignored.
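The local parameters of an MLTQParser query can be assembled programmatically. The following is a minimal sketch with a hypothetical `mlt_query` helper; the document IDs are the ones used in this post's examples.

```python
def mlt_query(doc_id, fields, mintf=None, mindf=None):
    """Build a More Like This local-params query string for the q parameter."""
    joined = " ".join(fields)
    local_params = [f'qf="{joined}"']
    if mintf is not None:
        local_params.append(f"mintf={mintf}")
    if mindf is not None:
        local_params.append(f"mindf={mindf}")
    return "{!mlt " + " ".join(local_params) + "}" + doc_id


print(mlt_query("da54520e-a013-4ea3-9698-230ed02c8cf0", ["genres"]))
print(mlt_query("aaf956a5-afa5-4284-91c1-69455142884f", ["genres"], mintf=1, mindf=1))
```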

The results of the Solr search appear logical; they are the other five Star Wars movies. Note that since the first five results are exact matches on The Phantom Menace’s three genres, ‘action’, ‘adventure’, and ‘fantasy’, their scores are identical, 6.33.

Solr:

Parameters
----------
q: {!mlt qf="genres" mintf=1 mindf=1}aaf956a5-afa5-4284-91c1-69455142884f
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 252
{'title': 'Star Wars: Episode IV - A New Hope', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode V - The Empire Strikes Back', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}

Here is the actual Lucene query Solr will run.

+(genres:action genres:adventur genres:fantasi) 
-id:652adcfa-c59c-4fa0-ace5-6345fec3cfff

Query 9b: The Problem with George

In the second MLTQParser query example, we will again choose the movie ‘Star Wars: Episode I – The Phantom Menace’. However, this time we will look for similar movies based on a comparison of the actors, director, and writers fields (shown below). Basically, we are looking for similarities between the people associated with ‘Star Wars: Episode I – The Phantom Menace’ and other movies.

Solr:

Parameters
----------
q: id:"aaf956a5-afa5-4284-91c1-69455142884f"
kwargs: {
  'defType': 'lucene', 
  'fl': 'actors, director, writers'
}
  
Results
----------
document count: 1
{
  'director': ['George Lucas'], 
  'writers': ['George Lucas'], 
  'actors': ['Liam Neeson', 'Ewan McGregor', 'Natalie Portman', 'Jake Lloyd']
}

As you can tell by the results of the MLTQParser query below, the first nine out of ten search results make sense. However, the tenth movie result, ‘New Meet Me on South Street: The Story of JC Dobbs’, has no obvious similarities with ‘Star Wars: Episode I – The Phantom Menace’. The movie does not share a common director, writer, or actor.

Solr:

Parameters
----------
q: {!mlt qf="actors director writers" mintf=1 mindf=1}aaf956a5-afa5-4284-91c1-69455142884f
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'fl': 'title, actors, director, writers, score', 
  'rows': '10'
}
  
Results
----------
document count: 55
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'director': ['George Lucas'], 'writers': ['George Lucas'], 'actors': ['Ewan McGregor', 'Natalie Portman', 'Hayden Christensen', 'Ian McDiarmid'], 'score': 44.84}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'director': ['George Lucas'], 'writers': ['George Lucas', 'Jonathan Hales', 'George Lucas'], 'actors': ['Ewan McGregor', 'Natalie Portman', 'Hayden Christensen', 'Christopher Lee'], 'score': 44.58}
{'title': 'Star Wars: Episode IV - A New Hope', 'director': ['George Lucas'], 'writers': ['George Lucas'], 'actors': ['Mark Hamill', 'Harrison Ford', 'Carrie Fisher', 'Peter Cushing'], 'score': 23.51}
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'director': ['Richard Marquand'], 'writers': ['Lawrence Kasdan', 'George Lucas', 'George Lucas'], 'actors': ['Mark Hamill', 'Harrison Ford', 'Carrie Fisher', 'Billy Dee Williams'], 'score': 11.96}
{'title': 'A Million Ways to Die in the West', 'director': ['Seth MacFarlane'], 'writers': ['Seth MacFarlane', 'Alec Sulkin', 'Wellesley Wild'], 'actors': ['Seth MacFarlane', 'Charlize Theron', 'Amanda Seyfried', 'Liam Neeson'], 'score': 11.72}
{'title': 'Run All Night', 'director': ['Jaume Collet-Serra'], 'writers': ['Brad Ingelsby'], 'actors': ['Liam Neeson', 'Ed Harris', 'Joel Kinnaman', 'Boyd Holbrook'], 'score': 11.72}
{'title': 'I Love You Phillip Morris', 'director': ['Glenn Ficarra, John Requa'], 'writers': ['John Requa', 'Glenn Ficarra', 'Steve McVicker'], 'actors': ['Jim Carrey', 'Ewan McGregor', 'Leslie Mann', 'Rodrigo Santoro'], 'score': 10.97}
{'title': 'The Island', 'director': ['Michael Bay'], 'writers': ['Caspian Tredwell-Owen', 'Alex Kurtzman', 'Roberto Orci', 'Caspian Tredwell-Owen'], 'actors': ['Ewan McGregor', 'Scarlett Johansson', 'Djimon Hounsou', 'Sean Bean'], 'score': 10.97}
{'title': 'Big Fish', 'director': ['Tim Burton'], 'writers': ['Daniel Wallace', 'John August'], 'actors': ['Ewan McGregor', 'Albert Finney', 'Billy Crudup', 'Jessica Lange'], 'score': 10.97}
{'title': 'New Meet Me on South Street: The Story of JC Dobbs', 'director': ['George Manney'], 'writers': ['George Manney'], 'actors': ['Tony Bidgood', 'Peter Stone Brown', 'Stephen Caldwell', 'Tommy Conwell'], 'score': 10.5}

Here is the actual Lucene query Solr will run.

+(writers:george director:george actors:natalie writers:lucas 
  actors:jake actors:portman actors:ewan actors:mcgregor 
  actors:liam actors:lloyd director:lucas actors:neeson) 
-id:652adcfa-c59c-4fa0-ace5-6345fec3cfff

Examining the query, we can plainly see the problem with the MLTQParser. The MLTQParser query is splitting the first and last names of actors, directors, and writers, then searching for each name individually, but not for the whole name. In my opinion, this is a bug with the MLTQParser, since each value in the actors, director, and writers MultiValued fields is wrapped in double quotes. The query should treat each value as an exact phrase, not as individual search terms.
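The difference between term-level and whole-value comparison can be illustrated with a small sketch. The `field_terms` helper is hypothetical and only approximates the analyzer with lowercasing and whitespace splitting.

```python
def field_terms(values):
    """Lowercase each field value and split it on whitespace into individual terms."""
    return {token for value in values for token in value.lower().split()}


phantom_menace = {"director": ["George Lucas"], "writers": ["George Lucas"]}
jc_dobbs = {"director": ["George Manney"], "writers": ["George Manney"]}

# Term-level comparison: the first name alone produces a match in both fields
for field in ("director", "writers"):
    shared = field_terms(phantom_menace[field]) & field_terms(jc_dobbs[field])
    print(field, shared)  # {'george'}

# Treating each value as a single exact phrase finds no overlap
print(set(phantom_menace["director"]) & set(jc_dobbs["director"]))  # set()
```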

Given the MLTQParser’s query logic, it is now clear why a seemingly irrelevant movie, like ‘New Meet Me on South Street: The Story of JC Dobbs’, was part of the search results. Examining the debug output of the scoring explanation, we see the following.

"544637e5-e96d-4f62-9b22-5174c60ee512":"
10.504457 = sum of:
  10.504457 = sum of:
    5.5608187 = weight(writers:george in 649) [SchemaSimilarity], result of:
      5.5608187 = score(doc=649,freq=1.0 = termFreq=1.0
), product of:
        4.226696 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
          26.0 = docFreq
          1814.0 = docCount
        1.315642 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
          1.0 = termFreq=1.0
          1.2 = parameter k1
          0.75 = parameter b
          4.836273 = avgFieldLength
          2.0 = fieldLength
    4.943639 = weight(director:george in 649) [SchemaSimilarity], result of:
      4.943639 = score(doc=649,freq=1.0 = termFreq=1.0
), product of:
        4.6206594 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
          20.0 = docFreq
          2081.0 = docCount
        1.069899 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
          1.0 = termFreq=1.0
          1.2 = parameter k1
          0.75 = parameter b
          2.3801057 = avgFieldLength
          2.0 = fieldLength
"

George Manney was both the director and writer of ‘New Meet Me on South Street: The Story of JC Dobbs’. George Manney shares a first name with George Lucas, the director and writer of ‘Star Wars: Episode I – The Phantom Menace’. This is the only similarity between the people associated with both movies. Therefore, there were two matches, one on the director field and one on the writers field. Unfortunately, having the same first name negatively impacts the results from the MLTQParser.
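The two term scores in the debug output can be recomputed from the idf and tfNorm formulas it prints. The sketch below plugs in the values from the explanation; the function name is hypothetical, and tiny differences from Solr's figures are expected because Solr computes with single-precision floats.

```python
import math


def bm25_term_score(doc_freq, doc_count, freq, field_length, avg_field_length, k1=1.2, b=0.75):
    """Compute idf * tfNorm using the formulas printed in the debug output."""
    idf = math.log(1 + (doc_count - doc_freq + 0.5) / (doc_freq + 0.5))
    tf_norm = (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * field_length / avg_field_length))
    return idf * tf_norm


writers = bm25_term_score(26, 1814, 1.0, 2.0, 4.836273)    # writers:george
director = bm25_term_score(20, 2081, 1.0, 2.0, 2.3801057)  # director:george
print(round(writers, 3), round(director, 3), round(writers + director, 3))
```

The sum of the two term scores comes out at roughly 10.50, matching the movie's total score in the results.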

Synonyms

For our last example, we will examine the use of synonyms with Solr. With respect to the movie index, we could perform the following eDisMax search on the title, plot, and genres fields: ‘scary’ OR ‘slasher’ OR ‘spooky’ OR ‘evil’ OR ‘horror’, or more simply ‘scary slasher spooky evil horror’. Based on this search, we would get back a truly gruesome collection of 141 films. The search is effective because it uses multiple, similar search terms to return a larger result set of movies within a similar theme. However, the search relies on each end-user entering the same relevant search terms, every time.

With Solr’s synonym capability, we can build some intelligence into our index by defining synonymous relationships between terms. There are multiple ways to define synonymous relationships between terms in Solr. Lucidworks has an excellent article, Synonyms Files, on the different synonymous relationships. We will look at three types of relationships, as described by Lucidworks: Replacement Synonyms, Oneway Expansion Synonyms, and Multiway Expansion Synonyms.

I have pre-defined some examples for each of the three types of relationships in the movies index’s synonyms.txt configuration file. This file is created when a new index is created.

## Custom synonym groups for movies index ##

# Replacement Synonyms examples
scarey => scary
ciborg => cyborg

# Multiway Expansion Synonyms examples
scary,slasher,spooky,evil,horror

# Oneway Expansion Synonyms examples
droid => droid,android,robot,cyborg

There is a copy of the file in this project, which will be used by the movies index, running in the Docker container.

Query 10a: Replacement Synonyms

We will start with a Replacement Synonyms example. I have added the following synonym mapping for a common misspelling of the word ‘cyborg’.

ciborg => cyborg

If we perform a search for the term ‘ciborg’, Solr will substitute it with the term ‘cyborg’. We can confirm this by viewing the query, as shown in the debug snippet below.

+(title:cyborg | plot:cyborg | genres:cyborg)

Performing the query returns two documents, including the most famous cyborg, Arnold Schwarzenegger, the Terminator.

Parameters
----------
q: ciborg
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 2
{'title': 'Terminator 2: Judgment Day', 'score': 8.17}
{'title': "I'm a Cyborg, But That's OK", 'score': 7.13}

Query 10b: Oneway Expansion Synonyms

Next, we will demonstrate Oneway Expansion Synonyms. I have added the following synonym mapping for the word, ‘droid’. Note we must include the word ‘droid’ in the expansion synonyms on the right side of the mapping, as well as on the left.

droid => droid,android,robot,cyborg

When we perform a search on the term ‘droid’, Solr will also search for the synonyms ‘android’, ‘robot’, and ‘cyborg’. We can confirm this by viewing the query Solr constructs for the search term ‘droid’, as shown in the debugger snippet below.

+(
  Synonym(title:android title:cyborg title:droid title:robot) | 
  Synonym(plot:android plot:cyborg plot:droid plot:robot) | 
  Synonym(genres:android genres:cyborg genres:droid genres:robot)
)

Note the converse is not true since this is a one-way relationship. If we search on ‘cyborg’, Solr will not search on ‘droid’, ‘android’, and ‘robot’.

+(title:cyborg | plot:cyborg | genres:cyborg)

Performing the ‘droid’ eDisMax query returns 15 documents.

Parameters
----------
q: droid
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 15
{'title': 'Robo Jî', 'score': 7.67}
{'title': "I'm a Cyborg, But That's OK", 'score': 7.13}
{'title': 'BV-01', 'score': 6.6}
{'title': 'Robot Chicken: DC Comics Special', 'score': 6.44}
{'title': 'Terminator 2: Judgment Day', 'score': 6.23}

Query 10c: Multiway Expansion Synonyms

Lastly, we will demonstrate Multiway Expansion Synonyms. I have added the following synonym mapping for the word, ‘scary’.

scary,slasher,spooky,evil,horror

If we perform a search on the term ‘scary’, Solr will also search for the synonyms ‘slasher’, ‘spooky’, ‘evil’, and ‘horror’. Unlike the previous example, the converse is true since this is a multi-way relationship. If we search on any of the five synonyms, Solr will also search on the other four terms and return identical results. We can confirm this by viewing the query Solr constructs for the term ‘scary’, as shown in the debug snippet below.

+(Synonym(title:evil title:horror title:scari title:slasher 
  title:spooki) | Synonym(plot:evil plot:horror plot:scari 
  plot:slasher plot:spooki) | Synonym(genres:evil genres:horror 
  genres:scari genres:slasher genres:spooki))

Performing the ‘scary’ eDisMax query returns 141 documents.

Parameters
----------
q: scary
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 141
{'title': 'See No Evil, Hear No Evil', 'score': 7.9}
{'title': 'The Evil Dead', 'score': 7.23}
{'title': 'Evil Dead', 'score': 7.23}
{'title': 'Evil Ed', 'score': 7.23}
{'title': 'Evil Dead II', 'score': 6.37}

You may have noticed the other replacement synonym mapping I placed in the index’s synonyms.txt configuration file, shown below.

scarey => scary

You might be wondering: if you entered the common misspelling ‘scarey’ as a search term, would Solr replace the term with ‘scary’ and then also search on its four synonyms, ‘slasher’, ‘spooky’, ‘evil’, and ‘horror’? The answer is no; Solr will only search on ‘scary’. Solr does not create indirect or secondary relationships between synonym mappings. You would have to input the term with the correct spelling to take advantage of the multi-way relationship.
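The behavior of the three mapping types, including the lack of chaining, can be modeled with a toy expander. This is a simplified sketch and not Solr's synonym filter: the `parse_rules` and `expand` functions are hypothetical, and no stemming is applied.

```python
SYNONYM_RULES = """
# Replacement Synonyms examples
scarey => scary
ciborg => cyborg
# Multiway Expansion Synonyms examples
scary,slasher,spooky,evil,horror
# Oneway Expansion Synonyms examples
droid => droid,android,robot,cyborg
"""


def parse_rules(text):
    """Map each input term to the list of terms it is rewritten to."""
    mapping = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "=>" in line:
            left, right = line.split("=>")
            sources = [term.strip() for term in left.split(",")]
            targets = [term.strip() for term in right.split(",")]
        else:
            # A plain comma-separated group maps every term to the whole group
            sources = targets = [term.strip() for term in line.split(",")]
        for source in sources:
            mapping.setdefault(source, []).extend(targets)
    return mapping


def expand(term, mapping):
    """Apply the rules once; the output terms are not expanded again."""
    return mapping.get(term, [term])


rules = parse_rules(SYNONYM_RULES)
print(expand("droid", rules))   # ['droid', 'android', 'robot', 'cyborg']
print(expand("cyborg", rules))  # ['cyborg']
print(expand("scary", rules))   # ['scary', 'slasher', 'spooky', 'evil', 'horror']
print(expand("scarey", rules))  # ['scary']
```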

Managing Unique Vocabulary with Synonyms

Large corporations, industry verticals, government agencies, and other entities often use a unique lexicon. Their vocabulary includes uncommon terms, phrases, acronyms, abbreviations, and other idioms. These commonly represent products and services, organizational structures, and technical jargon. Synonyms are an excellent way to support domain-specific diction within indexes. We see this in the examples Solr includes in the synonyms.txt configuration file. The two examples, shown below, demonstrate how to associate multiple abbreviations and technical terms with equivalent units of data size.

# Some synonym groups specific to this example
GB,gib,gigabyte,gigabytes
MB,mib,megabyte,megabytes

10d: Synonymous Phrases

A largely undocumented feature of synonyms is the ability to create synonymous relationships between common acronyms, abbreviations, terms, and phrases. Below I have provided a few examples of how to create these relationships. The only apparent limitation is that you cannot use stop words in the phrases; a good reason not to overuse stop words.

ai,artificial intelligence
cia,central intelligence agency
fbi,federal bureau investigation
lol,laughing out loud,league legends

For example, if we include the full phrase ‘league of legends’ in the synonyms map with ‘lol’, Solr ignores this phrase when we search on ‘LOL’. However, if we remove the stop word, ‘of’, then Solr will create a query that requires the two terms, ‘league’ and ‘legends’, or the three terms, ‘laugh’, ‘out’, and ‘loud’, or the single term ‘lol’.

+(
  (((+title:laugh +title:out +title:loud) (+title:leagu +title:legend) title:lol)) | 
  (((+plot:laugh +plot:out +plot:loud) (+plot:leagu +plot:legend) plot:lol)) | 
  (((+genres:laugh +genres:out +genres:loud) (+genres:leagu +genres:legend) genres:lol))
)

Solr:

Parameters
----------
q: lol
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 1
{'title': 'JK LOL', 'score': 9.05}
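
The parameters printed above travel to Solr as an ordinary HTTP query string. The sketch below shows how they could be assembled into a select URL using only the standard library. The host, port, and the collection name `movies` are assumptions, and `build_select_url` is a hypothetical helper, not part of the post's scripts.

```python
from urllib.parse import urlencode

# Assumptions: a local Solr instance and a collection named 'movies'.
SOLR_SELECT_URL = "http://localhost:8983/solr/movies/select"


def build_select_url(q, **kwargs):
    """Combine the main query and the extra parameters into one query string."""
    params = {"q": q, **kwargs}
    return f"{SOLR_SELECT_URL}?{urlencode(params)}"


url = build_select_url(
    "lol",
    defType="edismax",          # use the eDisMax query parser
    qf="title plot genres",     # fields to search
    fl="title, score",          # fields to return
    rows="5",                   # maximum number of documents
)
print(url)
```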

Conclusion

By understanding the capabilities of Apache Solr, the characteristics of the data contained in your indexes and the search patterns of your end users, you will be able to craft queries that ensure responses contain high-quality, relevant search results.

The query examples in this post demonstrate only a very small portion of Solr’s vast search capabilities. There are several additional query examples in each of the two Python scripts, which you can uncomment to explore their results further.

Solr’s documentation is very good; to learn more about Solr’s capabilities, I suggest reviewing the various parsers and their options, in the current Solr version 7.6 documentation.

I also suggest reviewing Solr’s Analyzers, Tokenizers, and Filters, and understand how they affect the way in which Solr indexes documents and how Solr interprets the content of the indexes when performing a search.

 

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Feature Illustration Copyright: Dejan Bozic (123RF)


Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine

Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, including MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-managed PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.

In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).

Background

In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.

Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders service. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.

The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.

Kafka-Eventual-Cons-Swarm.png

Developing the services, not operationalizing the platform, was the primary objective of the previous post.

Featured Technologies

The following technologies are featured prominently in this post.

Confluent Cloud

confluent_cloud_apache-300x228

In May 2018, Google announced a partnership with Confluent to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for their commercial, Kafka-based streaming platform for the Enterprise.

Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.

Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.

MongoDB Atlas

mongodb

Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.

Kubernetes Engine

According to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.

A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.

GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.

Demonstration

In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.

ConfluentCloud-v3a.png

Kubernetes and Istio 1.0 will replace Netflix’s Zuul and  Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform, than previously demonstrated on Docker Swarm.

ConfluentCloudRouting.png

For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor, will we integrate a full lifecycle API management tool, like Google Apigee.

Source Code

The source code for this demonstration is contained in four separate GitHub repositories, storefront-kafka-docker, storefront-demo-accounts, storefront-demo-orders, and storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Setup Process

The setup of the Storefront API platform is divided into a few logical steps:

  1. Create the MongoDB Atlas cluster;
  2. Create the Confluent Cloud Kafka cluster;
  3. Create Kafka topics;
  4. Modify the Kubernetes resources;
  5. Modify the microservices to support Confluent Cloud configuration;
  6. Create the GKE cluster with Istio on GCP;
  7. Apply the Kubernetes resources to the GKE cluster;
  8. Test that the Storefront API, Kafka, and MongoDB are functioning properly.

MongoDB Atlas Cluster

This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.

For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.

screen_shot_2018-12-23_at_6.48.12_pm

For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized 3-data node cluster, with shared RAM and 512 MB of storage, and currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0-cluster on GCP. An M0-sized Atlas cluster may take between 7-10 minutes to provision.

screen_shot_2018-12-23_at_6.49.24_pm

MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.

screen_shot_2018-12-23_at_6.51.41_pm

Once the cluster is ready, you can review details about the cluster and each individual cluster node.

screen_shot_2018-12-23_at_6.51.54_pm

In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.

screen_shot_2018-12-23_at_6.52.18_pm

Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open the access to your specific IP address using whatsmyip.com, or much less-securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.
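
To see why the 0.0.0.0/0 entry is so much less secure than a single address, it helps to check a few client addresses against each whitelist. The sketch below uses Python's standard `ipaddress` module. The addresses come from the reserved documentation ranges, and `is_allowed` is a hypothetical helper, not anything Atlas provides.

```python
import ipaddress


def is_allowed(client_ip, whitelist):
    """Return True if client_ip falls inside any CIDR block in the whitelist."""
    address = ipaddress.ip_address(client_ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in whitelist)


# Hypothetical values: a single workstation address versus the open range.
single_address = ["203.0.113.10/32"]
wide_open = ["0.0.0.0/0"]

print(is_allowed("203.0.113.10", single_address))  # the whitelisted address
print(is_allowed("198.51.100.7", single_address))  # any other address
print(is_allowed("198.51.100.7", wide_open))       # everything matches 0.0.0.0/0
```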

screen_shot_2018-12-23_at_6.52.36_pm

The Java Spring Boot storefront services use a Spring Profile, gke. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke Spring Profile’s configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap and Secret resources, shown later.

The first two Spring configuration values we will need are the MongoDB Atlas cluster’s connection string and the demo_user account password. Note both of these for later use.

screen_shot_2018-12-23_at_6.53.00_pm

Confluent Cloud Kafka Cluster

Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Confluent Cloud account creation does require the use of a Credit Card.

The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and a persistent history of cluster activities that survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based SaaS management tool.

screen_shot_2018-12-23_at_6.34.18_pm

As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.
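
To put the hourly rate in perspective, the short calculation below estimates what a forgotten cluster would cost. It is a rough sketch based only on the approximate $0.55/hour figure quoted above; the 30-day month is an assumption, and actual billing may include other charges.

```python
from decimal import Decimal

# Approximate hourly rate quoted in this post for the smallest cluster.
HOURLY_RATE = Decimal("0.55")


def running_cost(hours):
    """Return the estimated cost, in dollars, of leaving the cluster up."""
    return HOURLY_RATE * hours


print(running_cost(24))       # one day
print(running_cost(24 * 30))  # an assumed 30-day month
```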

screen_shot_2018-12-23_at_6.34.56_pm

Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.

screen_shot_2018-12-23_at_6.39.52_pm

Once the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.

screen_shot_2018-12-23_at_6.35.56_pm

As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.

screen_shot_2018-12-23_at_6.36.12_pm

SASL and JAAS

Some users may not be familiar with the terms, SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).

There are numerous SASL mechanisms.  The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses the JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.

Cluster Authentication

Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.

screen_shot_2018-12-23_at_6.38.09_pm

Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.

screen_shot_2018-12-23_at_6.38.21_pm

Kafka Topics

With the cluster created, create the storefront service’s three Kafka topics manually, using Confluent Cloud’s ccloud CLI tool. First, configure the Confluent Cloud CLI using the ccloud init command, using your new cluster’s Bootstrap Servers address, API Key, and API Secret. The instructions are shown above, in the cluster’s Client Config tab of the Confluent Cloud web-based management interface.

screen_shot_2018-12-26_at_2.05.09_pm

Create the storefront service’s three Kafka topics using the ccloud topic create command. Use the list command to confirm they are created.

# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill
  
# list kafka topics
ccloud topic list
  
accounts.customer.change
fulfillment.order.change
orders.order.fulfill

Another useful ccloud command, topic describe, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12.

screen_shot_2018-12-26_at_5.03.11_pm

Adding the --verbose flag to the command, ccloud --verbose topic describe, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.

screen_shot_2018-12-26_at_5.07.20_pm

Kubernetes Resources

The deployment of the three storefront microservices to the dev Namespace will minimally require the following Kubernetes configuration resources.

  • (1) Kubernetes Namespace;
  • (3) Kubernetes Deployments;
  • (3) Kubernetes Services;
  • (1) Kubernetes ConfigMap;
  • (2) Kubernetes Secrets;
  • (1) Istio 1.0 Gateway;
  • (1) Istio 1.0 VirtualService;
  • (2) Istio 1.0 ServiceEntry;

The Istio networking.istio.io v1alpha3 API introduced the last three configuration resources in the list, to control traffic routing into, within, and out of the mesh. There are a total of four new networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.

Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.

To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.

Istio Gateway & VirtualService

Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com, along with the sub-domain, api.dev, to host the Storefront API. The domain’s primary A record (‘@’) and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. Removing the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).

Istio ServiceEntry

There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services to services outside the mesh, so both of their ServiceEntry Location items are set to MESH_EXTERNAL. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).

The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).

Both need to have their host items replaced with the appropriate Atlas and Confluent URLs.

Inspecting Istio Resources

The easiest way to view Istio resources is from the command line using the istioctl and kubectl CLI tools.

istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry
  
kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry

Multiple Namespaces

In this demo, we are only deploying to a single Kubernetes Namespace, dev. However, Istio will also support routing traffic to multiple namespaces. For example, a typical non-prod Kubernetes cluster might support dev, test, and uat, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In the VirtualService, each service’s host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which we change for each VirtualService (gist).
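
As a rough illustration of how the Namespace changes each service’s host address, the sketch below generates the sub-domain and the in-cluster FQDNs for three Namespaces. It assumes the default Kubernetes cluster domain, cluster.local, and Kubernetes Service names of accounts, orders, and fulfillment; both helper functions are hypothetical.

```python
# Assumptions: default cluster domain and Service names matching the
# three storefront services. Both helpers are hypothetical.
DOMAIN = "storefront-demo.com"
SERVICES = ("accounts", "orders", "fulfillment")


def gateway_host(namespace):
    """Sub-domain added to the Istio Gateway for this Namespace."""
    return f"api.{namespace}.{DOMAIN}"


def service_fqdn(service, namespace, cluster_domain="cluster.local"):
    """In-cluster DNS name used as the VirtualService destination host."""
    return f"{service}.{namespace}.svc.{cluster_domain}"


for namespace in ("dev", "test", "uat"):
    print(gateway_host(namespace))
    for service in SERVICES:
        print("  ", service_fqdn(service, namespace))
```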

MongoDB Atlas Secret

There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.

The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection string, with the demo_user username and password, one for each of the storefront service’s databases (gist).

Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64 program. The base64 program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64 program using echo -n.

# quote the URI so the shell does not interpret the '?' character
MONGODB_URI='mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true'
echo -n "$MONGODB_URI" | base64

bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl

Repeat this process for the three MongoDB connection strings.
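
If you would rather not repeat the shell command by hand, the same encoding can be scripted. The sketch below is one possible approach using Python's standard `base64` module. The database names orders and fulfillment are assumptions based on the service names, and the password and cluster address are the same placeholders used above.

```python
import base64

# Placeholder URI from the example above; the database names other than
# 'accounts' are assumptions based on the three service names.
URI_TEMPLATE = (
    "mongodb+srv://demo_user:your_password@your_cluster_address/"
    "{database}?retryWrites=true"
)
DATABASES = ("accounts", "orders", "fulfillment")


def encode_secret(value):
    """Base64-encode a string, as 'echo -n value | base64' would."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


encoded = {db: encode_secret(URI_TEMPLATE.format(database=db)) for db in DATABASES}

for database, value in encoded.items():
    print(f"{database}: {value}")
```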

screen_shot_2018-12-26_at_2.15.21_pm

Confluent Cloud Secret

The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret’s data map, bootstrap.servers and sasl.jaas.config. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the base64 encoding process for these two data fields (gist).
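
The two encoded values can also be produced with a few lines of Python. This is a minimal sketch: the JAAS string follows Kafka's standard SASL/PLAIN login module format, while the bootstrap address, API Key, and API Secret shown are placeholders, and `jaas_config` and `secret_data` are hypothetical helper names.

```python
import base64


def jaas_config(api_key, api_secret):
    """Build the SASL/PLAIN JAAS configuration string for a Kafka client."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )


def secret_data(bootstrap_servers, api_key, api_secret):
    """Return the Secret's data map with both values Base64 encoded."""
    raw = {
        "bootstrap.servers": bootstrap_servers,
        "sasl.jaas.config": jaas_config(api_key, api_secret),
    }
    return {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in raw.items()
    }


# Placeholder values; substitute your own cluster address, Key, and Secret.
data = secret_data("your_cluster_address:9092", "YOUR_API_KEY", "YOUR_API_SECRET")
for key, value in data.items():
    print(f"{key}: {value}")
```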

Confluent Cloud ConfigMap

The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive, and therefore, may be placed in a Kubernetes ConfigMap, confluent-cloud-kafka-configmap.yaml (gist).

Accounts Deployment Resource

To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note the environment variables section, on lines 44–90, is a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).
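
The three kinds of environment variable entries follow the standard Kubernetes container env structure. The sketch below generates one of each as JSON, purely to illustrate that structure. Apart from SPRING_PROFILES_ACTIVE, the variable names, the mongodb-atlas Secret name, and the key names other than bootstrap.servers are assumptions, not copied from the author's Deployment.

```python
import json


def literal(name, value):
    """A hard-coded environment variable."""
    return {"name": name, "value": value}


def from_config_map(name, config_map, key):
    """An environment variable read from a ConfigMap key."""
    return {"name": name, "valueFrom": {"configMapKeyRef": {"name": config_map, "key": key}}}


def from_secret(name, secret, key):
    """An environment variable read from a Secret key."""
    return {"name": name, "valueFrom": {"secretKeyRef": {"name": secret, "key": key}}}


# Hypothetical variable, resource, and key names, for illustration only.
env = [
    literal("SPRING_PROFILES_ACTIVE", "gke"),
    from_config_map("KAFKA_SASL_MECHANISM", "confluent-cloud-kafka", "sasl.mechanism"),
    from_secret("KAFKA_BOOTSTRAP_SERVERS", "confluent-cloud-kafka", "bootstrap.servers"),
    from_secret("MONGODB_URI", "mongodb-atlas", "mongodb.uri"),
]
print(json.dumps(env, indent=2))
```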

Modify Microservices for Confluent Cloud

As explained earlier, Confluent Cloud’s Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service’s code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note the following changes have already been made to the source code in the gke git branch, so you do not need to make them yourself for this demo.

The previous Kafka SenderConfig and ReceiverConfig Java classes have been converted to Java interfaces. There are four new classes, SenderConfigConfluent, SenderConfigNonConfluent, ReceiverConfigConfluent, and ReceiverConfigNonConfluent, each of which implements one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes is assigned the @Profile("gke") annotation, and the other, the @Profile("!gke") annotation. When the services start, one of the two class implementations is loaded, depending on the Active Spring Profile, gke or not gke. To understand the changes better, examine the Accounts service’s SenderConfigConfluent.java file (gist).

Line 20: Designates this class as belonging to the gke Spring Profile.

Line 23: The class now implements an interface.

Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.

Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.

Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine).

Google Kubernetes Engine (GKE) with Istio

Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.

For the sake of brevity, we will manually create the cluster and deploy the resources, using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available releases at the time of this post were Kubernetes 1.11.5-gke.5 on GKE and Istio 1.0.3 (Istio on GKE was still considered beta). Note Kubernetes and Istio are evolving rapidly, thus the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create command format (gist).

Executing these commands successfully will build the cluster and the dev Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.

screen_shot_2018-12-26_at_2.00.56_pm

We can also observe the new GKE cluster from the GKE Clusters Details tab.

screen_shot_2018-12-26_at_2.18.32_pm

Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the GKE cluster’s two VM instances, and one IP address associated with the frontend of the load balancer.

screen_shot_2018-12-26_at_2.59.38_pm

While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.

screen_shot_2018-12-26_at_2.58.42_pm

A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.

screen_shot_2018-12-26_at_2.59.59_pm

Below, we see the details of the load balancer’s target pool, containing the GKE cluster’s two VMs.

screen_shot_2018-12-26_at_3.57.03_pm.png

As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.

ConfluentCloudRouting

Apply Kubernetes Resources

Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev Namespace since their role is to control ingress and route traffic to the dev Namespace and the services within it (gist).

Once these commands complete successfully, on the Workloads tab, we should observe two Pods of each of the three storefront service Kubernetes Deployments deployed to the dev Namespace, all six Pods with a Status of ‘OK’. A Deployment controller provides declarative updates for Pods and ReplicaSets.

screen_shot_2018-12-26_at_2.51.01_pm

On the Services tab, we should observe the three storefront service’s Kubernetes Services. A Service in Kubernetes is a REST object.

screen_shot_2018-12-26_at_2.51.16_pm

On the Configuration tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Namespace.

screen_shot_2018-12-26_at_2.51.36_pm

Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.54.51_pm

Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.55.17_pm

Test the Storefront API

If you recall from part two of the previous post, there are a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET Requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.

  1. Sample Customer: accounts/customers/sample
  2. Sample Orders: orders/customers/sample/orders
  3. Sample Fulfillment Requests: orders/customers/sample/fulfill
  4. Sample Processed Order Event: fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Event: fulfillment/fulfillment/sample/receive

There are a wide variety of tools to interact with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.
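
The general shape of such a script can be sketched with the standard library alone. The code below is not the project's sample_data.py; it is a minimal illustration that reuses the seven endpoints listed above. The base URL is the demo host used in this post, and the per-service actuator/health path is an assumption about how health might be checked.

```python
from urllib.request import urlopen

# Demo host from this post; replace with your own domain or load balancer IP.
BASE_URL = "http://api.dev.storefront-demo.com"
SERVICES = ("accounts", "orders", "fulfillment")
ENDPOINTS = (
    "accounts/customers/sample",
    "orders/customers/sample/orders",
    "orders/customers/sample/fulfill",
    "fulfillment/fulfillment/sample/process",
    "fulfillment/fulfillment/sample/ship",
    "fulfillment/fulfillment/sample/in-transit",
    "fulfillment/fulfillment/sample/receive",
)


def http_get(url):
    """Make an HTTP GET request and return the response status code."""
    with urlopen(url, timeout=10) as response:
        return response.status


def create_sample_data(base_url=BASE_URL, get=http_get):
    """Check each service's health, then call the endpoints in order."""
    for service in SERVICES:
        # Assumption: each service exposes a Spring Boot actuator health path.
        health_url = f"{base_url}/{service}/actuator/health"
        if get(health_url) != 200:
            raise RuntimeError(f"{service} is not healthy: {health_url}")
    results = []
    for endpoint in ENDPOINTS:
        url = f"{base_url}/{endpoint}"
        results.append((url, get(url)))
    return results
```

Calling create_sample_data() against a running platform returns each endpoint URL with its HTTP status code. Passing a different get function makes it possible to exercise the sequence without a live cluster.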

screen_shot_2018-12-31_at_12.19.50_pm.png

Postman

Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of the HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers endpoint.

screen_shot_2018-12-26_at_5.48.34_pm

Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.

screen_shot_2018-12-26_at_5.47.57_pm

Google Stackdriver

If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener handler firing on an onAfterSave event, which sends a CustomerChangeEvent payload to the accounts.customer.change Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.

screen_shot_2018-12-26_at_8.05.50_pm.png

MongoDB Atlas Collection View

Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connect to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests are completed successfully, you should observe the three Storefront API databases, each with collections of documents.

screen_shot_2018-12-26_at_4.56.25_pm

MongoDB Compass

In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests collection schema.

Screen Shot 2019-01-20 at 10.21.54 AM.png

Confluent Control Center

Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since the Control Center is provided at an additional fee, and I found it difficult to configure for Confluent Cloud clusters based on Confluent’s documentation, I chose not to cover it in detail in this post.

screen_shot_2018-12-23_at_10.21.41_pm

screen_shot_2018-12-23_at_10.48.49_pm

Tear Down Cluster

Delete your Confluent Cloud and MongoDB clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the cluster delete command. Also, double-check that the external IP addresses and load balancer, associated with the cluster, were also deleted as part of the cluster deletion (gist).

Conclusion

In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.

In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0).

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Using the Google Cloud Dataproc WorkflowTemplates API to Automate Spark and Hadoop Workloads on GCP

In the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we explored Google Cloud Dataproc using the Google Cloud Console as well as the Google Cloud SDK and Cloud Dataproc API. We created clusters, then uploaded and ran Spark and PySpark jobs, then deleted clusters, each as discrete tasks. Although each task could be done via the Dataproc API and was therefore automatable, they were independent tasks, without awareness of the previous task’s state.

Screen Shot 2018-12-15 at 11.39.26 PM.png

In this brief follow-up post, we will examine the Cloud Dataproc WorkflowTemplates API to more efficiently and effectively automate Spark and Hadoop workloads. According to Google, the Cloud Dataproc WorkflowTemplates API provides a flexible and easy-to-use mechanism for managing and executing Dataproc workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster. Shown below, we see one of the Workflows that will be demonstrated in this post, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.07.29-am.png

Here we see a four-stage DAG of one of the three jobs in the workflow, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.18.45-am

Workflows are ideal for automating large batches of dynamic Spark and Hadoop jobs, and for long-running and unattended job execution, such as overnight.
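To make the idea of a workflow as a DAG of jobs concrete, here is a small sketch using Python's standard graphlib module (Python 3.9 or later). The step ids are borrowed from this post, but the dependencies between them are invented purely for illustration; the add-job commands used later in this post do not declare any prerequisites.

```python
from graphlib import TopologicalSorter


def execution_order(prerequisites):
    """Return step ids in an order that respects each step's prerequisites.

    `prerequisites` maps a step id to the set of step ids that must finish first.
    """
    return list(TopologicalSorter(prerequisites).static_order())


# Hypothetical dependencies: the PySpark step waits for both Spark steps.
steps = {
    "ibrd-small-spark": set(),
    "ibrd-large-spark": set(),
    "ibrd-large-pyspark": {"ibrd-small-spark", "ibrd-large-spark"},
}
order = execution_order(steps)
```

Steps with no prerequisites between them, such as the two Spark steps here, are free to run in parallel; the sorter only guarantees that a step never precedes something it depends on.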

Demonstration

Using the Python and Java projects from the previous post, we will first create workflow templates using just the WorkflowTemplates API. We will create the template, set a managed cluster, add jobs to the template, and instantiate the workflow. Next, we will further optimize and simplify our workflow by using a YAML-based workflow template file. The YAML-based template file eliminates the need to make API calls to set the template’s cluster and add the jobs to the template. Finally, to further enhance the workflow and promote re-use of the template, we will incorporate parameterization. Parameterization will allow us to pass parameters (key/value pairs) from the command line to the workflow template, and on to the Python script as input arguments.

It is not necessary to use the Google Cloud Console for this post. All steps will be done using Google Cloud SDK shell commands. This means all steps may be automated using CI/CD DevOps tools, like Jenkins and Spinnaker on GKE.

Source Code

All open-sourced code for this post can be found on GitHub within three repositories: dataproc-java-demo, dataproc-python-demo, and dataproc-workflow-templates. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

WorkflowTemplates API

Always start by ensuring you have the latest Google Cloud SDK updates and are working within the correct Google Cloud project.

gcloud components update

export PROJECT_ID=your-project-id
gcloud config set project $PROJECT_ID

Set the following variables based on your Google environment. The variables will be reused throughout the post for multiple commands.

export REGION=your-region
export ZONE=your-zone
export BUCKET_NAME=gs://your-bucket

The post assumes you still have the Cloud Storage bucket we created in the previous post. In the bucket, you will need the two Kaggle IBRD CSV files, available on Kaggle, the compiled Java JAR file from the dataproc-java-demo project, and a new Python script, international_loans_dataproc.py, from the dataproc-python-demo project.

screen-shot-2018-12-16-at-12.03.51-pm

Use gsutil with the copy (cp) command to upload the four files to your Storage bucket.

gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc.py $BUCKET_NAME

Following Google’s suggested process, we create a workflow template using the workflow-templates create command.

export TEMPLATE_ID=template-demo-1
  
gcloud dataproc workflow-templates create \
  $TEMPLATE_ID --region $REGION

Adding a Cluster

Next, we need to set a cluster for the workflow to use, in order to run the jobs. Cloud Dataproc will create and use a Managed Cluster for your workflow or use an existing cluster. If the workflow uses a managed cluster, it creates the cluster, runs the jobs, and then deletes the cluster when the jobs are finished. This means, for many use cases, there is no need to maintain long-lived clusters; they become just an ephemeral part of the workflow.

We set a managed cluster for our Workflow using the workflow-templates set-managed-cluster command. We will re-use the same cluster specifications we used in the previous post: the Standard cluster type, with 1 master node and 2 worker nodes.

gcloud dataproc workflow-templates set-managed-cluster \
  $TEMPLATE_ID \
  --region $REGION \
  --zone $ZONE \
  --cluster-name three-node-cluster \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --worker-boot-disk-size 500 \
  --num-workers 2 \
  --image-version 1.3-deb9

Alternatively, if we already had an existing cluster, we would use the workflow-templates set-cluster-selector command, to associate that cluster with the workflow template.

gcloud dataproc workflow-templates set-cluster-selector \
  $TEMPLATE_ID \
  --region $REGION \
  --cluster-labels goog-dataproc-cluster-uuid=$CLUSTER_UUID

To get the existing cluster’s UUID label value, you could use a command similar to the following.

CLUSTER_UUID=$(gcloud dataproc clusters describe $CLUSTER_2 \
  --region $REGION \
  | grep 'goog-dataproc-cluster-uuid:' \
  | sed 's/.* //')

echo $CLUSTER_UUID

1c27efd2-f296-466e-b14e-c4263d0d7e19
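The same extraction can be done in Python, which is handy when the lookup is part of a larger automation script. The sketch below assumes the output of clusters describe has already been captured as text; the function name and the abbreviated sample output are illustrative, and a real description contains many more fields.

```python
import re


def extract_label(describe_output, label):
    """Return the value of `label` from `clusters describe` YAML text, or None."""
    pattern = re.compile(r"^\s*" + re.escape(label) + r":\s*(\S+)\s*$")
    for line in describe_output.splitlines():
        match = pattern.match(line)
        if match:
            return match.group(1)
    return None


# Abbreviated, illustrative output; not a complete cluster description.
sample = """clusterName: my-cluster
labels:
  goog-dataproc-cluster-name: my-cluster
  goog-dataproc-cluster-uuid: 1c27efd2-f296-466e-b14e-c4263d0d7e19
"""
cluster_uuid = extract_label(sample, "goog-dataproc-cluster-uuid")
```

Like the grep and sed pipeline, this treats the output as plain text rather than parsing the YAML, so it relies on the label appearing on a single line.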

Adding Jobs

Next, we add the jobs we want to run to the template. Each job is considered a step in the template, and each step requires a unique step id. We will add three jobs to the template: two Java-based Spark jobs from the previous post, and a new Python-based PySpark job.

First, we add the two Java-based Spark jobs, using the workflow-templates add-job spark command. This command’s flags are nearly identical to the dataproc jobs submit spark command, used in the previous post.

export STEP_ID=ibrd-small-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocSmall \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

export STEP_ID=ibrd-large-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

Next, we add the Python-based PySpark job, international_loans_dataproc.py, as the third job in the template. This Python script requires three input arguments, on lines 15–17, which are the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed (gist).

We pass the arguments to the Python script as part of the PySpark job, using the workflow-templates add-job pyspark command.

export STEP_ID=ibrd-large-pyspark
  
gcloud dataproc workflow-templates add-job pyspark \
  $BUCKET_NAME/international_loans_dataproc.py \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --region $REGION \
  -- $BUCKET_NAME \
     ibrd-statement-of-loans-historical-data.csv \
     ibrd-summary-large-python
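On the receiving side, the script sees the three values after the -- separator as ordinary positional arguments. The following is a hypothetical sketch of that handling, not the code from international_loans_dataproc.py: the function name, the argument names, and the assumption that the data file and results directory sit at the top level of the bucket are all illustrative.

```python
import argparse


def parse_job_args(argv):
    """Parse the three positional arguments passed after `--` in the add-job command."""
    parser = argparse.ArgumentParser(description="IBRD loans PySpark job (sketch)")
    parser.add_argument("storage_bucket", help="bucket holding the data and results")
    parser.add_argument("data_file", help="name of the CSV data file")
    parser.add_argument("results_directory", help="directory in the bucket for results")
    args = parser.parse_args(argv)
    # Assumed layout: data file and results directory at the top of the bucket.
    return {
        "input_path": f"{args.storage_bucket}/{args.data_file}",
        "output_path": f"{args.storage_bucket}/{args.results_directory}",
    }


paths = parse_job_args([
    "gs://your-bucket",
    "ibrd-statement-of-loans-historical-data.csv",
    "ibrd-summary-large-python",
])
```

In a real job, the list would come from sys.argv[1:], and the two paths would be handed to Spark's reader and writer.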

That’s it, we have created our first Cloud Dataproc Workflow Template using the Dataproc WorkflowTemplates API. To view our template we can use the following two commands. First, use the workflow-templates list command to display a list of available templates. The list command output displays the version of the workflow template and how many jobs are in the template.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Then, we use the workflow-templates describe command to show the details of a specific template.

gcloud dataproc workflow-templates describe \
  $TEMPLATE_ID --region $REGION

Using the workflow-templates describe command, we should see output similar to the following (gist).

In the template description, notice the template’s id, the managed cluster in the placement section, and the three jobs, all of which we added using the above series of workflow-templates commands. Also, notice the creation and update timestamps and version number, which were automatically generated by Dataproc. Lastly, notice the name, which refers to the GCP project and region where this copy of the template is located. Had we used an existing cluster with our workflow, as opposed to a managed cluster, the placement section would have looked as follows.

placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-uuid: your_clusters_uuid_label_value

To instantiate the workflow, we use the workflow-templates instantiate command. This command will create the managed cluster, run all the steps (jobs), then delete the cluster. I have added the time command to see how long the workflow takes to complete.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION #--async

We can observe the progress from the Google Cloud Dataproc Console, or from the command line by omitting the --async flag. Below we see the three jobs completed successfully on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/e720bb96-9c87-330e-b1cd-efa4612b3c57].
WorkflowTemplate [template-demo-1] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/e1fe53de-92f2-4f8c-8b3a-fda5e13829b6].
Created cluster: three-node-cluster-ugdo4ygpl52bo.
Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING
Job ID ibrd-small-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/f2a40c33-3cdf-47f5-92d6-345463fbd404].
WorkflowTemplate [template-demo-1] DONE
Deleted cluster: three-node-cluster-ugdo4ygpl52bo.

1.02s user 0.35s system 0% cpu 5:03.55 total

In the output, you see the creation of the cluster, the three jobs running and completing successfully, and finally the cluster deletion. The entire workflow took approximately 5 minutes to complete. Below is the view of the workflow’s results from the Dataproc Clusters Console Jobs tab.

screen_shot_2018-12-15_at_11.42.44_am

Below we see the output from the PySpark job, run as part of the workflow template, shown in the Dataproc Clusters Console Output tab. Notice the three input arguments we passed to the Python script from the workflow template, listed in the output.

screen_shot_2018-12-15_at_11.43.56_am

We see the arguments passed to the job, from the Jobs Configuration tab.

screen_shot_2018-12-15_at_1.11.11_pm.png

Examining the Google Cloud Dataproc Jobs Console, we will observe that the WorkflowTemplates API automatically adds a unique alphanumeric extension to both the name of the managed cluster we create and the name of each job that is run. The extension on the cluster name matches the extension on the jobs run on that cluster.

screen_shot_2018-12-15_at_1.05.41_pm
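The matching extensions can also be checked programmatically from the instantiate command's output. The sketch below is a rough illustration under two assumptions: the log lines follow the format shown earlier in this post, and the extension never contains a hyphen. The function name is made up for this example.

```python
def workflow_suffixes(log_text):
    """Return (cluster_suffix, job_suffixes) parsed from instantiate output."""
    cluster_suffix = None
    job_suffixes = set()
    for line in log_text.splitlines():
        if line.startswith("Created cluster: "):
            # Drop the trailing period, then keep the text after the last hyphen.
            name = line.split(": ", 1)[1].rstrip(".")
            cluster_suffix = name.rsplit("-", 1)[1]
        elif line.startswith("Job ID "):
            job_id = line.split()[2]
            job_suffixes.add(job_id.rsplit("-", 1)[1])
    return cluster_suffix, job_suffixes


# A few lines copied from the workflow output shown earlier.
log = """Created cluster: three-node-cluster-ugdo4ygpl52bo.
Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED
"""
cluster_suffix, job_suffixes = workflow_suffixes(log)
```

When the job suffix set contains exactly the cluster suffix, every job in the log ran on that workflow's managed cluster.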

YAML-based Workflow Template

The above WorkflowTemplates API-based workflow was certainly more convenient than using the individual Cloud Dataproc API commands. At a minimum, we don’t have to remember to delete our cluster when the jobs are complete, as I often forget to do. To further optimize the workflow, we will introduce a YAML-based Workflow Template. According to Google, you can define a workflow template in a YAML file, then instantiate the template to run the workflow. You can also import and export a workflow template YAML file to create and update a Cloud Dataproc workflow template resource.

We can export our first workflow template to create our YAML-based template file.

gcloud dataproc workflow-templates export template-demo-1 \
  --destination template-demo-2.yaml \
  --region $REGION

Below is our first YAML-based template, template-demo-2.yaml. You will need to replace the values in the template with your own values, based on your environment (gist).

Note the template looks almost identical to the template we created previously using the WorkflowTemplates API. The YAML-based template requires the placement and jobs fields. All the available fields are detailed here.
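Before instantiating a hand-edited template, it can be useful to run a quick structural check on it. The sketch below assumes the YAML file has already been loaded into a Python dict (for example with a YAML library, which is not shown); the function name and the specific checks are illustrative and much weaker than the validation Dataproc itself performs.

```python
def template_problems(template):
    """Return a list of structural problems found in a workflow template dict."""
    problems = []
    for field in ("placement", "jobs"):
        if not template.get(field):
            problems.append(f"missing required field: {field}")
    step_ids = [job.get("stepId") for job in template.get("jobs") or []]
    if any(step_id is None for step_id in step_ids):
        problems.append("every job needs a stepId")
    if len(set(step_ids)) != len(step_ids):
        problems.append("step ids must be unique")
    return problems


# Heavily abbreviated stand-in for a loaded template; job details are omitted.
template = {
    "placement": {"managedCluster": {"clusterName": "three-node-cluster"}},
    "jobs": [
        {"stepId": "ibrd-small-spark", "sparkJob": {}},
        {"stepId": "ibrd-large-spark", "sparkJob": {}},
        {"stepId": "ibrd-large-pyspark", "pysparkJob": {}},
    ],
}
problems = template_problems(template)
```

An empty list means the two required sections are present and each step id is unique.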

To run the template we use the workflow-templates instantiate-from-file command. Again, I will use the time command to measure performance.

time gcloud dataproc workflow-templates instantiate-from-file \
  --file template-demo-2.yaml \
  --region $REGION

Running the workflow-templates instantiate-from-file command will run a workflow nearly identical to the workflow we ran in the previous example, with similar timing. Below we see the three jobs completed successfully on the managed cluster, in approximately the same time as the previous workflow.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/7ba3c28e-ebfa-32e7-9dd6-d938a1cfe23b].
WorkflowTemplate RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/8d05199f-ed36-4787-8a28-ae784c5bc8ae].
Created cluster: three-node-cluster-5k3bdmmvnna2y.
Job ID ibrd-small-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-pyspark-5k3bdmmvnna2y RUNNING
Job ID ibrd-small-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-pyspark-5k3bdmmvnna2y COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/a436ae82-f171-4b0a-9b36-5e16406c75d5].
WorkflowTemplate DONE
Deleted cluster: three-node-cluster-5k3bdmmvnna2y.

1.16s user 0.44s system 0% cpu 4:48.84 total

Parameterization of Templates

To further optimize the workflow template process for re-use, we have the option of passing parameters to our template. Imagine you now receive new loan snapshot data files every night. Imagine you need to run the same data analysis on the financial transactions of thousands of your customers, nightly. Parameterizing templates makes them more flexible and reusable. By removing hard-coded values, such as Storage bucket paths and data file names, a single template may be re-used for multiple variations of the same job. Parameterization allows you to automate hundreds or thousands of Spark and Hadoop jobs in a workflow or workflows, each with different parameters, programmatically.

To demonstrate the parameterization of a workflow template, we create another YAML-based template with just the Python/PySpark job, template-demo-3.yaml. If you recall from our first example, the Python script, international_loans_dataproc.py, requires three input arguments: the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed.

We will replace four of the values in the template with parameters. We will inject those parameters’ values when we instantiate the workflow. Below is the new parameterized template. The template now has a parameters section from lines 26–46. They define parameters that will be used to replace the four values on lines 3–7 (gist).

Note the PySpark job’s three arguments and the location of the Python script have been parameterized. Parameters may include validation. As an example of validation, the template uses regex to validate the format of the Storage bucket path. The regex follows Google’s RE2 regular expression library syntax. If you need help with regex, the Regex Tester – Golang website is a convenient way to test your parameter’s regex validations.
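A candidate pattern can also be tried out locally before it goes into the template. The sketch below uses Python's re module with a pattern written only for illustration; it is not the regex from the template. Python's re and RE2 are different engines (RE2 has no backreferences or lookaround, for instance), so the pattern here deliberately sticks to syntax both accept.

```python
import re

# Illustrative pattern only; the template's actual regex may differ.
# Bucket name: lowercase letters, digits, dots, underscores, and hyphens,
# starting and ending with a letter or digit, optionally followed by a path.
BUCKET_PATH_RE = re.compile(r"gs://[a-z0-9][a-z0-9._-]{1,61}[a-z0-9](/.*)?")


def is_valid_bucket_path(value):
    """True when `value` looks like a gs:// bucket URI, optionally with an object path."""
    return BUCKET_PATH_RE.fullmatch(value) is not None
```

Using fullmatch rather than search means the whole value has to fit the pattern, so a stray prefix or a missing gs:// scheme is rejected.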

First, we import the new parameterized YAML-based workflow template, using the workflow-templates import command. Then, we instantiate the template using the workflow-templates instantiate command. The workflow-templates instantiate command will run the single PySpark job, analyzing the smaller IBRD data file, and placing the resulting Parquet-format file in a directory within the Storage bucket. We pass the Python script location, bucket link, smaller IBRD data file name, and output directory, as parameters to the template, and therefore indirectly, three of these, as input arguments to the Python script.

export TEMPLATE_ID=template-demo-3

gcloud dataproc workflow-templates import $TEMPLATE_ID \
   --region $REGION --source template-demo-3.yaml
  
gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION --async \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-latest-available-snapshot.csv",RESULTS_DIRECTORY="ibrd-summary-small-python"

Next, we will analyze the larger historic data file, using the same parameterized YAML-based workflow template, but changing two of the four parameters we are passing to the template with the workflow-templates instantiate command. This will run a single PySpark job on the larger IBRD data file and place the resulting Parquet-format file in a different directory within the Storage bucket.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-historical-data.csv",RESULTS_DIRECTORY="ibrd-summary-large-python"

This is the power of parameterization—one workflow template and one job script, but two different datasets and two different results.
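When many variations of the same job need to be launched, the --parameters value is easier to generate than to type. Here is a minimal sketch of building that string from a dict; the helper name is hypothetical, the bucket is a placeholder, and values containing commas are rejected rather than escaped.

```python
def format_parameters(parameters):
    """Render a dict as the KEY=VALUE,KEY=VALUE string passed to --parameters."""
    for name, value in parameters.items():
        if "," in value:
            raise ValueError(f"{name}: values containing commas are not handled here")
    return ",".join(f"{name}={value}" for name, value in parameters.items())


# Values shared by both runs; gs://your-bucket is a placeholder.
common = {
    "MAIN_PYTHON_FILE": "gs://your-bucket/international_loans_dataproc.py",
    "STORAGE_BUCKET": "gs://your-bucket",
}
small_run = {
    **common,
    "IBRD_DATA_FILE": "ibrd-statement-of-loans-latest-available-snapshot.csv",
    "RESULTS_DIRECTORY": "ibrd-summary-small-python",
}
large_run = {
    **common,
    "IBRD_DATA_FILE": "ibrd-statement-of-loans-historical-data.csv",
    "RESULTS_DIRECTORY": "ibrd-summary-large-python",
}
small_arg = format_parameters(small_run)
large_arg = format_parameters(large_run)
```

Only the data file and results directory differ between the two dicts, which mirrors the two instantiate commands above.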

Below we see the single PySpark job run on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/b3c5063f-e3cf-3833-b613-83db12b82f32].
WorkflowTemplate [template-demo-3] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105].
Created cluster: three-node-cluster-j6q2al2mkkqck.
Job ID ibrd-pyspark-j6q2al2mkkqck RUNNING
Job ID ibrd-pyspark-j6q2al2mkkqck COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/fe4a263e-7c6d-466e-a6e2-52292cbbdc9b].
WorkflowTemplate [template-demo-3] DONE
Deleted cluster: three-node-cluster-j6q2al2mkkqck.

0.98s user 0.40s system 0% cpu 4:19.42 total

Running the workflow-templates list command again should display a list of two workflow templates.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-3  1     2018-12-15T17:04:39.064Z  2
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Looking within the Google Cloud Storage bucket, we should now see four different folders, the results of the workflows.

screen-shot-2018-12-16-at-11.58.32-am.png

Job Results and Testing

To check on the status of a job, we use the dataproc jobs wait command. This returns the standard output (stdout) and standard error (stderr) for that specific job.

export SET_ID=ibrd-large-dataset-pyspark-cxzzhr2ro3i54
  
gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION

The dataproc jobs wait command is frequently used for automated testing of jobs, often within a CI/CD pipeline. Assume part of the job output is expected to indicate success, such as a string, boolean, or numeric value. We could use any number of test frameworks or other methods to confirm the existence of that expected value or values. Below is a simple example of using the grep command to check for the existence of the expected line ‘ state: FINISHED’ in the standard output of the dataproc jobs wait command.

# capture the standard output of the jobs wait command
command=$(gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION) &>/dev/null

# quote "$command" so newlines and leading spaces survive for the exact-line match
if grep -Fqx "  state: FINISHED" <<< "$command" &>/dev/null; then
  echo "Job Success!"
else
  echo "Job Failure?"
fi

# single line alternative
if grep -Fqx "  state: FINISHED" <<< "$command" &>/dev/null;then echo "Job Success!";else echo "Job Failure?";fi

Job Success!
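If the check lives inside a Python-based test suite instead of a shell script, the same exact-line match is a one-liner. This sketch assumes the output of jobs wait has already been captured as a string; the function name and the abbreviated sample output are illustrative.

```python
def job_finished(wait_output):
    """True when the output contains the exact line '  state: FINISHED'."""
    # Mirrors grep -Fx: a fixed string that must match a whole line.
    return "  state: FINISHED" in wait_output.splitlines()


# Abbreviated, illustrative output; a real job description is much longer.
sample_output = """status:
  state: DONE
yarnApplications:
- name: example-application
  state: FINISHED
"""
succeeded = job_finished(sample_output)
```

Because the comparison is against whole lines, leading whitespace matters, just as it does with the -x flag in the grep version.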

Individual Operations

To view individual workflow operations, use the operations list and operations describe commands. The operations list command will list all operations.

Notice the three distinct series of operations within each workflow, shown with the operations list command: WORKFLOW, CREATE, and DELETE. In the example below, I’ve separated the operations by workflow, for better clarity.

gcloud dataproc operations list --region $REGION

NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
---
be0e5293-275f-46ad-b1f4-696ba44c222e  2018-12-15T17:07:26.305Z  DELETE    DONE
6784078c-cbe3-4c1e-a56e-217149f555a4  2018-12-15T17:04:40.613Z  CREATE    DONE
fcd8039e-a260-3ab3-ad31-01abc1a524b4  2018-12-15T17:04:40.007Z  WORKFLOW  DONE
---
b4b23ca6-9442-4ffb-8aaf-460bac144dd8  2018-12-15T17:02:16.744Z  DELETE    DONE
89ef9c7c-f3c9-4d01-9091-61ed9e1f085d  2018-12-15T17:01:45.514Z  CREATE    DONE
243fa7c1-502d-3d7a-aaee-b372fe317570  2018-12-15T17:01:44.895Z  WORKFLOW  DONE
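The listing can also be parsed to get a rough duration for each workflow. The sketch below is an illustration only: it assumes the whitespace-separated column layout shown above, treats each timestamp as the time the operation was recorded, and measures from the WORKFLOW operation to the DELETE operation, so it does not include the time the cluster deletion itself takes. The function names are hypothetical.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def parse_operations(table_text):
    """Parse NAME, TIMESTAMP, TYPE, and STATE from `operations list` output."""
    rows = []
    for line in table_text.splitlines():
        parts = line.split()
        if len(parts) < 4 or parts[0] == "NAME":
            continue  # skip the header, separator lines, and blank lines
        name, timestamp, op_type, state = parts[:4]
        rows.append({
            "name": name,
            "timestamp": datetime.strptime(timestamp, TIMESTAMP_FORMAT),
            "type": op_type,
            "state": state,
        })
    return rows


def seconds_between(rows, start_type="WORKFLOW", end_type="DELETE"):
    """Seconds from the first start_type row to the first end_type row."""
    start = next(row for row in rows if row["type"] == start_type)
    end = next(row for row in rows if row["type"] == end_type)
    return (end["timestamp"] - start["timestamp"]).total_seconds()


# The first workflow's three operations, copied from the listing above.
listing = """NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
"""
operations = parse_operations(listing)
elapsed = seconds_between(operations)
```

For the three rows above, the gap between the WORKFLOW and DELETE timestamps is a little over three minutes.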

We use the results of the operations list command to execute the operations describe command to describe a specific operation.

gcloud dataproc operations describe \
  projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105

Each type of operation contains different details. Note the fine-grained detail we get from Dataproc using the operations describe command for a CREATE operation (gist).

Conclusion

In this brief follow-up to the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we have seen how easy the WorkflowTemplates API and YAML-based workflow templates make automating our analytics jobs. This post only scratched the surface of the complete functionality of the WorkflowTemplates API and parameterization of templates.

In a future post, we will leverage the automation capabilities of the Google Cloud Platform, the WorkflowTemplates API, YAML-based workflow templates, and parameterization, to develop a fully-automated DevOps for Big Data workflow, capable of running hundreds of Spark and Hadoop jobs.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives.

However, installing, configuring, and managing the technologies that support big data analytics, data science, ML, and AI, at scale and in Production, often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. The mere ability to manage terabytes and petabytes of transient data is beyond the capability of many enterprises, let alone performing analysis of that data.

To ease the burden of implementing these technologies, the three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data Analytics-, AI-, and ML-as-a-Service offerings. In this post, we will explore one such cloud-based service offering in the field of big data analytics, Google Cloud Dataproc. We will focus on Cloud Dataproc’s ability to quickly and efficiently run Spark jobs written in Java and Python, two widely adopted enterprise programming languages.

Featured Technologies

The following technologies are featured prominently in this post.

Google Cloud Dataproc

According to Google, Cloud Dataproc is a fast, easy-to-use, fully-managed cloud service for running the Apache Spark and Apache Hadoop ecosystem on Google Cloud Platform. Dataproc is a complete platform for data processing, analytics, and machine learning. Dataproc offers per-second billing, so you only pay for exactly the resources you consume. Dataproc offers frequently updated and native versions of Apache Spark, Hadoop, Pig, and Hive, as well as other related applications. Dataproc has built-in integrations with other Google Cloud Platform (GCP) services, such as Cloud Storage, BigQuery, Bigtable, Stackdriver Logging, and Stackdriver Monitoring. Dataproc’s clusters are configurable and resizable from three to hundreds of nodes, and each cluster action takes less than 90 seconds on average.

Similar Platform as a Service (PaaS) offerings to Dataproc include Amazon Elastic MapReduce (EMR), Microsoft Azure HDInsight, and Qubole Data Service. Qubole is offered on AWS, Azure, and Oracle Cloud Infrastructure (Oracle OCI).

According to Google, Cloud Dataproc and Cloud Dataflow, both part of GCP’s Data Analytics/Big Data Product offerings, can both be used for data processing, and there’s overlap in their batch and streaming capabilities. Cloud Dataflow is a fully-managed service for transforming and enriching data in stream and batch modes. Dataflow uses the Apache Beam SDK to provide developers with Java and Python APIs, similar to Spark.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With in-memory speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

According to a post by DataFlair, ‘the DAG in Apache Spark is a set of Vertices and Edges, where vertices represent the RDDs and the edges represent the Operation to be applied on RDD (Resilient Distributed Dataset). In Spark DAG, every edge directs from earlier to later in the sequence. On the calling of Action, the created DAG submits to DAG Scheduler which further splits the graph into the stages of the task.’ Below, we see a three-stage DAG visualization, displayed using the Spark History Server Web UI, from a job demonstrated in this post.

Screen Shot 2018-12-15 at 11.20.57 PM

Spark’s polyglot programming model allows users to write applications in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). Spark may be run using its standalone cluster mode or on Apache Hadoop YARN, Mesos, and Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

Apache Hadoop

According to Apache, the Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. This is a rather modest description of such a significant and transformative project. When we talk about Hadoop, often it is in the context of the project’s well-known modules, which include:

  • Hadoop Common: The common utilities that support the other Hadoop modules
  • Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data
  • Hadoop YARN (Yet Another Resource Negotiator): A framework for job scheduling and cluster resource management, also known as NextGen MapReduce (MRv2)
  • Hadoop MapReduce: A YARN-based system for parallel processing of large datasets
  • Hadoop Ozone: An object store for Hadoop

Based on the Powered by Apache Hadoop list, there are many well-known enterprises and academic institutions using Apache Hadoop, including Adobe, eBay, Facebook, Hulu, LinkedIn, and The New York Times.

Spark vs. Hadoop

There are many articles and posts that delve into the Spark versus Hadoop debate; this post is not one of them. Although both are mature technologies, Spark, the new kid on the block, reached version 1.0.0 in May 2014, whereas Hadoop reached version 1.0.0 earlier, in December 2011. According to Google Trends, interest in both technologies has remained relatively high over the last three years. However, interest in Spark, based on the volume of searches, has been steadily outpacing Hadoop for well over a year now. The in-memory speed of Spark over HDFS-based Hadoop and the ease of Spark SQL for working with structured data are likely big differentiators for many users coming from a traditional relational database background and for users with large or streaming datasets requiring near real-time processing.

spark-to-hadoop

In this post, all examples are built to run on Spark. This is not meant to suggest Spark is necessarily superior or that Spark runs better on Dataproc than Hadoop. In fact, Dataproc’s implementation of Spark relies on Hadoop’s core HDFS and YARN technologies to run.

Demonstration

To show the capabilities of Cloud Dataproc, we will create both a single-node Dataproc cluster and three-node cluster, upload Java- and Python-based analytics jobs and data to Google Cloud Storage, and execute the jobs on the Spark cluster. Finally, we will enable monitoring and notifications for the Dataproc clusters and the jobs running on the clusters with Stackdriver. The post will demonstrate the use of the Google Cloud Console, as well as Google’s Cloud SDK’s command line tools, for all tasks.

In this post, we will be uploading and running individual jobs on the Dataproc Spark cluster, as opposed to using the Cloud Dataproc Workflow Templates. According to Google, a Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. Workflow Templates are useful for automating your Dataproc workflows. However, automation is not the primary topic of this post.

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for Java with Spark and one for Python with PySpark. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Cost

Of course, there is a cost associated with provisioning cloud services. However, if you manage the Google Cloud Dataproc resources prudently, the costs are negligible. Regarding pricing, according to Google, Cloud Dataproc pricing is based on the size of Cloud Dataproc clusters and the duration of time that they run. The size of a cluster is based on the aggregate number of virtual CPUs (vCPUs) across the entire cluster, including the master and worker nodes. The duration of a cluster is the length of time, measured in minutes, between cluster creation and cluster deletion.

Over the course of writing the code for this post, as well as writing the post itself, the entire cost of all the related resources was a minuscule US$7.50. The cost includes creating, running, and deleting more than a dozen Dataproc clusters and uploading and executing approximately 75-100 Spark and PySpark jobs. Given the quick creation time of a cluster, 2 minutes on average or less in this demonstration, there is no reason to leave a cluster running longer than it takes to complete your workloads.
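
To make the pricing rule concrete, here is a minimal sketch of the arithmetic: aggregate vCPUs across the master and workers, multiplied by the cluster's lifetime. The 30-minute lifetime and the `HYPOTHETICAL_RATE` value are assumptions for illustration, not quoted Google prices, and the underlying Compute Engine and storage charges are billed separately.

```python
def vcpu_hours(master_vcpus, worker_vcpus, num_workers, minutes):
    """Aggregate vCPU-hours for one cluster over its lifetime in minutes."""
    total_vcpus = master_vcpus + worker_vcpus * num_workers
    return total_vcpus * minutes / 60


def dataproc_fee(total_vcpu_hours, rate_per_vcpu_hour):
    """Dataproc fee only; Compute Engine and storage charges are separate."""
    return total_vcpu_hours * rate_per_vcpu_hour


# A cluster with one 4-vCPU master and two 4-vCPU workers,
# assumed here to exist for 30 minutes between creation and deletion.
hours = vcpu_hours(master_vcpus=4, worker_vcpus=4, num_workers=2, minutes=30)
print(hours)  # 6.0 vCPU-hours

# HYPOTHETICAL_RATE is a placeholder; check the current Dataproc price list.
HYPOTHETICAL_RATE = 0.01
print(round(dataproc_fee(hours, HYPOTHETICAL_RATE), 2))
```

Because the fee scales with both cluster size and lifetime, deleting a cluster as soon as the workload completes is the simplest way to keep costs down.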

Kaggle Datasets

To explore the features of Dataproc, we will use a publicly-available dataset from Kaggle. Kaggle is a popular open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the IBRD Statement Of Loans Data dataset, from World Bank Financial Open Data, and available on Kaggle. The International Bank for Reconstruction and Development (IBRD) loans are public and publicly guaranteed debt extended by the World Bank Group. IBRD loans are made to, or guaranteed by, countries that are members of IBRD. This dataset contains historical snapshots of the Statement of Loans including the latest available snapshots.

screen_shot_2018-12-04_at_7.02.53_pm

There are two data files available. The ‘Statement of Loans’ latest available snapshots data file contains 8,713 rows of loan data (~3 MB), ideal for development and testing. The ‘Statement of Loans’ historic data file contains approximately 750,000 rows of data (~265 MB). Although not exactly ‘big data’, the historic dataset is large enough to sufficiently explore Dataproc. Both IBRD files have an identical schema with 33 columns of data (gist).

In this demonstration, both the Java and Python jobs will perform the same simple analysis of the larger historic dataset. For the analysis, we will ascertain the top 25 historic IBRD borrowers. For each borrower, we will determine their total loan disbursements, current loan obligations, and the average interest rates they were charged for all loans. This simple analysis will be performed using Spark’s SQL capabilities. The results of the analysis, a Spark DataFrame containing 25 rows, will be saved as a CSV-format data file.

SELECT country, country_code,
       Format_number(total_disbursement, 0) AS total_disbursement,
       Format_number(total_obligation, 0) AS total_obligation,
       Format_number(avg_interest_rate, 2) AS avg_interest_rate
FROM   (SELECT country,
               country_code,
               Sum(disbursed) AS total_disbursement,
               Sum(obligation) AS total_obligation,
               Avg(interest_rate) AS avg_interest_rate
        FROM   loans
        GROUP  BY country, country_code
        ORDER  BY total_disbursement DESC
        LIMIT  25)
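
The aggregation logic of the inner query can be tried without a cluster. The sketch below runs it against an in-memory SQLite table using Python's standard library; it is not the Spark job itself. The rows are made up for illustration and are not real IBRD figures, and Python string formatting stands in for Spark SQL's `format_number` function, which SQLite does not provide.

```python
import sqlite3

# Made-up rows for illustration only; these are not real IBRD figures.
ROWS = [
    ("Alpha", "AA", 100.0, 40.0, 2.0),
    ("Alpha", "AA", 300.0, 60.0, 4.0),
    ("Beta", "BB", 250.0, 10.0, 1.0),
    ("Gamma", "GG", 50.0, 5.0, 3.0),
]

INNER_QUERY = """
SELECT country, country_code,
       SUM(disbursed) AS total_disbursement,
       SUM(obligation) AS total_obligation,
       AVG(interest_rate) AS avg_interest_rate
FROM loans
GROUP BY country, country_code
ORDER BY total_disbursement DESC
LIMIT 25
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE loans (country TEXT, country_code TEXT, "
    "disbursed REAL, obligation REAL, interest_rate REAL)"
)
conn.executemany("INSERT INTO loans VALUES (?, ?, ?, ?, ?)", ROWS)
results = conn.execute(INNER_QUERY).fetchall()
conn.close()

for country, code, disbursed, obligation, rate in results:
    # Python formatting stands in for Spark SQL's format_number().
    print(country, code, f"{disbursed:,.0f}", f"{obligation:,.0f}", f"{rate:,.2f}")
```

Grouping by borrower, summing, and then sorting by total disbursement before applying the limit is what yields the "top 25" list in the real job.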

Google Cloud Storage

First, we need a location to store our Spark jobs, data files, and results, which will be accessible to Dataproc. Although there are a number of choices, the simplest and most convenient location for Dataproc is a Google Cloud Storage bucket. According to Google, Cloud Storage offers the highest level of availability and performance within a single region and is ideal for compute, analytics, and ML workloads in a particular region. Cloud Storage buckets are comparable to buckets in Amazon Simple Storage Service (Amazon S3), Amazon’s object storage service.

Using the Google Cloud Console, Google’s Web Admin UI, create a new, uniquely named Cloud Storage bucket. Our Dataproc clusters will eventually be created in a single regional location. We need to ensure our new bucket is created in the same regional location as the clusters; I chose us-east1.

screen_shot_2018-12-04_at_7.04.45_pm

We will need the new bucket’s link to use within the Java and Python code, as well as from the command line with gsutil. The gsutil tool is a Python application that lets you access Cloud Storage from the command line. The bucket’s link may be found on the Storage Browser Console’s Overview tab. A bucket’s link is always in the format gs://bucket-name.

screen_shot_2018-12-04_at_7.06.06_pm
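
Since the same gs:// links show up in the job code and on the command line, a small helper for splitting and building them can be handy. This is a minimal sketch using only the standard library; the function names are hypothetical and it does not validate bucket naming rules.

```python
from urllib.parse import urlparse


def split_gcs_uri(uri):
    """Split 'gs://bucket-name/path/to/object' into (bucket, object_path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a Cloud Storage URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def join_gcs_uri(bucket, object_path=""):
    """Build a gs:// URI from a bucket name and an optional object path."""
    return f"gs://{bucket}/{object_path}" if object_path else f"gs://{bucket}"


print(split_gcs_uri("gs://your-bucket-name/your-data-file.csv"))
print(join_gcs_uri("your-bucket-name", "your-data-file.csv"))
```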

Alternatively, we may also create the Cloud Storage bucket using gsutil with the make buckets (mb) command, as follows:

# Always best practice since features are updated frequently
gcloud components update
  
export PROJECT=your_project_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
# Make sure you are creating resources in the correct project
gcloud config set project $PROJECT
  
gsutil mb -p $PROJECT -c regional -l $REGION $BUCKET_NAME

Cloud Dataproc Cluster

Next, we will create two different Cloud Dataproc clusters for demonstration purposes. If you have not used Cloud Dataproc previously in your GCP Project, you will first need to enable the API for Cloud Dataproc.

screen_shot_2018-12-04_at_7.15.05_pm

Single Node Cluster

We will start with a single node cluster with no worker nodes, suitable for development and testing Spark and Hadoop jobs, using small datasets. Create a single-node Dataproc cluster using the Single Node Cluster mode option. Create the cluster in the same region as the new Cloud Storage bucket. This will allow the Dataproc cluster access to the bucket without additional security or IAM configuration. I used the n1-standard-1 machine type, with 1 vCPU and 3.75 GB of memory. Observe the resources assigned to Hadoop YARN for Spark job scheduling and cluster resource management.

screen_shot_2018-12-04_at_7.19.37_pm

The new cluster, consisting of a single node and no worker nodes, should be ready for use in a few minutes or less.

screen_shot_2018-12-04_at_7.38.23_pm

Note the Image version, 1.3.16-deb9. According to Google, Dataproc uses image versions to bundle operating system, big data components, and Google Cloud Platform connectors into one package that is deployed on a cluster.  This image, released in November 2018, is the latest available version at the time of this post. The image contains:

  • Apache Spark 2.3.1
  • Apache Hadoop 2.9.0
  • Apache Pig 0.17.0
  • Apache Hive 2.3.2
  • Apache Tez 0.9.0
  • Cloud Storage connector 1.9.9-hadoop2
  • Scala 2.11.8
  • Python 2.7

To avoid lots of troubleshooting, make sure your code is compatible with the image’s versions. It is important to note the image does not contain a version of Python 3. You will need to ensure your Python code is built to run with Python 2.7. Alternatively, use Dataproc’s --initialization-actions flag along with bootstrap and setup shell scripts to install Python 3 on the cluster using pip or conda. Tips for installing Python 3 on Dataproc can be found on Stack Overflow and elsewhere on the Internet.

As an alternative to the Google Cloud Console, we are able to create the cluster using a REST command. Google provides the Google Cloud Console’s equivalent REST request, as shown in the example below.

screen_shot_2018-12-04_at_7.20.07_pm

Additionally, we have the option of using the gcloud command line tool. This tool provides the primary command-line interface to Google Cloud Platform and is part of Google’s Cloud SDK, which also includes the aforementioned gsutil. Here again, Google provides the Google Cloud Console’s equivalent gcloud command. This is a great way to learn to use the command line.

screen_shot_2018-12-04_at_7.20.21_pm

Using the dataproc clusters create command, we are able to create the same cluster as shown above from the command line, as follows:

export PROJECT=your_project_name
export CLUSTER_1=your_single_node_cluster_name 
export REGION=us-east1
export ZONE=us-east1-b
export MACHINE_TYPE_SMALL=n1-standard-1
  
gcloud dataproc clusters create $CLUSTER_1 \
  --region $REGION \
  --zone $ZONE \
  --single-node \
  --master-machine-type $MACHINE_TYPE_SMALL \
  --master-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT

There are a few useful commands to inspect your running Dataproc clusters. The dataproc clusters describe command, in particular, provides detailed information about all aspects of the cluster’s configuration and current state.

gcloud dataproc clusters list --region $REGION

gcloud dataproc clusters describe $CLUSTER_1 \
  --region $REGION --format json

Standard Cluster

In addition to the single node cluster, we will create a second three-node Dataproc cluster. We will compare the speed of a single-node cluster to that of a true cluster with multiple worker nodes. Create a new Dataproc cluster using the Standard Cluster mode option. Again, make sure to create the cluster in the same region as the new Storage bucket.

screen_shot_2018-12-04_at_10.15.05_pm

The second cluster contains a single master node and two worker nodes. All three nodes use the n1-standard-4 machine type, with 4 vCPU and 15 GB of memory. Although still considered a minimally-sized cluster, this cluster represents a significant increase in compute power over the first single-node cluster, which had a total of 1 vCPU, 3.75 GB of memory, and no worker nodes on which to distribute processing. Between the two workers in the second cluster, we have 8 vCPU and 30 GB of memory for computation.

screen_shot_2018-12-04_at_10.18.54_pm

Again, we have the option of using the gcloud command line tool to create the cluster:

export PROJECT=your_project_name
export CLUSTER_2=your_three_node_cluster_name 
export REGION=us-east1
export ZONE=us-east1-b
export NUM_WORKERS=2
export MACHINE_TYPE_LARGE=n1-standard-4
  
gcloud dataproc clusters create $CLUSTER_2 \
  --region $REGION \
  --zone $ZONE \
  --master-machine-type $MACHINE_TYPE_LARGE \
  --master-boot-disk-size 500 \
  --num-workers $NUM_WORKERS \
  --worker-machine-type $MACHINE_TYPE_LARGE \
  --worker-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT

Cluster Creation Speed: Cloud Dataproc versus Amazon EMR?

In a series of rather unscientific tests, I found the three-node Dataproc cluster took less than two minutes on average to be created. Compare that time to a similar three-node cluster built with Amazon’s EMR service using their general purpose m4.4xlarge Amazon EC2 instance type. In a similar series of tests, I found the EMR cluster took seven minutes on average to be created. The EMR cluster took 3.5 times longer to create than the comparable Dataproc cluster. Again, although not a totally accurate comparison, since both services offer different features, it gives you a sense of the speed of Dataproc as compared to Amazon EMR.

Staging Buckets

According to Google, when you create a cluster, Cloud Dataproc creates a Cloud Storage staging bucket in your project or reuses an existing Cloud Dataproc-created bucket from a previous cluster creation request. Staging buckets are used to stage miscellaneous configuration and control files that are needed by your cluster. Below, we see the staging buckets created for the two Dataproc clusters.

screen_shot_2018-12-04_at_10.26.49_pm

Project Files

Before uploading the jobs and running them on the Cloud Dataproc clusters, we need to understand what is included in the two GitHub projects. If you recall from the Kaggle section of the post, both projects are basically the same but written in different languages, Java and Python. The jobs they contain all perform the same basic analysis on the dataset.

Java Project

The dataproc-java-demo Java-based GitHub project contains three classes, each of which is a job to be run by Spark. The InternationalLoansApp Java class is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file (gist).

On line 20, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine. There are several options for setting the Master URL, detailed here.
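
To illustrate how the local Master URL forms map to worker threads, here is a small sketch. The helper name is hypothetical and it only covers the `local`, `local[K]`, and `local[*]` forms; any other value, such as `yarn`, returns `None`.

```python
import os
import re


def local_worker_threads(master_url):
    """Return the worker-thread count implied by a local master URL.

    Returns None for anything this sketch does not handle, such as "yarn".
    """
    if master_url == "local":
        return 1  # a single worker thread, no parallelism
    match = re.fullmatch(r"local\[(\*|\d+)\]", master_url)
    if match is None:
        return None
    value = match.group(1)
    # "*" means one worker thread per logical core on this machine.
    return (os.cpu_count() or 1) if value == "*" else int(value)


for url in ("local", "local[4]", "local[*]", "yarn"):
    print(url, "->", local_worker_threads(url))
```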

On line 30, the path to the data file, and on line 84, the output path for the data file, is a local relative file path.

On lines 38–42, we do a bit of clean up on the column names, for only those columns we are interested in for the analysis. Be warned, the column names of the IBRD data are less than ideal for SQL-based analysis, containing mixed-cased characters, word spaces, and brackets.
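
The project renames only the columns it needs. As a different, more generic approach, the sketch below normalizes any header into a SQL-friendly name. The raw header strings are hypothetical examples chosen to show spaces, mixed case, and brackets; they are not copied from the dataset.

```python
import re


def to_snake_case(column_name):
    """Lowercase a column name and collapse non-alphanumeric runs to '_'."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", column_name.strip())
    return cleaned.strip("_").lower()


# Hypothetical raw headers, chosen to show spaces, mixed case, and brackets.
raw_columns = ["Country Code", "Interest Rate", "Disbursed Amount (USD)"]
print([to_snake_case(name) for name in raw_columns])
# ['country_code', 'interest_rate', 'disbursed_amount_usd']
```

In PySpark, a function like this could be applied to every column at once with `df.toDF(*[to_snake_case(c) for c in df.columns])`.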

On line 79, we call Spark DataFrame’s repartition method, dfDisbursement.repartition(1). The repartition method allows us to recombine the results of our analysis and output a single CSV file to the bucket. Ordinarily, Spark splits the data into partitions and executes computations on the partitions in parallel. Each partition’s data is written to separate CSV files when a DataFrame is written back to the bucket.

Using coalesce(1) or repartition(1) to recombine the resulting 25-Row DataFrame on a single node is okay for the sake of this demonstration, but is not practical for recombining partitions from larger DataFrames. There are more efficient and less costly ways to manage the results of computations, depending on the intended use of the resulting data.

screen_shot_2018-12-05_at_4.04.24_pm
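
The effect of repartition(1) on the output files can be simulated without Spark. The toy sketch below writes each partition to its own CSV file, then merges the partitions first and writes again. The rows are made up, and the file names are simplified compared with the names Spark actually generates.

```python
import csv
import tempfile
from pathlib import Path


def write_partitions(partitions, out_dir):
    """Write each partition to its own part-NNNNN.csv file."""
    out_dir.mkdir(parents=True, exist_ok=True)
    for index, rows in enumerate(partitions):
        with open(out_dir / f"part-{index:05d}.csv", "w", newline="") as handle:
            csv.writer(handle).writerows(rows)
    return sorted(path.name for path in out_dir.glob("part-*.csv"))


def repartition_to_one(partitions):
    """Merge all partitions into one, mimicking the effect of repartition(1)."""
    return [[row for rows in partitions for row in rows]]


# Made-up rows split across three partitions.
partitions = [[("A", 1)], [("B", 2), ("C", 3)], [("D", 4)]]

with tempfile.TemporaryDirectory() as tmp:
    many = write_partitions(partitions, Path(tmp) / "many")
    one = write_partitions(repartition_to_one(partitions), Path(tmp) / "one")

print(many)  # ['part-00000.csv', 'part-00001.csv', 'part-00002.csv']
print(one)   # ['part-00000.csv']
```

Merging is cheap for a handful of rows, but on a real cluster it forces all of the data through a single task, which is why it does not scale to large results.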

The InternationalLoansAppDataprocSmall class is intended to be run on the Dataproc clusters, analyzing the same smaller CSV data file. The InternationalLoansAppDataprocLarge class is also intended to be run on the Dataproc clusters. However, it analyzes the larger 750K rows of data in the IBRD historic CSV file (gist).

On line 20, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode when submitting the job. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. Recall, the Dataproc cluster runs Spark on YARN.

Also, note on line 30, the path to the data file, and on line 63, the output path for the data file, is to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")). Cloud Dataproc clusters automatically install the Cloud Storage connector. According to Google, there are a number of benefits to choosing Cloud Storage over traditional HDFS including data persistence, reliability, and performance.

These are the only two differences between the local version of the Spark job and the version of the Spark job intended for Dataproc. To build the project’s JAR file, which you will later upload to the Cloud Storage bucket, compile the Java project using the gradle build command from the root of the project. For convenience, the JAR file is also included in the GitHub repository.

screen_shot_2018-12-07_at_12.57.55_pm

Python Project

The dataproc-python-demo Python-based GitHub project contains two Python scripts to be run using PySpark for this post. The international_loans_local.py Python script is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file. It performs a few different analyses with the smaller dataset (gist).

Identical to the corresponding Java class, note on line 12, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine.

Also identical to the corresponding Java class, note on line 26, the path to the data file, and on line 66, the output path for the resulting data file, is a local relative file path.

screen_shot_2018-12-05_at_4.02.50_pm

The international_loans_dataproc_large.py Python script is intended to be run on the Dataproc clusters, analyzing the larger 750K rows of data in the IBRD historic CSV file (gist).

On line 12, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster.

Again, note on line 26, the path to the data file, and on line 59, the output path for the data file, is to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")).

These are the only two differences between the local version of the PySpark job and the version of the PySpark job intended for Dataproc. With Python, there is no pre-compilation necessary. We will upload the second script directly.

Uploading Job Resources to Cloud Storage

In total, we need to upload four items to the new Cloud Storage bucket we created previously. The items include the two Kaggle IBRD CSV files, the compiled Java JAR file from the dataproc-java-demo project, and the Python script from the dataproc-python-demo project. Using the Google Cloud Console, upload the four files to the new Google Storage bucket, as shown below. Make sure you unzip the two Kaggle IBRD CSV data files before uploading.

screen_shot_2018-12-05_at_12.52.51_pm

Like before, we also have the option of using gsutil with the copy (cp) command to upload the four files. The cp command accepts wildcards, as shown below.

export PROJECT=your_project_name
export BUCKET_NAME=gs://your_bucket_name
  
gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc_large.py $BUCKET_NAME

If our Java or Python jobs were larger, or more complex and required multiple files to run, we could also choose to upload ZIP or other common compression formatted archives using the --archives flag.

Running Jobs on Dataproc

The easiest way to run a job on the Dataproc cluster is by submitting a job through the Dataproc Jobs UI, part of the Google Cloud Console.

screen_shot_2018-12-05_at_11.29.34_pm

Dataproc has the capability of running multiple types of jobs, including:

  • Hadoop
  • Spark
  • SparkR
  • PySpark
  • Hive
  • SparkSql
  • Pig

We will be running both Spark and PySpark jobs as part of this demonstration.

Spark Jobs

To run a Spark job using the JAR file, select Job type Spark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters in your chosen region. Run both jobs at least twice, once on each cluster, for a total of four jobs.

screen_shot_2018-12-05_at_12.57.55_pm

Lastly, you will need to input the main class and the path to the JAR file. The JAR location will be:

gs://your_bucket_name/dataprocJavaDemo-1.0-SNAPSHOT.jar

The main class for the smaller dataset will be:

org.example.dataproc.InternationalLoansAppDataprocSmall

The main class for the larger dataset will be:

org.example.dataproc.InternationalLoansAppDataprocLarge

During or after job execution, you may view details in the Output tab of the Dataproc Jobs console.

screen_shot_2018-12-04_at_7.53.27_pm

Like every other step in this demonstration, we can also use the gcloud command line tool, instead of the web console, to submit our Spark jobs to each cluster. Here, I am submitting the larger dataset Spark job to the three-node cluster.

export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
gcloud dataproc jobs submit spark \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar \
  --async

PySpark Jobs

To run a Spark job using the Python script, select Job type PySpark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters. Run the job at least twice, once on each cluster.

screen_shot_2018-12-05_at_12.53.36_pm

Lastly, you will need to input the main Python file path. There is only one Dataproc Python script, which analyzes the larger dataset. The script location will be:

gs://your_bucket_name/international_loans_dataproc_large.py

Like every other step in this demonstration, we can also use the gcloud command line tool instead of the web console to submit our PySpark jobs to each cluster. Below, I am submitting the PySpark job to the three-node cluster.

export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
  
gcloud dataproc jobs submit pyspark \
  $BUCKET_NAME/international_loans_dataproc_large.py \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --async

Including the optional --async flag with any of the dataproc jobs submit commands sends the job to the Dataproc cluster and immediately releases the terminal back to the user. If you do not use the --async flag, the terminal will be unavailable until the job is finished.

However, without the flag, we will get the standard output (stdout) and standard error (stderr) from Dataproc. The output includes some useful information, including different stages of the job execution lifecycle and execution times.

screen_shot_2018-12-05_at_10.38.52_pm
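
If you submit jobs from a script rather than by hand, the same command can be assembled in Python and the --async flag toggled per run. This is a minimal sketch; the helper name is hypothetical, and it only builds and prints the argument list using the flags shown above.

```python
import shlex


def build_pyspark_submit(script_uri, region, cluster, run_async=True):
    """Return the argv list for `gcloud dataproc jobs submit pyspark`."""
    argv = [
        "gcloud", "dataproc", "jobs", "submit", "pyspark", script_uri,
        "--region", region,
        "--cluster", cluster,
    ]
    if run_async:
        # Return immediately instead of streaming the job's output.
        argv.append("--async")
    return argv


argv = build_pyspark_submit(
    "gs://your_bucket_name/international_loans_dataproc_large.py",
    region="us-east1",
    cluster="your_three_node_cluster_name",
)
print(shlex.join(argv))
# To actually run it: subprocess.run(argv, check=True)
```

Passing `run_async=False` drops the flag, so the call blocks and the job's stdout and stderr are streamed back to the terminal.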

File Output

During development and testing, outputting results to the console is useful. However, in Production, the output from jobs is most often written to Apache Parquet, Apache Avro, CSV, JSON, or XML format files, persisted to an Apache Hive, SQL, or NoSQL database, or streamed to another system for post-processing, using technologies such as Apache Kafka.

Once both the Java and Python jobs have run successfully on the Dataproc cluster, you should observe the results have been saved back to the Storage bucket. Each script saves its results to a single CSV file in separate directories, as shown below.

screen_shot_2018-12-05_at_4.09.31_pm.png

The final dataset, written to the CSV file, contains the results of the analysis (gist).

Cleaning Up

When you are finished, make sure to delete your running clusters. This may be done through the Google Cloud Console. Deletion of the three-node cluster took, on average, slightly more than one minute.

screen_shot_2018-12-04_at_11.11.40_pm

As usual, we can also use the gcloud command line tool instead of the web console to delete the Dataproc clusters.

export CLUSTER_1=your_single_node_cluster_name
export CLUSTER_2=your_three_node_cluster_name 
export REGION=us-east1
  
yes | gcloud dataproc clusters delete $CLUSTER_1 --region $REGION
yes | gcloud dataproc clusters delete $CLUSTER_2 --region $REGION

Results

Some observations, based on approximately 75 successful jobs: first, both the Python job and the Java jobs ran in nearly the same amount of time on the single-node cluster and then on the three-node cluster. This is beneficial since, although a lot of big data analysis is performed with Python, Java is still the lingua franca of many large enterprises.

screen_shot_2018-12-05_at_1.49.01_pm

Consecutive Execution

Below are the average times for running the larger dataset on both clusters, in Java and in Python. The jobs were all run consecutively as opposed to concurrently. The best time was 59 seconds on the three-node cluster, compared to the best time of 150 seconds on the single-node cluster, meaning the single-node run took roughly 2.5 times as long (150 / 59 ≈ 2.54). Given the differences in the two clusters, this large variation is expected. On average, the single-node cluster took about 254% of the three-node cluster’s time to process the large dataset.

chart2
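
The comparison between the two clusters is a simple ratio of run times. The sketch below uses the rounded best times quoted above, so the result may differ slightly from figures computed on unrounded timings.

```python
def times_as_long(slower_seconds, faster_seconds):
    """How many times longer the slower run took than the faster run."""
    return slower_seconds / faster_seconds


# Rounded best times from the text: single-node vs. three-node cluster.
ratio = times_as_long(150, 59)
print(f"{ratio:.2f}x, or {ratio:.0%}")  # 2.54x, or 254%
```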

Concurrent Execution

It is important to understand the impact of concurrently running multiple jobs on the same Dataproc cluster. To demonstrate this, both the Java and Python jobs were also run concurrently. In one such test, ten copies of the Python job were run concurrently on the three-node cluster.

concurrent-jobs

Observe that the execution times of the concurrent jobs increase nearly linearly. The first job completes in roughly the same time as the consecutively executed jobs, shown above, but each succeeding job’s execution time increases linearly.

chart1

According to Apache, when running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application. Each application is given a maximum amount of resources it can use and holds onto them for its whole duration. Note no tuning was done to the Dataproc clusters to optimize for concurrent execution.

Really Big Data?

Although there is no exact definition of ‘big data’, 750K rows of data at 265 MB is probably not generally considered big data. Likewise, the three-node cluster used in this demonstration is still pretty diminutive. Lastly, the SQL query was less than complex. To really test the abilities of Dataproc would require a multi-gigabyte or multi-terabyte-sized dataset, divided amongst multiple files, computed on a much beefier cluster with more worker nodes and more compute resources.

Monitoring and Instrumentation

In addition to viewing the results of running and completed jobs, there are a number of additional monitoring resources, including the Hadoop Yarn Resource Manager, HDFS NameNode, and Spark History Server Web UIs, and Google Stackdriver. I will only briefly introduce these resources, and not examine any of these interfaces in detail. You’re welcome to investigate the resources for your own clusters. Apache lists other Spark monitoring and instrumentation resources in their documentation.

To access the Hadoop Yarn Resource Manager, HDFS NameNode, and Spark History Server Web UIs, you must create an SSH tunnel and run Chrome through a proxy. Google Dataproc provides both commands and a link to documentation in the Dataproc Cluster tab, to connect.

screen_shot_2018-12-15_at_9.09.00_pm

Hadoop Yarn Resource Manager Web UI

Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the Hadoop Yarn Resource Manager Web UI is accessed on port 8088. The UI allows you to view all aspects of the YARN cluster and the distributed applications running on the YARN system.

screen_shot_2018-12-15_at_9.15.27_pm

HDFS NameNode Web UI

Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the HDFS NameNode Web UI is accessed on port 9870. According to the Hadoop Wiki, the NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system, and tracks where across the cluster the file data is kept. It does not store the data of these files itself.

screen_shot_2018-12-15_at_9.41.19_pm

Spark History Server Web UI

We can view the details of all completed jobs using the Spark History Server Web UI. Once you are connected to the cluster, via the SSH tunnel and proxy, the Spark History Server Web UI is accessed on port 18080. Of all the methods of reviewing aspects of a completed Spark job, the History Server provides the most detail.

screen_shot_2018-12-15_at_9.15.09_pm

Using the History Server UI, we can drill into fine-grained details of each job, including the event timeline.

screen_shot_2018-12-15_at_9.22.32_pm

Also, using the History Server UI, we can see a visualization of the Spark job’s DAG (Directed Acyclic Graph). Databricks published an excellent post on learning how to interpret the information visualized in the Spark UI.

screen_shot_2018-12-15_at_9.19.15_pm

Not only can we view the DAG, we can also drill into each Stage of the DAG from the UI.

screen_shot_2018-12-15_at_10.22.33_pm

Stackdriver

We can also enable Google Stackdriver for monitoring and management of services, containers, applications, and infrastructure. Stackdriver offers an impressive array of services, including debugging, error reporting, monitoring, alerting, tracing, logging, and dashboards, to mention only a few Stackdriver features.

screen_shot_2018-12-05_at_3.18.31_pm

There are dozens of metrics available, which collectively, reflect the health of the Dataproc clusters. Below we see the states of one such metric, the YARN virtual cores (vcores). A YARN vcore, introduced in Hadoop 2.4, is a usage share of a host CPU.  The number of YARN virtual cores is equivalent to the number of worker nodes (2) times the number of vCPUs per node (4), for a total of eight YARN virtual cores. Below, we see that at one point in time, 5 of the 8 vcores have been allocated, with 2 more available.

screen_shot_2018-12-05_at_3.29.47_pm

Next, we see the states of the YARN memory size. YARN memory size is calculated as the number of worker nodes (2) times the amount of memory on each node (15 GB) times the fraction given to YARN (0.8), for a total of 24 GB (2 x 15 GB x 0.8). Below, we see that at one point in time, 20 GB of RAM is allocated with 4 GB available. At that instant in time, the workload does not appear to be exhausting the cluster’s memory.

screen_shot_2018-12-05_at_3.30.39_pm
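
The two capacity figures above follow directly from the cluster's shape. Here is the arithmetic as a small sketch, using the worker count, vCPUs, memory, and the 0.8 YARN fraction stated in the text; the function names are hypothetical.

```python
def yarn_vcores(num_workers, vcpus_per_worker):
    """Total YARN virtual cores across all worker nodes."""
    return num_workers * vcpus_per_worker


def yarn_memory_gb(num_workers, memory_gb_per_worker, yarn_fraction=0.8):
    """Total memory available to YARN across all worker nodes, in GB."""
    return num_workers * memory_gb_per_worker * yarn_fraction


# Two n1-standard-4 workers: 4 vCPUs and 15 GB of memory each.
print(yarn_vcores(2, 4))                  # 8
print(round(yarn_memory_gb(2, 15), 1))    # 24.0
```

Only the worker nodes count toward these totals, since the master node does not run YARN containers for the jobs.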

Notifications

Since no one actually watches dashboards all day, waiting for something to fail, how do we know when we have an issue with Dataproc? Stackdriver offers integrations with most popular notification channels, including email, SMS, Slack, PagerDuty, HipChat, and Webhooks. With Stackdriver, we define a condition which describes when a service is considered unhealthy. When triggered, Stackdriver sends a notification to one or more channels.

notifications

Below is a preview of two alert notifications in Slack. I enabled Slack as a notification channel and created an alert which is triggered each time a Dataproc job fails. Whenever a job fails, such as the two examples below, I receive a Slack notification through the Slack Channel defined in Stackdriver.

slack.png

Slack notifications contain a link, which routes you back to Stackdriver, to an incident which was opened on your behalf, due to the job failure.

incident

For convenience, the incident also includes a pre-filtered link directly to the log entries at the time of the policy violation. Stackdriver logging offers advanced filtering capabilities to quickly find log entries, as shown below.

screen_shot_2018-12-09_at_12.52.51_pm

With Stackdriver, you get monitoring, logging, alerting, notification, and incident management as a service, with minimal cost and upfront configuration. Think about how much time and effort it takes the average enterprise to achieve this level of infrastructure observability on their own; most never do.

Conclusion

In this post, we have seen the ease-of-use, extensive feature-set, out-of-the-box integration ability with other cloud services, low cost, and speed of Google Cloud Dataproc, to run big data analytics workloads. Couple this with the ability of Stackdriver to provide monitoring, logging, alerting, notification, and incident management for Dataproc with minimal up-front configuration. In my opinion, based on these features, Google Cloud Dataproc leads other cloud competitors for fully-managed Spark and Hadoop Cluster management.

In future posts, we will examine the use of Cloud Dataproc Workflow Templates for process automation, the integration capabilities of Dataproc with services such as BigQuery, Bigtable, Cloud Dataflow, and Google Cloud Pub/Sub, and finally, DevOps for Big Data with Dataproc and tools like Spinnaker and Jenkins on GKE.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, nor Apache or Google.


Getting Started with PySpark for Big Data Analytics using Jupyter Notebooks and Jupyter Docker Stacks

There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives. Due to their popularity and potential benefits, academic institutions and commercial enterprises are rushing to train large numbers of Data Scientists and ML and AI Engineers.

google_terms2

Learning popular programming languages and frameworks, such as Python, Scala, R, Apache Hadoop, Apache Spark, and Apache Kafka, requires the use of multiple complex technologies. Installing, configuring, and managing these technologies often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. These barriers may prove a deterrent to Students, Mathematicians, Statisticians, and Data Scientists.

google_terms3

Driven by the explosive growth of these technologies and the need to train individuals, many commercial enterprises are lowering the barriers to entry, making it easier to get started. The three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data-, AI- and ML-as-a-Service offerings.

Similarly, many open-source projects are also lowering the barriers to entry into these technologies. An excellent example of an open-source project working on this challenge is Project Jupyter. Similar to the Spark Notebook and Apache Zeppelin projects, Jupyter Notebooks enables data-driven, interactive, and collaborative data analytics with Julia, Scala, Python, R, and SQL.

This post will demonstrate the creation of a containerized development environment, using Jupyter Docker Stacks. The environment will be suited for learning and developing applications for Apache Spark, using the Python, Scala, and R programming languages. This post is not intended to be a tutorial on Spark, PySpark, or Jupyter Notebooks.

Featured Technologies

The following technologies are featured prominently in this post.

pyspark_article_00b_feature

Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word, Jupyter, is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages. Interest in Jupyter Notebooks has grown dramatically.

google_terms4

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, there are eight different Jupyter Docker Stacks, each focused on a particular area of practice. They include SciPy (Python-based mathematics, science, and engineering), TensorFlow, R Project for statistical computing, Data Science with Julia, and the main subject of this post, PySpark. The stacks also include a rich variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, ipywidgets (interactive HTML widgets), and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, on Amazon EC2, Apache Hadoop YARN, Mesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage and secure business-critical applications without the fear of technology or infrastructure lock-in. Although Kubernetes is now the leading open-source container orchestration platform, Docker is still the predominant underlying container engine technology. For this post, I am using Docker Desktop Community version for MacOS.

screen_shot_2019-06-09_at_7_41_12_am.png

Docker Swarm

Current versions of Docker include both a Kubernetes and a Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project which implements Docker’s orchestration layer and is used directly within Docker.

PostgreSQL

PostgreSQL is a powerful, open source object-relational database system. According to their website, PostgreSQL comes with many features aimed to help developers build applications, administrators to protect data integrity and build fault-tolerant environments, and help manage data no matter how big or small the dataset.

Demonstration

To show the capabilities of the Jupyter development environment, I will demonstrate a few typical use cases, such as executing Python scripts, submitting PySpark jobs, working with Jupyter Notebooks, and reading and writing data to and from different format files and to a database. We will be using the jupyter/all-spark-notebook Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will stand up a Docker stack, consisting of Jupyter All-Spark-Notebook, PostgreSQL 10.5, and Adminer containers. The Docker stack will have local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers.

PySparkDocker.png

Source Code

All open-sourced code for this post can be found on GitHub. Use the following command to clone the project. The post and project code were updated on 6/7/2019.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git

Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgres directory to store PostgreSQL data files. This directory will be bind mounted into the PostgreSQL container on line 36 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or MacOS and is equivalent to HOMEPATH on Windows.

The Jupyter container’s working directory is set on line 10 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 24 of the stack.yml file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or MacOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. Optionally, I have chosen to override that user with my own local host’s user account, as shown on line 16 of the stack.yml file, NB_USER: $USER. I have used the MacOS host’s USER environment variable value (equivalent to USERNAME on Windows). There are many options for configuring the Jupyter container, detailed here. Several of those options are shown on lines 12-18 of the stack.yml file (gist).

 

 

 

 

 

Assuming you have a recent version of Docker installed on your local development machine, and running in swarm mode, standing up the stack is as easy as running the following command from the root directory of the project:

docker stack deploy -c stack.yml pyspark

The Docker stack consists of a new overlay network, pyspark-net, and the three containers. To confirm the stack deployed, you can run the following command:

docker stack ps pyspark --no-trunc

pyspark_article_01_stack_deploy

Note the jupyter/all-spark-notebook container is quite large. Depending on your Internet connection, if this is the first time you have pulled this Docker image, the stack may take several minutes to enter a running state.

To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token (read more here). This information is output in the Jupyter container log, which can be accessed with the following command:

docker logs $(docker ps | grep pyspark_pyspark | awk '{print $NF}')

pyspark_article_02_pyspark_logs

Using the URL and token shown in the log output, you will be able to access the Jupyter web-based user interface on localhost port 8888. Once there, from the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory.
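
If you would rather not pick the URL out of the log output by eye, a few lines of Python can do it. The following is a minimal sketch, not part of the project: the helper names are hypothetical, and the sample log text is invented to resemble typical Jupyter output, whose exact format varies between versions.

```python
import re
from typing import Optional
from urllib.parse import parse_qs, urlparse

# Matches a URL that carries a "token" query parameter.
TOKEN_URL_PATTERN = re.compile(r"https?://\S+[?&]token=\w+")


def find_jupyter_url(log_text: str) -> Optional[str]:
    """Return the first tokenized URL found in the log text, if any."""
    match = TOKEN_URL_PATTERN.search(log_text)
    return match.group(0) if match else None


def extract_token(url: str) -> Optional[str]:
    """Return the value of the token query parameter, if present."""
    values = parse_qs(urlparse(url).query).get("token")
    return values[0] if values else None


# Invented sample resembling the container log; real output will differ.
sample_log = """\
[I 12:00:00.000 NotebookApp] The Jupyter Notebook is running at:
[I 12:00:00.000 NotebookApp] http://127.0.0.1:8888/?token=0123456789abcdef
"""

url = find_jupyter_url(sample_log)
print(url)
print(extract_token(url))
```

In practice, the text passed to find_jupyter_url would be the captured output of the docker logs command shown earlier.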

Also shown below, note the types of files you are able to create from the dashboard, including Python 3, R, Scala (using Apache Toree or spylon-kernel), and text. You can also open a Jupyter Terminal or create a new Folder.

pyspark_article_27_browser.png

Running Python Scripts

Instead of worrying about installing and maintaining the latest version of Python and packages on your own development machine, we can run our Python scripts from the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook Docker Image runs Python 3.7.3 and Conda 4.6.14. Let’s start with a simple example of the Jupyter container’s capabilities by running a Python script. I have included a sample Python script, 01_simple_script.py.

 

 

 

Run the script from within the Jupyter container, from a Jupyter Terminal window:

python ./01_simple_script.py

You should observe the following output.
pyspark_article_08_simple_script

Kaggle Datasets

To explore the features of the Jupyter Notebook container and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is a fantastic open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.

For this demonstration, I chose the ‘Transactions from a Bakery’ dataset from Kaggle.

pyspark_article_03_kaggle

The dataset contains 21,294 rows, each with four columns of data. Although certainly nowhere near ‘big data’, the dataset is large enough to test out the Jupyter container functionality (gist).

 

 

Submitting Spark Jobs

We are not limited to Jupyter Notebooks to interact with Spark; we can also submit scripts directly to Spark from a Jupyter Terminal, or from our IDE. I have included a simple Python script, 02_bakery_dataframes.py. The script loads the Kaggle Bakery dataset from the CSV file into a Spark DataFrame. The script then prints out the top ten rows of data, along with a count of the total number of rows in the DataFrame.
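
For readers who cannot see the embedded script, the steps described above might look roughly like the sketch below. This is not the project’s 02_bakery_dataframes.py; the function names and application name are hypothetical, and the CSV path is left as a parameter because the file’s location is an assumption.

```python
def load_bakery(spark, csv_path):
    """Read a CSV file with a header row into a Spark DataFrame, inferring column types."""
    return spark.read.csv(csv_path, header=True, inferSchema=True)


def main(csv_path):
    # Imported lazily so this module can still be loaded where PySpark is not installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("bakery-dataframe-sketch").getOrCreate()
    try:
        df = load_bakery(spark, csv_path)
        df.show(10)  # print the first ten rows
        print(f"Row count: {df.count()}")
    finally:
        spark.stop()
```

Calling main() with the path to the downloaded bakery CSV file, from a Jupyter Terminal inside the container, should print ten rows followed by the row count.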

 

Run the script directly from a Jupyter Terminal window:

python ./02_bakery_dataframes.py

An example of the output of the Spark job is shown below. At the time of this post update (6/7/2019), the latest jupyter/all-spark-notebook:latest Docker Image runs Spark 2.4.3, Scala 2.11.12, and Java 1.8.0_191 using the OpenJDK.
pyspark_article_09_simple_spark

More typically, you would submit the Spark job, using the spark-submit command. Use a Jupyter Terminal window to run the following command:

$SPARK_HOME/bin/spark-submit 02_bakery_dataframes.py

Below, we see the beginning of the output from Spark, using the spark-submit command.
pyspark_article_09B1_spark_submit

Below, we see the scheduled tasks executing and the output of the print statement, displaying the top 10 rows of bakery data.

Interacting with Databases

Often with Spark, you are loading data from one or more data sources (input). After performing operations and transformations on the data, the data is persisted or conveyed to another system for further processing (output).

To demonstrate the flexibility of the Jupyter Docker Stacks to work with databases, I have added PostgreSQL to the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container.

To begin, we will run a SQL script, written in Python, to create our database schema and some test data in a new database table. To do so, we will need to install the psycopg2 package into our Jupyter container. You can use the docker exec command from your terminal. Alternatively, as a superuser, your user has administrative access to install Python packages within the Jupyter container using the Jupyter Terminal window. Both pip and conda are available to install packages; see details here.

Run the following command to install psycopg2:

# using pip
docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  pip install psycopg2-binary

This package gives Python the ability to interact with PostgreSQL. The included Python script, 03_load_sql.py, will execute a set of SQL statements, contained in a SQL file, bakery_sample.sql, against the PostgreSQL container instance.
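
As a rough illustration of the approach, and not the project’s actual 03_load_sql.py, a script of this kind could look like the sketch below. The helper names are hypothetical, and the statement splitting is deliberately naive: it would break on semicolons inside string literals or comments.

```python
def split_statements(sql_text):
    """Split a SQL script on semicolons, dropping empty fragments (naive by design)."""
    return [part.strip() for part in sql_text.split(";") if part.strip()]


def run_sql_file(path, conn_params):
    """Execute every statement in the SQL file inside a single transaction."""
    # Imported lazily so the helper above works without psycopg2 installed.
    import psycopg2

    with open(path, encoding="utf-8") as sql_file:
        statements = split_statements(sql_file.read())

    conn = psycopg2.connect(**conn_params)
    try:
        with conn:  # commits on success, rolls back if a statement fails
            with conn.cursor() as cur:
                for statement in statements:
                    cur.execute(statement)
    finally:
        conn.close()
    return len(statements)
```

Here conn_params would be a dictionary such as {"host": "postgres", "dbname": "demo", "user": ..., "password": ...}, with the credentials taken from your own stack.yml file; the host and database names follow the ones used in this post.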

 

 

 

 

To execute the script, run the following command:

python ./03_load_sql.py

This should result in the following output, if successful.
pyspark_article_10_run_sql_py

To confirm the SQL script’s success, I have included Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines.

Adminer should be available on localhost port 8080. The password credentials, shown below, are available in the stack.yml file. The server name, postgres, is the name of the PostgreSQL container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
pyspark_article_06_adminer_login

Connecting to the demo database with Adminer, we should see the bakery_basket table. The table should contain three rows of data, as shown below.
pyspark_article_07_bakery_data

Developing Jupyter Notebooks

The true power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process: developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To see the power of Jupyter Notebooks, I have written a basic notebook document, 04_pyspark_demo_notebook.ipynb. The document performs some typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing some basic data analytics with Spark SQL, graphing the data using BokehJS, and finally, saving data back to the database, as well as to the popular Apache Parquet file format. Below we see the notebook document, using the Jupyter Notebook user interface.

pyspark_article_11_notebook.png

PostgreSQL Driver

The only notebook document dependency, not natively part of the Jupyter Image, is the PostgreSQL JDBC driver. The driver, postgresql-42.2.5.jar, is included in the project and referenced in the configuration of the notebook’s Spark Session. The JAR is added to the spark.driver.extraClassPath runtime environment property. This ensures the JAR is available to Spark (written in Scala) when the job is run.
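
A minimal sketch of this kind of configuration follows. It is not copied from the notebook: the helper names and application name are hypothetical, and the JAR path is assumed to be relative to the working directory.

```python
# Driver JAR named in this post; its location relative to the notebook is an assumption.
JDBC_JAR = "postgresql-42.2.5.jar"


def jdbc_url(host, database, port=5432):
    """Build a PostgreSQL JDBC connection URL."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def jdbc_properties(user, password):
    """Connection properties in the form Spark's JDBC reader expects."""
    return {"user": user, "password": password, "driver": "org.postgresql.Driver"}


def read_table(table, url, properties):
    """Create (or reuse) a Spark Session with the driver on the classpath and read a table."""
    # Imported lazily so the helpers above work without PySpark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("jdbc-sketch")
        .config("spark.driver.extraClassPath", JDBC_JAR)
        .getOrCreate()
    )
    return spark.read.jdbc(url=url, table=table, properties=properties)
```

For example, read_table("bakery_basket", jdbc_url("postgres", "demo"), jdbc_properties(user, password)) would return the demo table as a DataFrame, assuming the credentials from your stack.yml file.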

PyCharm

Since the working directory for the project is shared with the container, you can also edit files, including notebook documents, in your favorite IDE, such as JetBrains PyCharm. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
pyspark_article_11_notebook_pycharm.png

As mentioned earlier, a key feature of Jupyter Notebooks is their ability to save the output from each Cell as part of the notebook document. Below, we see the notebook document on GitHub. The output is saved, as part of the notebook document. Not only can you distribute the notebook document, but you can also preserve and share the output from each cell.
pyspark_article_17_github

Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a rich variety of Python packages to extend their functionality. To demonstrate the use of these packages, I have created a second Jupyter notebook document, 05_pyspark_demo_notebook.ipynb. This notebook document uses SciPy (Python-based mathematics, science, and engineering), NumPy (Python-based scientific computing), and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the notebook uses pip to install Plotly. Similar to Bokeh, shown previously, we can combine these libraries to create richly interactive data visualizations. To use Plotly, you will need to sign up for a free account and obtain a username and API key.

Shown below, we use Plotly to construct a bar chart of daily bakery items sold for the year 2017 based on the Kaggle dataset. The chart uses SciPy and NumPy to construct a linear fit (regression) and plot a line of best fit for the bakery data. The chart also uses SciPy’s Savitzky-Golay Filter to plot the second line, illustrating a smoothing of our bakery data.

pyspark_article_23a_plotly
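
To make the two overlays concrete, here is a small sketch of the underlying calculations. It is not the notebook’s code: the linear fit is written in plain Python so it runs without dependencies (in the notebook, a NumPy or SciPy routine such as numpy.polyfit would be the more natural choice), the smoothing helper assumes SciPy is installed, and the daily totals are made up.

```python
def linear_fit(xs, ys):
    """Ordinary least-squares line through the points; returns (slope, intercept)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def smooth(ys, window_length=5, polyorder=2):
    """Savitzky-Golay smoothing of a series; requires SciPy, so it is imported lazily."""
    from scipy.signal import savgol_filter

    return savgol_filter(ys, window_length, polyorder)


# Made-up daily item counts, for illustration only.
days = [0, 1, 2, 3, 4, 5, 6]
items_sold = [110, 95, 130, 120, 140, 135, 160]

slope, intercept = linear_fit(days, items_sold)
trend = [slope * day + intercept for day in days]  # points on the line of best fit
print(f"slope={slope:.2f}, intercept={intercept:.2f}")
```

The trend list and the output of smooth(items_sold) are the kind of series that would be drawn over the bar chart as the two lines.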

Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world’s most sophisticated editor for creating d3.js and WebGL charts. Shown below, we have the ability to enhance, stylize, and share our bakery data visualization using the free version of Chart Studio Cloud.

pyspark_article_23b_plotly

nbviewer

Notebooks can also be viewed using Jupyter nbviewer, hosted on Rackspace. Below, we see the output of a cell from this project’s notebook document, showing a BokehJS chart, using nbviewer. You can view this project’s actual notebook document, using nbviewer, here.

pyspark_article_26_nbviewer.png

Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can observe each Spark Job in great detail.
pyspark_article_12_spark_jobs

We can review details of each stage of the Spark job, including a visualization of the DAG, which Spark constructs as part of the job execution plan, using the DAG Scheduler.
pyspark_article_12_spark_dag

We can also review the timing of each event, occurring as part of the stages of the Spark job.
pyspark_article_12_timeline

We can also use the Spark interface to review and confirm the runtime environment, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
pyspark_article_13_enviornment

Spark Performance

Spark, running on a single node within the Jupyter container, on your development system, is not a substitute for a full Spark cluster, running on bare metal or robust virtualized hardware, with YARN, Mesos, or Kubernetes. In my opinion, you should adjust Docker to support an acceptable performance profile for the stack, running only a modest workload. You are not trying to replace the need to run real jobs on a Production Spark cluster.
screen_shot_2019-06-07_at_4_50_25_pm

We can use the docker stats command to examine the container’s CPU and memory metrics:

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the stack’s three containers immediately after being deployed, showing little or no activity. Here, Docker has been allocated 2 CPUs, 3 GB of RAM, and 2 GB of swap space from the host machine.
pyspark_article_16a_perf

Compare the stats above with the same three containers, while the example notebook document is running on Spark. The CPU shows a spike, but memory usage appears to be within acceptable ranges.
pyspark_article_16b_perf
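
If you want to capture these metrics programmatically rather than read them from the terminal, the same command can be called from Python. The sketch below is one possible approach, with hypothetical helper names; it drops the table prefix from the format string so no header row is printed, and uses the --no-stream flag to take a single sample.

```python
import subprocess

# Same fields as the command above, tab-separated, without the "table" header row.
STATS_FORMAT = "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"


def parse_stats(output):
    """Turn tab-separated docker stats lines into a list of dictionaries."""
    rows = []
    for line in output.strip().splitlines():
        name, cpu, mem_usage, mem_perc = line.split("\t")
        rows.append(
            {
                "name": name,
                "cpu_percent": float(cpu.rstrip("%")),
                "mem_usage": mem_usage,
                "mem_percent": float(mem_perc.rstrip("%")),
            }
        )
    return rows


def container_stats():
    """Take one sample of stats for all running containers (requires the Docker CLI)."""
    result = subprocess.run(
        ["docker", "stats", "--no-stream", "--format", STATS_FORMAT],
        capture_output=True,
        text=True,
        check=True,
    )
    return parse_stats(result.stdout)
```

Calling container_stats() before and while the notebook runs gives two snapshots that can be compared directly, for example by sorting the rows on cpu_percent.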

Linux top and htop

Another option to examine container performance metrics is with top. We can use the docker exec command to execute the top command within the Jupyter container, sorting processes by CPU usage:

docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  top -o %CPU

With top, we can observe the individual performance of each process running in the Jupyter container.

pyspark_article_20_top.png

Lastly, htop, an interactive process viewer for Unix, can be installed into the container and run with the following set of bash commands, from a Jupyter Terminal window or using docker exec:

docker exec -it \
  $(docker ps | grep pyspark_pyspark | awk '{print $NF}') \
  sh -c "apt-get update && apt-get install htop && htop --sort-key PERCENT_CPU"

With htop, we can observe individual CPU activity. The two CPUs at the top left of the htop window are the two CPUs assigned to Docker. We get insight into the way Docker is using each CPU, as well as other basic performance metrics, like memory and swap.

pyspark_article_16f_htop.png

Assuming your development machine host has them available, it is easy to allocate more compute resources to Docker if required. However, in my opinion, this stack is optimized for development and learning, using reasonably sized datasets for data analysis and ML. It should not be necessary to allocate excessive resources to Docker, possibly starving your host machine of its own compute resources.
screen_shot_2019-06-07_at_4_50_45_pm

Conclusion

In this brief post, we have seen how easy it is to get started learning and developing applications for big data analytics, using Python, Spark, and PySpark, thanks to the Jupyter Docker Stacks. We could use the same stack to learn and develop for machine learning, using Python, Scala, and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as Apache Kafka or Apache Cassandra.

Search results courtesy GoogleTrends (https://trends.google.com)

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
