Event-driven, Serverless Architectures with AWS Lambda, SQS, DynamoDB, and API Gateway

Introduction

In this post, we will explore modern application development using an event-driven, serverless architecture on AWS. To demonstrate this architecture, we will integrate several fully-managed services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. The result will be an application composed of small, easily deployable, loosely coupled, independently scalable, serverless components.

What is ‘Event-Driven’?

According to Otavio Ferreira, Manager, Amazon SNS, and James Hood, Senior Software Development Engineer, in their AWS Compute Blog post, Enriching Event-Driven Architectures with AWS Event Fork Pipelines, “Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.” This description of an event-driven architecture captures the essence of the following post. All interactions between application components in this post will be the direct result of triggering an event.

What is ‘Serverless’?

Mistakenly, many of us think of serverless as just functions (aka Function-as-a-Service or FaaS). When it comes to functions on AWS, Lambda is just one of many fully-managed services that make up the AWS Serverless Computing platform. So, what is ‘serverless’? According to AWS, “Serverless applications don’t require provisioning, maintaining, and administering servers for backend components such as compute, databases, storage, stream processing, message queueing, and more.”

As a Developer, one of my favorite features of serverless is the cost, or lack thereof. With serverless on AWS, you pay for consistent throughput or execution duration rather than by server unit, and, at least on AWS, you don’t pay for idle resources. This is not always true of ‘serverless’ offerings on other leading Cloud platforms. Remember, if you’re paying for it but not using it, it’s not serverless.

Demonstration

To demonstrate an event-driven, serverless architecture, we will build, package, and deploy an application capable of extracting messages from CSV files placed in S3, transforming those messages, queueing them to SQS, and finally, writing the messages to DynamoDB, using Lambda functions throughout. We will also expose a RESTful API, via API Gateway, to perform CRUD-like operations on those messages in DynamoDB.

AWS Technologies

In this demonstration, we will use several AWS serverless services, including the following.

Each Lambda will use function-specific execution roles, part of AWS Identity and Access Management (IAM). We will log the event details and monitor services using Amazon CloudWatch.

To codify, build, package, deploy, and manage the Lambda functions and other AWS resources in a fully automated fashion, we will also use the following AWS services:

Architecture

The high-level architecture for the platform provisioned and deployed in this post is illustrated in the diagram below. There are two separate workflows. In the first workflow (top), data is extracted from CSV files placed in S3, transformed, queued to SQS, and written to DynamoDB, using Python-based Lambda functions throughout. In the second workflow (bottom), data is manipulated in DynamoDB through interactions with a RESTful API, exposed via an API Gateway, and backed by Node.js-based Lambda functions.

new-01-sqs-dynamodb

Using the vast array of current AWS services, there are several ways we could extract, transform, and load data from static files into DynamoDB. The demonstration’s event-driven, serverless architecture represents just one possible approach.

Source Code

All source code for this post is available on GitHub in a single public repository, serverless-sqs-dynamo-demo. To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/serverless-sqs-dynamo-demo.git

The project files relevant to this demonstration are organized as follows.

.
├── README.md
├── lambda_apigtw_to_dynamodb
│   ├── app.js
│   ├── events
│   ├── node_modules
│   ├── package.json
│   └── tests
├── lambda_s3_to_sqs
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── lambda_sqs_to_dynamodb
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── requirements.txt
├── template.yaml
└── sample_data
    ├── data.csv
    ├── data_bad_msg.csv
    └── data_good_msg.csv

Some source code samples in this post are GitHub Gists, which may not display correctly on all social media browsers, such as LinkedIn.

Prerequisites

The demonstration assumes you already have an AWS account. You will need the latest copy of the AWS CLI, SAM CLI, and Python 3 installed on your development machine.

Additionally, you will need two existing S3 buckets. One bucket will be used to store the packaged project files for deployment. The second bucket is where we will place CSV data files, which, in turn, will trigger events that invoke multiple Lambda functions.

Deploying the Project

Before diving into the code, we will deploy the project to AWS. Conveniently, the entire project’s resources are codified in an AWS SAM template. We are using the AWS Serverless Application Model (SAM). AWS SAM is a model used to define serverless applications on AWS. According to the official SAM GitHub project documentation, AWS SAM is based on AWS CloudFormation. A serverless application is defined in a CloudFormation template and deployed as a CloudFormation stack.

Template Parameter

CloudFormation will create and uniquely name the SQS queues and the DynamoDB table. However, to avoid circular references, a common issue when creating resources associated with S3 event notifications, it is easier to use a pre-existing bucket. To start, you will need to change the SAM template’s DataBucketName parameter’s default value to your own S3 bucket name. Again, this bucket is where we will eventually push the CSV data files. Alternately, override the default values using the sam build command, next.

Parameters:
  DataBucketName:
    Type: String
    Description: S3 bucket where CSV files are processed
    Default: your-data-bucket-name

SAM CLI Commands

With the DataBucketName parameter set, proceed to validate, build, package, and deploy the project using the SAM CLI and the commands below. In addition to the sam validate command, I also like to use the aws cloudformation validate-template command to validate templates and catch any potential, additional errors.

Note the S3_BUCKET_BUILD variable, below, refers to the name of the S3 bucket SAM will use to package and deploy the project, as opposed to the S3 bucket into which the CSV data files will be placed (gist).

After validating the template, SAM will build and package each individual Lambda function and its associated dependencies. Below, we see each individual Lambda function being packaged with a copy of its dependencies.

screen_shot_2019-09-30_at_8_42_41_pm

Once packaged, SAM will deploy the project and create the AWS resources as a CloudFormation stack.

screen_shot_2019-09-30_at_8_43_11_pm

Once the stack creation is complete, use the CloudFormation management console to review the AWS resources created by SAM. There are approximately 14 resources defined in the SAM template, which result in 33 individual resources deployed as part of the CloudFormation stack.

screen_shot_2019-09-30_at_8_44_46_pm

Note the stack’s output values. You will need these values to interact with the deployed platform, later in the demonstration.

screen_shot_2019-09-30_at_8_45_13_pm

Test the Deployed Application

Once the CloudFormation stack has deployed without error, copying a CSV file to the S3 bucket is the quickest way to confirm everything is working. The project includes test data files with 20 rows of test message data. Below is a sample of the CSV file, which is included in the project. The data was collected from IoT devices that measured response time from wired versus wireless devices on a LAN network; the message details are immaterial to this demonstration (gist).

Run the following commands to copy the test data file to your S3 bucket.

S3_DATA_BUCKET=your_data_bucket_name
aws s3 cp sample_data/data.csv s3://$S3_DATA_BUCKET

Visit the DynamoDB management console. You should observe a new DynamoDB table.

screen_shot_2019-09-30_at_8_47_10_pm

Within the new DynamoDB table, you should observe twenty items, corresponding to each of the twenty rows in the CSV file, uploaded to S3.

screen_shot_2019-09-30_at_8_51_07_pm

Drill into an individual item within the table and review its attributes. They should match the rows in the CSV file.

screen_shot_2019-09-30_at_8_51_54_pm

Both the Python- and Node.js-based Lambda functions have their default logging levels set to debug. The debug-level output from each Lambda function is streamed to individual Amazon CloudWatch Log Groups. We can use the CloudWatch logs to troubleshoot any issues with the deployed application. Below we see an example of CloudWatch log entries for the request and response payloads generated by the GetMessageFunction Lambda function, which is querying DynamoDB for a single item.

screen_shot_2019-10-03_at_9_16_22_pm

Event-Driven Patterns

There are three distinct and discrete event-driven dataflows within the demonstration’s architecture:

  1. S3 Event Source for Lambda (S3 to SQS)
  2. SQS Event Source for Lambda (SQS to DynamoDB)
  3. API Gateway Event Source for Lambda (API Gateway to DynamoDB)

Let’s examine each event-driven dataflow and the Lambda code associated with that part of the architecture.

S3 Event Source for Lambda

Whenever a file is copied into the target S3 bucket, an S3 Event Notification triggers an asynchronous invocation of a Lambda. According to AWS, when you invoke a function asynchronously, Lambda sends the event to an internal queue, which is managed by the Lambda service and is separate from the SQS queues created in this demonstration. A separate process reads events from the queue and executes your Lambda function.

new-02-sqs-dynamodb

The Lambda’s function handler, written in Python, reads the CSV file, whose filename is contained in the event. The Lambda extracts the rows in the CSV file, transforms the data, and pushes each message to the SQS queue (gist).
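As a rough illustration of the first step, the sketch below shows one way a handler could pull the bucket name and object key out of an S3 event notification. It is not the project's actual code; the function name extract_s3_objects and the trimmed-down sample event are assumptions made for this example. The record layout (Records, s3.bucket.name, s3.object.key) follows the standard S3 notification structure, in which object keys arrive URL-encoded.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs for every record in an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded (a space becomes '+'), so decode them.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


# Trimmed-down, hypothetical event; real notifications carry many more fields.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "your-data-bucket-name"},
                "object": {"key": "sample_data/data+file.csv"},
            }
        }
    ]
}

print(extract_s3_objects(sample_event))
```

With the bucket and key in hand, the handler can fetch the CSV file's contents from S3 and begin processing its rows.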

Below is an example of a message body, part of an SQS message, extracted from a single row of the CSV file, and sent by the Lambda to the SQS queue. The timestamp has been converted to separate date and time fields by the Lambda. The DynamoDB table name is part of the SQS message body. The key/value pairs in the Item JSON object reflect the schema of the DynamoDB table (gist).
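To make the transformation concrete, here is a minimal sketch of turning CSV rows into message bodies. Several details are assumptions for illustration only: the column names (timestamp, location, source), the timestamp being Unix epoch seconds in UTC, the table name DemoTable, and the use of DynamoDB's typed attribute format ({"S": ...}) for every value. Only the split into date and time fields and the presence of the table name and Item in the body come from the description above.

```python
import csv
import io
import json
from datetime import datetime, timezone


def row_to_message_body(row, table_name):
    """Convert one CSV row (a dict) into a JSON message body for SQS."""
    # Assumption: the CSV stores the timestamp as Unix epoch seconds (UTC).
    ts = datetime.fromtimestamp(int(row["timestamp"]), tz=timezone.utc)
    item = {
        "date": {"S": ts.strftime("%Y-%m-%d")},
        "time": {"S": ts.strftime("%H:%M:%S")},
    }
    # Copy the remaining columns through as DynamoDB string attributes.
    for name, value in row.items():
        if name != "timestamp":
            item[name] = {"S": value}
    return json.dumps({"TableName": table_name, "Item": item})


def csv_to_message_bodies(csv_text, table_name):
    """Build one message body per data row in the CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [row_to_message_body(row, table_name) for row in reader]


# Hypothetical single-row CSV with made-up column names.
csv_text = "timestamp,location,source\n90061,location-03,wireless\n"
bodies = csv_to_message_bodies(csv_text, "DemoTable")
print(bodies[0])
```

Each resulting string could then be passed as the MessageBody argument of the boto3 SQS client's send_message call, along with the queue's URL.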

SQS Event Source for Lambda

According to AWS, SQS offers two types of message queues, Standard and FIFO (First-In-First-Out). An SQS FIFO queue is designed to guarantee that messages are processed exactly once, in the exact order that they are sent. A Standard SQS queue offers maximum throughput, best-effort ordering, and at-least-once delivery.

Examining the SQS management console, you should observe that the CloudFormation stack creates two SQS Standard queues—a primary queue and a Dead Letter Queue (DLQ). According to AWS, Amazon SQS supports dead-letter queues, which other queues (source queues) can target for messages that cannot be processed (consumed) successfully.

screen_shot_2019-09-30_at_8_55_33_pm

Examining the SQS Lambda Triggers tab, you should observe the Lambda, which will be triggered by the SQS events.

screen_shot_2019-09-30_at_8_56_25_pm

When a message is pushed into the SQS queue by the previous process, an SQS event is fired, which synchronously triggers an invocation of the Lambda using the SQS Event Source for Lambda functionality. When a function is invoked synchronously, Lambda runs the function and waits for a response.

new-03-sqs-dynamodb

In the demonstration, the Lambda’s function handler, also written in Python, pulls the message off of the SQS queue and writes the message (DynamoDB put) to the DynamoDB table. Although writing is the primary use case in this demonstration, an event could also trigger a get, scan, update, or delete command to be executed on the DynamoDB table (gist).
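The sketch below outlines how such a handler might process an SQS event, assuming each record's body is a JSON document containing TableName and Item keys, as described earlier. The function name write_records and the FakeDynamoDB stand-in are hypothetical; the stand-in only exists so the example runs without AWS credentials. In a deployed function, the client would be created with boto3.client("dynamodb"), whose put_item call accepts the same TableName and Item arguments.

```python
import json


def write_records(event, dynamodb_client):
    """Write every SQS record in the event to DynamoDB; return the count written."""
    written = 0
    for record in event.get("Records", []):
        # The SQS message body is a JSON string; parse it back into a dict.
        message = json.loads(record["body"])
        dynamodb_client.put_item(
            TableName=message["TableName"],
            Item=message["Item"],
        )
        written += 1
    return written


class FakeDynamoDB:
    """Offline stand-in for boto3.client("dynamodb"); records each call."""

    def __init__(self):
        self.calls = []

    def put_item(self, **kwargs):
        self.calls.append(kwargs)
        return {}


# Hypothetical event with a single record, shaped like an SQS batch.
event = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "TableName": "DemoTable",
                    "Item": {
                        "date": {"S": "2019-05-28"},
                        "time": {"S": "10:55:09"},
                    },
                }
            )
        }
    ]
}

fake_client = FakeDynamoDB()
count = write_records(event, fake_client)
print(count, fake_client.calls)
```

Passing the client in as an argument keeps the write logic easy to unit test; a real handler would simply call write_records(event, client) from its lambda_handler(event, context) entry point.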

API Gateway Event Source for Lambda

Examining the API Gateway management console, you should observe that CloudFormation created a new Edge-optimized API. The API contains several resources and their associated HTTP methods.

screen_shot_2019-09-30_at_9_02_52_pm

Each API resource is associated with a deployed Lambda function. Switching to the Lambda console, you should observe a total of seven new Lambda functions. There are five Lambda functions related to the API, in addition to the Lambda called by the S3 event notifications and the Lambda called by the SQS event notifications.

screen_shot_2019-09-30_at_8_46_24_pm

Examining one of the Lambda functions associated with the API Gateway, we should observe the API Gateway trigger for the Lambda (lower left and bottom).

screen_shot_2019-09-30_at_8_59_39_pm

When an end-user makes an HTTP(S) request via the RESTful API exposed by the API Gateway, an event is fired, which synchronously invokes a Lambda using the API Gateway Event Source for Lambda functionality. The event contains details about the HTTP request that is received. The event triggers any one of five different Lambda functions, depending on the HTTP request method.

new-04-sqs-dynamodb

The Lambda code, written in Node.js, contains five function handlers. Each handler corresponds to an HTTP method, including GET (DynamoDB get), POST (put), PUT (update), DELETE (delete), and SCAN (scan). Below is an example of the getMessage handler function. The function accepts two inputs. First, a path parameter, the date, which is the primary partition key for the DynamoDB table. Second, a query parameter, the time, which is the primary sort key for the DynamoDB table. Both the primary partition key and sort key must be passed to DynamoDB to retrieve the requested record (gist).
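The project's handlers are written in Node.js; purely to illustrate the key-building logic, here is a Python sketch of how the two inputs could be combined into a DynamoDB get request. The function name build_get_item_request, the table name, and the assumption that both keys are stored as strings are made up for this example. The pathParameters and queryStringParameters fields are part of the API Gateway Lambda proxy event, and either may be null when no parameters are sent.

```python
def build_get_item_request(event, table_name):
    """Build get_item arguments from an API Gateway Lambda proxy event."""
    # Either mapping may be None when the request carries no parameters.
    date = (event.get("pathParameters") or {}).get("date")
    time = (event.get("queryStringParameters") or {}).get("time")
    if not date or not time:
        raise ValueError("both the date path parameter and the time query parameter are required")
    return {
        "TableName": table_name,
        # Partition key (date) and sort key (time), assumed to be strings.
        "Key": {"date": {"S": date}, "time": {"S": time}},
    }


# Hypothetical event fragment for GET /message/2019-05-28?time=10:55:09
sample_event = {
    "pathParameters": {"date": "2019-05-28"},
    "queryStringParameters": {"time": "10:55:09"},
}
request = build_get_item_request(sample_event, "DemoTable")
print(request)
```

The returned dictionary can be unpacked into the boto3 DynamoDB client's get_item call; rejecting a request that lacks either key avoids sending DynamoDB an incomplete primary key.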

Test the API

To test the Lambda functions, called by our API, we can use the sam local invoke command, part of the SAM CLI. Using this command, we can test the local Lambda functions, without them being deployed to AWS. The command allows us to trigger events, which the Lambda functions will handle. This is useful as we continue to develop, test, and re-deploy the Lambda functions to our Development, Staging, and Production environments.

The local Node.js-based, API-related Lambda functions, just like their deployed copies, will execute commands against the actual DynamoDB on AWS. The GitHub project contains a set of five sample events, corresponding to the five Lambda functions, which in turn are associated with five different HTTP methods and API resources. For example, the event_getMessage.json event is associated with the GET HTTP method and calls the /message/{date}?time={time} resource endpoint, to return a single item. This event, shown below, triggers the GetMessageFunction Lambda (gist).

We can trigger all the events using the CLI. The local Lambda expects the DynamoDB table name to exist as an environment variable. Make sure you set it locally, first, before executing the sam local invoke commands (gist).

If the events were successfully handled by the local Lambda functions, in the terminal, you should see the same HTTP response status codes you would expect from calling the RESTful resources via the API Gateway. Below, for example, we see the POST event being handled by the PostMessageFunction Lambda, adding a record to the DynamoDB table, and returning a successful status of 201 Created.

screen_shot_2019-10-04_at_2_50_41_pm.png

Testing the Deployed API

To test the actual deployed API, we can call one of the API’s resources using an HTTP client, such as Postman. To locate the URL used to invoke the API resource, look at the ‘Prod’ Stage for the new API. This can be found in the Stages tab of the API Gateway console. For example, note the Invoke URL for the POST HTTP method of the /message resource, shown below.

screen_shot_2019-10-04_at_3_02_21_pm

Below, we see an example of using Postman to make an HTTP GET request to the /message/{date}?time={time} resource. We pass the required path and query parameters for date and for time, respectively. The request should receive a single item in response from DynamoDB, via the API Gateway and the associated Lambda. Here, the request was successful, and the Lambda function returns a 200 OK status.

screen_shot_2019-09-30_at_9_20_45_pm

Similarly, below, we see an example of calling the same /message endpoint using the HTTP POST method. In the body of the POST request, we pass the DynamoDB table name and the Item object. Again, the POST is successful, and the Lambda function returns a 201 Created status.
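For readers who prefer a script to Postman, the sketch below builds an equivalent POST request using only the Python standard library. The invoke URL, table name, and item attributes are placeholders, and the body layout (TableName plus Item) is an assumption based on the description above rather than a copy of the project's request. The request is constructed but not sent, so the example runs without a deployed API.

```python
import json
from urllib.request import Request


def build_post_message_request(invoke_url, table_name, item):
    """Build (but do not send) a POST request for the /message resource."""
    body = json.dumps({"TableName": table_name, "Item": item}).encode("utf-8")
    return Request(
        url=f"{invoke_url.rstrip('/')}/message",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder invoke URL; use the one shown for your API's Prod stage.
request = build_post_message_request(
    "https://example.execute-api.us-east-1.amazonaws.com/Prod",
    "DemoTable",
    {"date": {"S": "2019-05-28"}, "time": {"S": "10:55:09"}},
)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would send it, and the response's status attribute would then hold the HTTP status code returned through the API Gateway.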

screen_shot_2019-10-03_at_10_05_31_pm

Cleaning Up

To complete the demonstration and remove the AWS resources, run the following commands. It is necessary to delete all objects from the S3 data bucket, first, before deleting the CloudFormation stack. Else, the stack deletion will fail.

S3_DATA_BUCKET=your_data_bucket_name
STACK_NAME=your_stack_name

aws s3 rm s3://$S3_DATA_BUCKET/data.csv # and any other objects

aws cloudformation delete-stack \
  --stack-name $STACK_NAME

Conclusion

In this post, we explored a simple example of building a modern application using an event-driven serverless architecture on AWS. We used several services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. In addition to these, AWS has additional serverless services, which could enhance this demonstration, in particular, Amazon Kinesis, AWS Step Functions, Amazon SNS, and AWS AppSync.

In a future post, we will look at how to further test the individual components within this demonstration’s application stack, and how to automate its deployment using DevOps and CI/CD principles on AWS.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Getting Started with PostgreSQL using Amazon RDS, CloudFormation, pgAdmin, and Python

Introduction

In the following post, we will explore how to get started with Amazon Relational Database Service (RDS) for PostgreSQL. CloudFormation will be used to build a PostgreSQL master database instance and a single read replica in a new VPC. AWS Systems Manager Parameter Store will be used to store our CloudFormation configuration values. Amazon RDS Event Notifications will send text messages to our mobile device to let us know when the RDS instances are ready for use. Once running, we will examine a variety of methods to interact with our database instances, including pgAdmin, Adminer, and Python.

Technologies

The primary technologies used in this post include the following.

PostgreSQL

According to its website, PostgreSQL, commonly known as Postgres, is the world’s most advanced Open Source relational database. Originating at UC Berkeley in 1986, PostgreSQL has more than 30 years of active core development. PostgreSQL has earned a strong reputation for its proven architecture, reliability, data integrity, robust feature set, and extensibility. PostgreSQL runs on all major operating systems and has been ACID-compliant since 2001.

Amazon RDS for PostgreSQL

According to Amazon, Amazon Relational Database Service (RDS) provides six familiar database engines to choose from, including Amazon Aurora, PostgreSQL, MySQL, MariaDB, Oracle Database, and SQL Server. RDS is available on several database instance types, optimized for memory, performance, or I/O.

Amazon RDS for PostgreSQL makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. Amazon RDS supports the latest PostgreSQL version 11, which includes several enhancements to performance, robustness, transaction management, query parallelism, and more.

AWS CloudFormation

According to Amazon, CloudFormation provides a common language to describe and provision all the infrastructure resources within AWS-based cloud environments. CloudFormation allows you to use a JSON- or YAML-based template to model and provision all the resources needed for your applications across all AWS regions and accounts, in an automated and secure manner.

Demonstration

Architecture

Below, we see an architectural representation of what will be built in the demonstration. This is not a typical three-tier AWS architecture, wherein the RDS instances would be placed in private subnets (data tier) and accessible only by the application tier, running on AWS. The architecture for the demonstration is designed for interacting with RDS through external database clients such as pgAdmin, and applications like our local Python scripts, detailed later in the post.

RDS AWS Arch Diagram

Source Code

All source code for this post is available on GitHub in a single public repository, postgres-rds-demo.

.
├── LICENSE.md
├── README.md
├── cfn-templates
│   ├── event.template
│   └── rds.template
├── parameter_store_values.sh
├── python-scripts
│   ├── create_pagila_data.py
│   ├── database.ini
│   ├── db_config.py
│   ├── query_postgres.py
│   ├── requirements.txt
│   └── unit_tests.py
├── sql-scripts
│   ├── pagila-insert-data.sql
│   └── pagila-schema.sql
└── stack.yml

To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/aws-rds-postgres.git

Prerequisites

For this demonstration, I will assume you already have an AWS account and that you have the latest copy of the AWS CLI and Python 3 installed on your development machine. Optionally, for pgAdmin and Adminer, you will also need to have Docker installed.

Steps

In this demonstration, we will perform the following steps.

  • Put CloudFormation configuration values in Parameter Store;
  • Execute CloudFormation templates to create AWS resources;
  • Execute SQL scripts using Python to populate the new database with sample data;
  • Configure pgAdmin and Python connections to RDS PostgreSQL instances.

AWS Systems Manager Parameter Store

With AWS, it is typical to use services like AWS Systems Manager Parameter Store and AWS Secrets Manager to store overt, sensitive, and secret configuration values. These values are utilized by your code or by AWS services like CloudFormation. Parameter Store allows us to follow the proper twelve-factor, cloud-native practice of separating configuration from code.

To demonstrate the use of Parameter Store, we will place a few of our CloudFormation configuration items into Parameter Store. The demonstration’s GitHub repository includes a shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.

Below, we see several of the demo’s configuration values, which have been put into Parameter Store.

screen_shot_2019-08-08_at_7_00_17_pm

SecureString

Whereas our other parameters are stored in Parameter Store as String data types, the database’s master user password is stored as a SecureString data type. Parameter Store uses an AWS Key Management Service (KMS) customer master key (CMK) to encrypt the SecureString parameter value.

screen_shot_2019-08-08_at_7_01_15_pm
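Although this demonstration only consumes the parameters from CloudFormation, application code can read them as well. The sketch below shows one way to do so, assuming a client created with boto3.client("ssm"), whose get_parameter call accepts Name and WithDecryption arguments. The helper name get_parameter_value and the FakeSSM stand-in are hypothetical; the stand-in lets the example run without AWS access.

```python
def get_parameter_value(ssm_client, name, secure=False):
    """Fetch one parameter value; set secure=True to decrypt a SecureString."""
    response = ssm_client.get_parameter(Name=name, WithDecryption=secure)
    return response["Parameter"]["Value"]


class FakeSSM:
    """Offline stand-in for boto3.client("ssm"), backed by a plain dict."""

    def __init__(self, values):
        self.values = values
        self.last_with_decryption = None

    def get_parameter(self, Name, WithDecryption=False):
        self.last_with_decryption = WithDecryption
        return {"Parameter": {"Name": Name, "Value": self.values[Name]}}


# Made-up values keyed by the parameter names used in this demonstration.
fake_ssm = FakeSSM(
    {
        "/rds_demo/master_username": "masteruser",
        "/rds_demo/master_password": "example-password",
    }
)
username = get_parameter_value(fake_ssm, "/rds_demo/master_username")
password = get_parameter_value(fake_ssm, "/rds_demo/master_password", secure=True)
print(username)
```

Without WithDecryption set to true, Parameter Store returns the encrypted form of a SecureString value rather than the plaintext.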

SMS Text Alert Option

Before running the Parameter Store script, you will need to change the /rds_demo/alert_phone parameter value in the script (shown below) to your mobile device number, including country code, such as ‘+12038675309’. Amazon SNS will use it to send SMS messages, using Amazon RDS Event Notification. If you don’t want to use this messaging feature, simply ignore this parameter and do not execute the event.template CloudFormation template in a later step.

aws ssm put-parameter \
  --name /rds_demo/alert_phone \
  --type String \
  --value "your_phone_number_here" \
  --description "RDS alert SMS phone number" \
  --overwrite

Run the following command to execute the shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.

sh ./parameter_store_values.sh

CloudFormation Templates

The GitHub repository includes two CloudFormation templates, cfn-templates/event.template and cfn-templates/rds.template. The event template contains two resources, an AWS SNS Topic and an AWS RDS Event Subscription. The RDS template includes several resources, including a VPC, Internet Gateway, VPC security group, two public subnets, one RDS master database instance, and an AWS RDS Read Replica database instance.

The resources are split into two CloudFormation templates so we can create the notification resources, first, independently of creating or deleting the RDS instances. This will ensure we get all our SMS alerts about both the creation and deletion of the databases.

Template Parameters

The two CloudFormation templates contain a total of approximately fifteen parameters. For most, you can use the default values I have set or choose to override them. Four of the parameters will be fulfilled from Parameter Store. Of these, the master database password is treated slightly differently because it is secure (encrypted in Parameter Store). Below is a snippet of the template showing both types of parameters. The last two are fulfilled from Parameter Store.

DBInstanceClass:
  Type: String
  Default: "db.t3.small"
DBStorageType:
  Type: String
  Default: "gp2"
DBUser:
  Type: String
  Default: "{{resolve:ssm:/rds_demo/master_username:1}}"
DBPassword:
  Type: String
  Default: "{{resolve:ssm-secure:/rds_demo/master_password:1}}"
  NoEcho: True

Choosing the default CloudFormation parameter values will result in two minimally-configured RDS instances running the PostgreSQL 11.4 database engine on a db.t3.small instance with 10 GiB of General Purpose (SSD) storage. The db.t3 DB instance is part of the latest generation burstable performance instance class. The master instance is not configured for Multi-AZ high availability. However, the master and read replica each run in a different Availability Zone (AZ) within the same AWS Region.

Parameter Versioning

When placing parameters into Parameter Store, subsequent updates to a parameter result in the version number of that parameter being incremented. Note in the examples above, the version of the parameter is required by CloudFormation, here, ‘1’. If you choose to update a value in Parameter Store, thus incrementing the parameter’s version, you will also need to update the corresponding version number in the CloudFormation template’s parameter.

{
    "Parameter": {
        "Name": "/rds_demo/rds_username",
        "Type": "String",
        "Value": "masteruser",
        "Version": 1,
        "LastModifiedDate": 1564962073.774,
        "ARN": "arn:aws:ssm:us-east-1:1234567890:parameter/rds_demo/rds_username"
    }
}
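To show how the version number ties back to the template, the sketch below builds a CloudFormation dynamic reference string from a get-parameter style response like the one above. The helper name dynamic_reference is hypothetical; the {{resolve:ssm:...}} and {{resolve:ssm-secure:...}} forms are the ones already used in the template snippet earlier.

```python
import json


def dynamic_reference(parameter, secure=False):
    """Build a CloudFormation dynamic reference pinned to the parameter's version."""
    service = "ssm-secure" if secure else "ssm"
    return "{{resolve:%s:%s:%d}}" % (service, parameter["Name"], parameter["Version"])


# Abbreviated copy of the JSON output shown above.
response = json.loads(
    """
    {
        "Parameter": {
            "Name": "/rds_demo/rds_username",
            "Type": "String",
            "Value": "masteruser",
            "Version": 1
        }
    }
    """
)
reference = dynamic_reference(response["Parameter"])
print(reference)
```

If the parameter were later updated to version 2, the same helper would produce a reference ending in ':2', which is the value the template would need to carry.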

Validating Templates

Although I have tested both templates, I suggest validating the templates yourself, as you usually would for any CloudFormation template you are creating. You can use the AWS CLI CloudFormation validate-template command to validate the template. Alternately, or I suggest additionally, you can use the CloudFormation Linter cfn-lint command.

aws cloudformation validate-template \
  --template-body file://cfn-templates/rds.template

cfn-lint -t cfn-templates/rds.template

Create the Stacks

To execute the first CloudFormation template and create a CloudFormation Stack containing the two event notification resources, run the following create-stack CLI command.

aws cloudformation create-stack \
  --template-body file://cfn-templates/event.template \
  --stack-name RDSEventDemoStack

The first stack takes less than one minute to create. Using the AWS CloudFormation Console, make sure the first stack completes successfully before creating the second stack with the command, below.

aws cloudformation create-stack \
  --template-body file://cfn-templates/rds.template \
  --stack-name RDSDemoStack

screen_shot_2019-08-04_at_10_35_20_pm

Wait for my Text

In my tests, the CloudFormation RDS stack takes an average of 25–30 minutes to create and 15–20 minutes to delete, which can seem like an eternity. You could use the AWS CloudFormation console (shown below) or continue to use the CLI to follow the progress of the RDS stack creation.

screen_shot_2019-08-08_at_7_39_45_pm.png
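As another way to follow progress, a short Python script can block until the stack is ready. The sketch below assumes a client created with boto3.client("cloudformation") and uses its stack_create_complete waiter, followed by describe_stacks to read the final status. The helper name wait_for_stack, the polling values, and the fake client classes are assumptions for illustration; the fakes let the example run offline.

```python
def wait_for_stack(cfn_client, stack_name, delay=30, max_attempts=120):
    """Block until the stack is created, then return its status."""
    waiter = cfn_client.get_waiter("stack_create_complete")
    # Poll every `delay` seconds, up to `max_attempts` times (60 minutes here).
    waiter.wait(
        StackName=stack_name,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
    stack = cfn_client.describe_stacks(StackName=stack_name)["Stacks"][0]
    return stack["StackStatus"]


class FakeWaiter:
    """Offline stand-in for a boto3 waiter; returns immediately."""

    def wait(self, **kwargs):
        self.kwargs = kwargs


class FakeCloudFormation:
    """Offline stand-in for boto3.client("cloudformation")."""

    def __init__(self):
        self.waiter = FakeWaiter()
        self.waiter_name = None

    def get_waiter(self, name):
        self.waiter_name = name
        return self.waiter

    def describe_stacks(self, StackName):
        return {"Stacks": [{"StackName": StackName, "StackStatus": "CREATE_COMPLETE"}]}


fake_cfn = FakeCloudFormation()
status = wait_for_stack(fake_cfn, "RDSDemoStack")
print(status)
```

The default polling window is deliberately longer than the 25–30 minutes observed above, so the waiter should not give up before the RDS instances finish provisioning.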

However, if you recall, the CloudFormation event template creates an AWS RDS Event Subscription. This resource will notify us when the databases are ready by sending text messages to our mobile device.

screen_shot_2019-08-04_at_11_06_31_pm

In the CloudFormation event template, the RDS Event Subscription is configured to generate Amazon Simple Notification Service (SNS) notifications for several specific event types, including RDS instance creation and deletion.

  MyEventSubscription:
    Properties:
      Enabled: true
      EventCategories:
        - availability
        - configuration change
        - creation
        - deletion
        - failover
        - failure
        - recovery
      SnsTopicArn:
        Ref: MyDBSNSTopic
      SourceType: db-instance
    Type: AWS::RDS::EventSubscription

Amazon SNS will send SMS messages to the mobile number you placed into Parameter Store. Below, we see messages generated during the creation of the two instances, displayed on an Apple iPhone.

img-2839

Amazon RDS Dashboard

Once the RDS CloudFormation stack has successfully been built, the easiest way to view the results is using the Amazon RDS Dashboard, as shown below. Here we see both the master and read replica instances have been created and are available for our use.

screen_shot_2019-08-08_at_7_05_24_pm

The RDS dashboard offers CloudWatch monitoring of each RDS instance.

screen_shot_2019-08-08_at_7_06_17_pm

The RDS dashboard also provides detailed configuration information about each RDS instance.

screen_shot_2019-08-08_at_7_06_26_pm

The RDS dashboard’s Connectivity & security tab is where we can obtain connection information about our RDS instances, including the RDS instance’s endpoints. Endpoint information will be required in the next part of the demonstration.

screen_shot_2019-08-08_at_7_06_01_pm

Sample Data

Now that we have our PostgreSQL database instance and read replica successfully provisioned and configured on AWS, with an empty database, we need some test data. There are several sources of sample PostgreSQL databases available on the internet to explore. We will use the Pagila sample movie rental database by pgFoundry. Although the database is several years old, it provides a relatively complex relational schema (table relationships shown below) and plenty of sample data to query, about 100 database objects and 46K rows of data.

pagila_tablespng

In the GitHub repository, I have included the two Pagila database SQL scripts required to install the sample database’s data structures (DDL), sql-scripts/pagila-schema.sql, and the data itself (DML), sql-scripts/pagila-insert-data.sql.

To execute the Pagila SQL scripts and install the sample data, I have included a Python script. If you do not want to use Python, you can skip to the Adminer section of this post. Adminer also has the capability to import SQL scripts.

Before running any of the included Python scripts, you will need to install the required Python packages and configure the database.ini file.

Python Packages

To install the required Python packages using the supplied python-scripts/requirements.txt file, run the below commands.

cd python-scripts
pip3 install --upgrade -r requirements.txt

We are using two packages, psycopg2 and configparser, for the scripts. Psycopg is a PostgreSQL database adapter for Python. According to its website, Psycopg is the most popular PostgreSQL database adapter for the Python programming language. The configparser module allows us to read configuration from files similar to Microsoft Windows INI files. The unittest package is required for a set of unit tests included with the project, but not discussed as part of the demo.

screen_shot_2019-08-13_at_11_06_10_pm

Database Configuration

The python-scripts/database.ini file, read by configparser, provides the required connection information for the databases on our RDS master and read replica instances. Use the input parameters and output values from the CloudFormation RDS template, or the Amazon RDS Dashboard, to obtain the required connection information, as shown in the example below. Your host values will be unique for your master and read replica. Each host value is the instance’s endpoint, listed in the RDS Dashboard’s Connection & security tab.

[docker]
host=localhost
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[master]
host=demo-instance.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[replica]
host=demo-replica.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd
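To make the role of configparser concrete, here is a minimal sketch of how one section of such a file could be turned into a dictionary of connection settings. It is not taken from the repository: the load_section helper, the temporary file, and the placeholder password are all assumptions made for illustration.

```python
import configparser
import tempfile
from pathlib import Path


def load_section(ini_path, section):
    """Return one section of an INI file as a dict of connection settings.

    Hypothetical helper for illustration; the repository's own function may differ.
    """
    # interpolation=None keeps a literal '%' in a password from being
    # treated as configparser interpolation syntax
    parser = configparser.ConfigParser(interpolation=None)
    # read() silently skips missing files, so check its return value
    if not parser.read(ini_path):
        raise FileNotFoundError(ini_path)
    if not parser.has_section(section):
        raise KeyError(f"section [{section}] not found in {ini_path}")
    params = dict(parser.items(section))
    params["port"] = int(params["port"])  # INI values are always strings
    return params


# Placeholder values only; substitute your own endpoints and credentials
SAMPLE_INI = """\
[docker]
host=localhost
port=5432
database=pagila
user=masteruser
password=example-password
"""

with tempfile.TemporaryDirectory() as tmp:
    ini_file = Path(tmp) / "database.ini"
    ini_file.write_text(SAMPLE_INI)
    docker_params = load_section(ini_file, "docker")

print(docker_params["host"], docker_params["port"])
```

A dictionary shaped like this can typically be passed to psycopg2 as psycopg2.connect(**docker_params), since the keys match the keyword arguments that function accepts.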

With the INI file configured, run the following command, which executes a supplied Python script, python-scripts/create_pagila_data.py, to create the data structure and insert sample data into the master RDS instance’s Pagila database. The database will be automatically replicated to the RDS read replica instance. From my local laptop, I found the Python script takes approximately 40 seconds to create all 100 database objects and insert 46K rows of movie rental data. That is compared to about 13 seconds locally, using a Docker-based instance of PostgreSQL.

python3 ./create_pagila_data.py

The Python script’s primary function, create_pagila_db(), reads and executes the two external SQL scripts.

def create_pagila_db():
    """
    Creates Pagila database by running DDL and DML scripts
    """

    try:
        global conn
        with conn:
            with conn.cursor() as curs:
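                # Open each script in a context manager so the file handle is closed promptly
                for script in ("../sql-scripts/pagila-schema.sql",
                               "../sql-scripts/pagila-insert-data.sql"):
                    with open(script, "r") as sql_file:
                        curs.execute(sql_file.read())
                # Leaving the 'with conn' block commits the transaction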
                print('Pagila SQL scripts executed')
    except (psycopg2.OperationalError, psycopg2.DatabaseError, FileNotFoundError) as err:
        print(create_pagila_db.__name__, err)
        close_conn()
        exit(1)

If the Python script executes correctly, you should see output indicating there are now 28 tables in our master RDS instance’s database.

screen_shot_2019-08-08_at_7_13_11_pm

pgAdmin

pgAdmin is a favorite tool for interacting with and managing PostgreSQL databases. According to its website, pgAdmin is the most popular and feature-rich Open Source administration and development platform for PostgreSQL.

The project includes an optional Docker Swarm stack.yml file. The stack will create a set of three Docker containers, including a local copy of PostgreSQL 11.4, Adminer, and pgAdmin 4. Having a local copy of PostgreSQL, using the official Docker image, is helpful for development and troubleshooting RDS issues.

screen_shot_2019-08-10_at_1_43_24_pm.png

Use the following commands to deploy the Swarm stack.

# create stack
docker swarm init
docker stack deploy -c stack.yml postgres

# get status of new containers
docker stack ps postgres --no-trunc
docker container ls

If you do not want to spin up the whole Docker Swarm stack, you could use the docker run command to create just a single pgAdmin Docker container. The pgAdmin 4 Docker image being used is the image recommended by pgAdmin.

docker pull dpage/pgadmin4

docker run -p 81:80 \
  -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" \
  -e "PGADMIN_DEFAULT_PASSWORD=SuperSecret" \
  -d dpage/pgadmin4

docker container ls | grep pgadmin4

Database Server Configuration

Once pgAdmin is up and running, we can configure the master and read replica database servers (RDS instances) using the connection string information from your database.ini file or from the Amazon RDS Dashboard. Below, I am configuring the master RDS instance (server).

screen_shot_2019-08-08_at_7_25_27_pm

With that task complete, below, we see the master RDS instance and the read replica, as well as my local Docker instance configured in pgAdmin (left side of screengrab). Note how the Pagila database has been replicated automatically, from the RDS master to the read replica instance.

screen_shot_2019-08-08_at_7_29_00_pm

Building SQL Queries

Switching to the Query tab, we can run regular SQL queries against any of the database instances. Below, I have run a simple SELECT query against the master RDS instance’s Pagila database, returning the complete list of movie titles, along with their genre and release date.

screen_shot_2019-08-08_at_7_27_58_pm

The pgAdmin Query tool even includes an Explain tab to view a graphical representation of the same query, very useful for optimization. Here we see the same query, showing an analysis of the execution order. A popup window displays information about the selected object.

screen_shot_2019-08-08_at_7_28_35_pm

Query the Read Replica

To demonstrate the use of the read replica, below I’ve run the same query against the RDS read replica’s copy of the Pagila database. Any schema and data changes against the master instance are replicated to the read replica(s).

screen_shot_2019-08-08_at_7_30_14_pm

Adminer

Adminer is another good general-purpose database management tool, similar to pgAdmin, but with a few different capabilities. According to its website, with Adminer, you get a tidy user interface, rich support for multiple databases, performance, and security, all from a single PHP file. Adminer is my preferred tool for database admin tasks. Amazingly, Adminer works with MySQL, MariaDB, PostgreSQL, SQLite, MS SQL, Oracle, SimpleDB, Elasticsearch, and MongoDB.

Below, we see the Pagila database’s tables and views displayed in Adminer, along with some useful statistical information about each database object.

screen_shot_2019-08-09_at_7_04_07_am

Similar to pgAdmin, we can also run queries, along with other common development and management tasks, from within the Adminer interface.

screen_shot_2019-08-09_at_7_05_16_am

Import Pagila with Adminer

Another great feature of Adminer is the ability to easily import and export data. As an alternative to Python, you could import the Pagila data using Adminer’s SQL file import function. Below, you see an example of importing the Pagila database objects into the Pagila database, using the file upload function.

screen_shot_2019-08-09_at_7_27_53_am.png

IDE

For writing my AWS infrastructure as code files and Python scripts, I prefer JetBrains PyCharm Professional Edition (v2019.2). PyCharm, like all the JetBrains IDEs, has the ability to connect to and manage PostgreSQL databases. You can write and run SQL queries, including the Pagila SQL import scripts. Microsoft Visual Studio Code is another excellent, free choice, available on multiple platforms.

screen_shot_2019-08-11_at_9_40_57_pm

Python and RDS

Although our IDE, pgAdmin, and Adminer are useful to build and test our queries, ultimately, we still need to connect to the Amazon RDS PostgreSQL instances and perform data manipulation from our application code. The GitHub repository includes a sample Python script, python-scripts/query_postgres.py. This script uses the same Python packages and connection functions as the Pagila data creation script we ran earlier. This time, we will perform the same SELECT query using Python as we did previously with pgAdmin and Adminer.

cd python-scripts
python3 ./query_postgres.py

With a successful database connection established, the script’s primary function, get_movies(return_count), performs the SELECT query. The function accepts an integer representing the desired number of movies to return from the SELECT query. A separate function within the script handles closing the database connection when the query is finished.

def get_movies(return_count=100):
    """
    Queries for all films, by genre and year
    """

    try:
        global conn
        with conn:
            with conn.cursor() as curs:
                curs.execute("""
                    SELECT title AS title, name AS genre, release_year AS released
                    FROM film f
                             JOIN film_category fc
                                  ON f.film_id = fc.film_id
                             JOIN category c
                                  ON fc.category_id = c.category_id
                    ORDER BY title
                    LIMIT %s;
                """, (return_count,))

                # fetchall() returns every remaining row as a list of tuples
                return curs.fetchall()
    except (psycopg2.OperationalError, psycopg2.DatabaseError) as err:
        print(get_movies.__name__, err)
        # Return an empty list so callers can still iterate safely
        return []
    finally:
        close_conn()


def main():
    set_connection('docker')
    for movie in get_movies(10):
        print('Movie: {0}, Genre: {1}, Released: {2}'
              .format(movie[0], movie[1], movie[2]))

Below, we see an example of the Python script’s formatted output, limited to only the first ten movies.

screen_shot_2019-08-13_at_10_51_47_pm.png

Using the Read Replica

For better application performance, it may be optimal to redirect some or all of the database reads to the read replica, while leaving writes, updates, and deletes to hit the master instance. The script can be easily modified to execute the same query against the read replica rather than the master RDS instance by merely passing the desired section, ‘replica’ versus ‘master’, in the call to the set_connection(section) function. The section parameter refers to one of the sections in the database.ini file. The configparser module will handle retrieving the correct connection information.

set_connection('replica')
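If an application wanted to choose the section automatically, one naive approach is to look at the first keyword of each statement. The sketch below is an assumption for illustration, not part of the repository; choose_section is a hypothetical helper, and the section names simply mirror the database.ini example above.

```python
READ_SECTION = "replica"   # section names mirror the database.ini example
WRITE_SECTION = "master"


def choose_section(sql):
    """Pick a database.ini section from the first keyword of a SQL statement.

    Deliberately naive: only plain SELECT statements go to the replica.
    Anything else, including empty input, is sent to the master.
    """
    stripped = sql.lstrip()
    first_word = stripped.split(None, 1)[0].upper() if stripped else ""
    return READ_SECTION if first_word == "SELECT" else WRITE_SECTION


print(choose_section("SELECT title FROM film LIMIT 10;"))
print(choose_section("UPDATE film SET release_year = 2006;"))
```

A real router would need more care. Statements such as SELECT ... FOR UPDATE, or functions with side effects, belong on the master, and reads that must immediately see a preceding write should also go to the master, since replication to the read replica is asynchronous.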

Cleaning Up

When you are finished with the demonstration, the easiest way to clean up all the AWS resources and stop getting billed is to delete the two CloudFormation stacks using the AWS CLI, in the following order.

aws cloudformation delete-stack \
  --stack-name RDSDemoStack

# wait until the above resources are completely deleted
aws cloudformation delete-stack \
  --stack-name RDSEventDemoStack
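The manual wait between the two deletions could also be scripted. The sketch below is one possible approach, assuming the AWS CLI is installed and configured; it relies on the CLI’s aws cloudformation wait stack-delete-complete subcommand. The helper names are hypothetical, and nothing is executed until delete_stacks is called.

```python
import subprocess

# Stack names and order are taken from the commands above
STACKS_IN_DELETE_ORDER = ["RDSDemoStack", "RDSEventDemoStack"]


def build_delete_commands(stack_names):
    """Return a delete command followed by a wait command for each stack."""
    commands = []
    for name in stack_names:
        commands.append(
            ["aws", "cloudformation", "delete-stack", "--stack-name", name])
        # Blocks until the stack is fully deleted, or fails if deletion fails
        commands.append(
            ["aws", "cloudformation", "wait", "stack-delete-complete",
             "--stack-name", name])
    return commands


def delete_stacks(stack_names, run=subprocess.run):
    """Run the commands in order, stopping at the first failure."""
    for command in build_delete_commands(stack_names):
        run(command, check=True)


for command in build_delete_commands(STACKS_IN_DELETE_ORDER):
    print(" ".join(command))
```

Calling delete_stacks(STACKS_IN_DELETE_ORDER) would then remove both stacks sequentially. The run parameter exists only so the function can be exercised without touching AWS.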

You should receive the following SMS notifications as the first CloudFormation stack is being deleted.

img-2841

You can delete the running Docker stack using the following command. Note, you will lose all your pgAdmin server connection information, along with your local Pagila database.

docker stack rm postgres

Conclusion

In this brief post, we just scratched the surface of the many benefits and capabilities of Amazon RDS for PostgreSQL. The best way to learn PostgreSQL and the benefits of Amazon RDS is by setting up your own RDS instance, inserting some sample data, and writing queries in your favorite database client or programming language.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Managing AWS Infrastructure as Code using Ansible, CloudFormation, and CodeBuild

Introduction

When it comes to provisioning and configuring resources on the AWS cloud platform, there is a wide variety of services, tools, and workflows you could choose from. You could decide to exclusively use the cloud-based services provided by AWS, such as CodeBuild, CodePipeline, CodeStar, and OpsWorks. Alternatively, you could choose open-source software (OSS) for provisioning and configuring AWS resources, such as community editions of Jenkins, HashiCorp Terraform, Pulumi, Chef, and Puppet. You might also choose to use licensed products, such as Octopus Deploy, TeamCity, CloudBees Core, Travis CI Enterprise, and XebiaLabs XL Release. You might even decide to write your own custom tools or scripts in Python, Go, JavaScript, Bash, or other common languages.

In reality, in most enterprises I have worked with, teams integrate a combination of AWS services, open-source software, custom scripts, and occasionally licensed products to construct complete, end-to-end, infrastructure as code-based workflows for provisioning and configuring AWS resources. Choices are most often based on team experience, vendor relationships, and an enterprise’s specific business use cases.

In the following post, we will explore one such set of easily-integrated tools for provisioning and configuring AWS resources. The tool-stack comprises Red Hat Ansible, AWS CloudFormation, and AWS CodeBuild, along with several complementary AWS technologies. Using these tools, we will provision a relatively simple AWS environment, then deploy, configure, and test a highly-available set of Apache HTTP Servers. The demonstration is similar to the one featured in a previous post, Getting Started with Red Hat Ansible for Google Cloud Platform.

ansible-aws-stack2.png

Why Ansible?

With its simplicity, ease-of-use, broad compatibility with most major cloud, database, network, storage, and identity providers amongst other categories, Ansible has been a popular choice of Engineering teams for configuration-management since 2012. Given the wide variety of polyglot technologies used within modern Enterprises and the growing predominance of multi-cloud and hybrid cloud architectures, Ansible provides a common platform for enabling mature DevOps and infrastructure as code practices. Ansible is easily integrated with higher-level orchestration systems, such as AWS CodeBuild, Jenkins, or Red Hat AWX and Tower.

Technologies

The primary technologies used in this post include the following.

Red Hat Ansible

Ansible, purchased by Red Hat in October 2015, seamlessly provides workflow orchestration with configuration management, provisioning, and application deployment in a single platform. Unlike similar tools, Ansible’s workflow automation is agentless, relying on Secure Shell (SSH) and Windows Remote Management (WinRM). If you are interested in learning more about the advantages of Ansible, they’ve published a whitepaper on The Benefits of Agentless Architecture.

According to G2 Crowd, Ansible is a clear leader in the Configuration Management Software category, ranked right behind GitLab. Competitors in the category include GitLab, AWS Config, Puppet, Chef, Codenvy, HashiCorp Terraform, Octopus Deploy, and JetBrains TeamCity.

AWS CloudFormation

According to AWS, CloudFormation provides a common language to describe and provision all the infrastructure resources within AWS-based cloud environments. CloudFormation allows you to use a JSON- or YAML-based template to model and provision, in an automated and secure manner, all the resources needed for your applications across all AWS regions and accounts.

Codifying your infrastructure, often referred to as ‘Infrastructure as Code,’ allows you to treat your infrastructure as just code. You can author it with any IDE, check it into a version control system, and review the files with team members before deploying it.

AWS CodeBuild

According to AWS, CodeBuild is a fully managed continuous integration service that compiles your source code, runs tests, and produces software packages that are ready to deploy. With CodeBuild, you don’t need to provision, manage, and scale your own build servers. CodeBuild scales continuously and processes multiple builds concurrently, so your builds are not left waiting in a queue.

CodeBuild integrates seamlessly with other AWS Developer tools, including CodeStar, CodeCommit, CodeDeploy, and CodePipeline.

According to G2 Crowd, the main competitors to AWS CodeBuild, in the Build Automation Software category, include Jenkins, CircleCI, CloudBees Core and CodeShip, Travis CI, JetBrains TeamCity, and Atlassian Bamboo.

Other Technologies

In addition to the major technologies noted above, we will also be leveraging the following services and tools to a lesser extent, in the demonstration:

  • AWS CodeCommit
  • AWS CodePipeline
  • AWS Systems Manager Parameter Store
  • Amazon Simple Storage Service (S3)
  • AWS Identity and Access Management (IAM)
  • AWS Command Line Interface (CLI)
  • CloudFormation Linter
  • Apache HTTP Server

Demonstration

Source Code

All source code for this post is contained in two GitHub repositories. The CloudFormation templates and associated files are in the ansible-aws-cfn GitHub repository. The Ansible Roles and related files are in the ansible-aws-roles GitHub repository. Both repositories may be cloned using the following commands.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/ansible-aws-cfn.git

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/ansible-aws-roles.git

Development Process

The general process we will follow for provisioning and configuring resources in this demonstration is as follows:

  • Create an S3 bucket to store the validated CloudFormation templates
  • Create an Amazon EC2 Key Pair for Ansible
  • Create two AWS CodeCommit Repositories to store the project’s source code
  • Put parameters in Parameter Store
  • Write and test the CloudFormation templates
  • Configure Ansible and AWS Dynamic Inventory script
  • Write and test the Ansible Roles and Playbooks
  • Write the CodeBuild build specification files
  • Create an IAM Role for CodeBuild and CodePipeline
  • Create and test CodeBuild Projects and CodePipeline Pipelines
  • Provision, deploy, and configure the complete web platform to AWS
  • Test the final web platform

Prerequisites

For this demonstration, I will assume you already have an AWS account; the AWS CLI, Python, and Ansible installed locally; an S3 bucket to store the final CloudFormation templates; and an Amazon EC2 Key Pair for Ansible to use for SSH.

Continuous Integration and Delivery Overview

In this demonstration, we will be building multiple CI/CD pipelines for provisioning and configuring our resources to AWS, using several AWS services. These services include CodeCommit, CodeBuild, CodePipeline, Systems Manager Parameter Store, and Amazon Simple Storage Service (S3). The diagram below shows the complete CI/CD workflow we will build using these AWS services, along with Ansible.

aws_devops

AWS CodeCommit

According to Amazon, AWS CodeCommit is a fully-managed source control service that makes it easy to host secure and highly scalable private Git repositories. CodeCommit eliminates the need to operate your own source control system or worry about scaling its infrastructure.

Start by creating two AWS CodeCommit repositories to hold the two GitHub projects you cloned earlier. Commit both projects to your own AWS CodeCommit repositories.

screen_shot_2019-07-26_at_9_02_54_pm

Configuration Management

We have several options for storing the configuration values necessary to provision and configure the resources on AWS. We could set configuration values as environment variables directly in CodeBuild. We could set configuration values from within our Ansible Roles. We could use AWS Systems Manager Parameter Store to store configuration values. For this demonstration, we will use a combination of all three options.

AWS Systems Manager Parameter Store

According to Amazon, AWS Systems Manager Parameter Store provides secure, hierarchical storage for configuration data management and secrets management. You can store data such as passwords, database strings, and license codes as parameter values, as either plain text or encrypted.

The demonstration uses two CloudFormation templates. The two templates have several parameters. A majority of those parameter values will be stored in Parameter Store, retrieved by CodeBuild, and injected into the CloudFormation template during provisioning.

screen_shot_2019-07-26_at_9_38_33_pm

The Ansible GitHub project includes a shell script, parameter_store_values.sh, to put the necessary parameters into Parameter Store. The script requires the AWS Command Line Interface (CLI) to be installed locally. You will need to change the KEY_PATH value in the script (snippet shown below) to match the location of your private key, part of the Amazon EC2 Key Pair you created earlier for use by Ansible.

KEY_PATH="/path/to/private/key"

# put encrypted parameter to Parameter Store
aws ssm put-parameter \
  --name $PARAMETER_PATH/ansible_private_key \
  --type SecureString \
  --value "file://${KEY_PATH}" \
  --description "Ansible private key for EC2 instances" \
  --overwrite

SecureString

Whereas all other parameters are stored in Parameter Store as String datatypes, the private key is stored as a SecureString datatype. Parameter Store uses an AWS Key Management Service (KMS) customer master key (CMK) to encrypt the SecureString parameter value. The IAM Role used by CodeBuild (discussed later) will have the correct permissions to use the KMS key to retrieve and decrypt the private key SecureString parameter value.

screen_shot_2019-07-26_at_9_41_42_pm
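To confirm that a SecureString parameter can be read back and decrypted, the AWS CLI’s aws ssm get-parameter command accepts a --with-decryption flag. The sketch below wraps that command in Python. It is an illustration under assumptions: the helper names are hypothetical, and the /example/path prefix is a placeholder for whatever PARAMETER_PATH is set to in your copy of the script.

```python
import subprocess


def build_get_parameter_command(name):
    """Return the AWS CLI command that reads and decrypts one parameter."""
    return [
        "aws", "ssm", "get-parameter",
        "--name", name,
        "--with-decryption",           # ask Parameter Store to decrypt SecureString values
        "--query", "Parameter.Value",  # return only the value, not the metadata
        "--output", "text",
    ]


def get_parameter(name, run=subprocess.run):
    """Run the command and return its raw standard output.

    The CLI's text output may carry trailing whitespace that callers
    need to handle themselves.
    """
    result = run(build_get_parameter_command(name),
                 check=True, capture_output=True, text=True)
    return result.stdout


# Placeholder path; nothing is executed here, the command is only printed
print(" ".join(build_get_parameter_command("/example/path/ansible_private_key")))
```

The caller’s IAM identity needs permission both to read the parameter and to use the KMS key, which is the same requirement described above for the CodeBuild role.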

CloudFormation

The demonstration uses two CloudFormation templates. The first template, network-stack.template, contains the AWS network stack resources. The template includes one VPC, one Internet Gateway, two NAT Gateways, four Subnets, two Elastic IP Addresses, and associated Route Tables and Security Groups. The second template, compute-stack.template, contains the webserver compute stack resources. The template includes an Auto Scaling Group, Launch Configuration, Application Load Balancer (ALB), ALB Listener, ALB Target Group, and an Instance Security Group. Both templates originated from the AWS CloudFormation template sample library, and were modified for this demonstration.
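The four subnets must fit inside the VPC’s address range without overlapping. As a rough planning aid, Python’s standard ipaddress module can carve candidate blocks out of a VPC CIDR. The sketch below is an assumption for illustration: the 10.0.0.0/16 range and /24 subnet size are placeholders rather than the demonstration’s actual values, while the parameter names match those used by the network template task shown later in this post.

```python
import ipaddress
from itertools import islice

# Template parameter names; the CIDR values produced below are placeholders
SUBNET_PARAMETERS = [
    "PublicSubnet1CIDR",
    "PublicSubnet2CIDR",
    "PrivateSubnet1CIDR",
    "PrivateSubnet2CIDR",
]


def plan_subnets(vpc_cidr, new_prefix=24):
    """Assign the first four non-overlapping blocks of the VPC to the subnets."""
    vpc = ipaddress.ip_network(vpc_cidr)
    blocks = list(islice(vpc.subnets(new_prefix=new_prefix),
                         len(SUBNET_PARAMETERS)))
    if len(blocks) < len(SUBNET_PARAMETERS):
        raise ValueError(f"{vpc_cidr} is too small for four /{new_prefix} subnets")
    return {name: str(block) for name, block in zip(SUBNET_PARAMETERS, blocks)}


plan = plan_subnets("10.0.0.0/16")
for name, cidr in plan.items():
    print(f"{name}: {cidr}")
```

Because the blocks come from a single call to subnets(), they cannot overlap, which is the property CloudFormation will otherwise enforce at stack creation time.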

The two templates are located in the cfn_templates directory of the CloudFormation project, as shown below in the tree view.

.
├── LICENSE.md
├── README.md
├── buildspec_files
│   ├── build.sh
│   └── buildspec.yml
├── cfn_templates
│   ├── compute-stack.template
│   └── network-stack.template
├── codebuild_projects
│   ├── build.sh
│   └── cfn-validate-s3.json
├── codepipeline_pipelines
│   ├── build.sh
│   └── cfn-validate-s3.json
└── requirements.txt

The templates require no modifications for the demonstration. All parameters are in Parameter Store or set by the Ansible Roles, and consumed by the Ansible Playbooks via CodeBuild.

Ansible

We will use Red Hat Ansible to provision the network and compute resources by interacting directly with CloudFormation, deploy and configure Apache HTTP Server, and finally, perform final integration tests of the system. In my opinion, the closest equivalent to Ansible on the AWS platform is AWS OpsWorks. OpsWorks lets you use Chef and Puppet (direct competitors to Ansible) to automate how servers are configured, deployed, and managed across Amazon EC2 instances or on-premises compute environments.

Ansible Config

To use Ansible with AWS and CloudFormation, you will first want to customize your project’s ansible.cfg file to enable the aws_ec2 inventory plugin. Below is part of my configuration file as a reference.

[defaults]
gathering = smart
fact_caching = jsonfile
fact_caching_connection = /tmp
fact_caching_timeout = 300

host_key_checking = False
roles_path = roles
inventory = inventories/hosts
remote_user = ec2-user
private_key_file = ~/.ssh/ansible

[inventory]
enable_plugins = host_list, script, yaml, ini, auto, aws_ec2

Ansible Roles

According to Ansible, Roles are ways of automatically loading certain variable files, tasks, and handlers based on a known file structure. Grouping content by roles also allows easy sharing of roles with other users. For the demonstration, I have written four roles, located in the roles directory, as shown below in the project tree view. The default, common role is not used in this demonstration.

.
├── LICENSE.md
├── README.md
├── ansible.cfg
├── buildspec_files
│   ├── buildspec_compute.yml
│   ├── buildspec_integration_tests.yml
│   ├── buildspec_network.yml
│   └── buildspec_web_config.yml
├── codebuild_projects
│   ├── ansible-test.json
│   ├── ansible-web-config.json
│   ├── build.sh
│   ├── cfn-compute.json
│   ├── cfn-network.json
│   └── notes.md
├── filter_plugins
├── group_vars
├── host_vars
├── inventories
│   ├── aws_ec2.yml
│   ├── ec2.ini
│   ├── ec2.py
│   └── hosts
├── library
├── module_utils
├── notes.md
├── parameter_store_values.sh
├── playbooks
│   ├── 10_cfn_network.yml
│   ├── 20_cfn_compute.yml
│   ├── 30_web_config.yml
│   └── 40_integration_tests.yml
├── production
├── requirements.txt
├── roles
│   ├── cfn_compute
│   ├── cfn_network
│   ├── common
│   ├── httpd
│   └── integration_tests
├── site.yml
└── staging

The four roles are the cfn_network role, for provisioning the network; the cfn_compute role, for configuring the compute resources; the httpd role, for deploying and configuring the Apache servers; and the integration_tests role, for performing final integration tests of the platform. The individual roles help separate the project’s major parts, network, compute, and middleware, into logical code files. Each role was initially built using Ansible Galaxy (ansible-galaxy init). They follow Galaxy’s standard file structure, as shown below in the tree view of the cfn_network role.

.
├── README.md
├── defaults
│   └── main.yml
├── files
├── handlers
│   └── main.yml
├── meta
│   └── main.yml
├── tasks
│   ├── create.yml
│   ├── delete.yml
│   └── main.yml
├── templates
├── tests
│   ├── inventory
│   └── test.yml
└── vars
    └── main.yml

Testing Ansible Roles

In addition to checking each role during development and on each code commit with Ansible Lint, each role contains a set of unit tests, in the tests directory, to confirm the success or failure of the role’s tasks. Below we see a basic set of tests for the cfn_compute role. First, we gather Facts about the deployed EC2 instances. Facts are information Ansible can automatically derive from your remote systems. We check the facts for expected properties of the running EC2 instances, including timezone, Operating System, major OS version, and the UserID. Note the use of the failed_when conditional. This Ansible playbook error handling conditional is used to confirm the success or failure of tasks.

---
- name: Test cfn_compute Ansible role
  gather_facts: True
  hosts: tag_Group_webservers

  pre_tasks:
  - name: List all ansible facts
    debug:
      msg: "{{ ansible_facts }}"

  tasks:
  - name: Check if EC2 instance's timezone is set to 'UTC'
    debug:
      msg: Timezone is UTC
    failed_when: ansible_facts['date_time']['tz'] != 'UTC'

  - name: Check if EC2 instance's OS is 'Amazon'
    debug:
      msg: OS is Amazon
    failed_when: ansible_facts['distribution_file_variety'] != 'Amazon'

  - name: Check if EC2 instance's OS major version is '2018'
    debug:
      msg: OS major version is 2018
    failed_when: ansible_facts['distribution_major_version'] != '2018'

  - name: Check if EC2 instance's UserID is 'ec2-user'
    debug:
      msg: UserID is ec2-user
    failed_when: ansible_facts['user_id'] != 'ec2-user'

If we were to run the tests on their own, against the two correctly provisioned and configured EC2 web servers, we would see results similar to the following.

screen_shot_2019-07-26_at_6_55_04_pm
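The logic of those failed_when checks can be expressed in a few lines of plain Python, which may help when reasoning about what each task asserts. This is only an analogy and not how Ansible evaluates conditionals: the helper names are hypothetical, and the sample facts dictionary is invented to resemble the structure of ansible_facts.

```python
# Expected values are taken from the playbook's failed_when conditions
EXPECTED_FACTS = {
    ("date_time", "tz"): "UTC",
    ("distribution_file_variety",): "Amazon",
    ("distribution_major_version",): "2018",
    ("user_id",): "ec2-user",
}


def lookup_fact(facts, path):
    """Walk a nested dict along a tuple of keys."""
    value = facts
    for key in path:
        value = value[key]
    return value


def failed_checks(facts, expected=EXPECTED_FACTS):
    """Return (fact name, expected, actual) for every check that fails."""
    failures = []
    for path, want in expected.items():
        try:
            got = lookup_fact(facts, path)
        except (KeyError, TypeError):
            got = None  # a missing fact is reported as a failure in this sketch
        if got != want:
            failures.append((".".join(path), want, got))
    return failures


# Invented sample resembling a subset of ansible_facts
sample_facts = {
    "date_time": {"tz": "UTC"},
    "distribution_file_variety": "Amazon",
    "distribution_major_version": "2018",
    "user_id": "ec2-user",
}
print(failed_checks(sample_facts))
```

An empty list corresponds to every task passing. Each tuple in a non-empty list corresponds to one task whose failed_when condition evaluated to true.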

In the cfn_network role unit tests, below, note the use of the Ansible cloudformation_facts module. This module allows us to obtain facts about the successfully completed AWS CloudFormation stack. We can then use these facts to drive additional provisioning and configuration, or testing. In the task below, we get the network CloudFormation stack’s Outputs. These are the exact same values we would see in the stack’s Outputs tab, in the AWS CloudFormation management console.

---
- name: Test cfn_network Ansible role
  gather_facts: False
  hosts: localhost

  pre_tasks:
    - name: Get facts about the newly created cfn network stack
      cloudformation_facts:
        stack_name: "ansible-cfn-demo-network"
      register: cfn_network_stack_facts

    - name: List 'stack_outputs' from cached facts
      debug:
        msg: "{{ cloudformation['ansible-cfn-demo-network'].stack_outputs }}"

  tasks:
  - name: Check if the AWS Region of the VPC is {{ lookup('env','AWS_REGION') }}
    debug:
      msg: "AWS Region of the VPC is {{ lookup('env','AWS_REGION') }}"
    failed_when: cloudformation['ansible-cfn-demo-network'].stack_outputs['VpcRegion'] != lookup('env','AWS_REGION')

Similar to the CloudFormation templates, the Ansible roles require no modifications. Most of the project’s parameters are decoupled from the code and stored in Parameter Store or CodeBuild buildspec files (discussed next). The few parameters found in the roles, in the defaults/main.yml files, are neither account- nor environment-specific.

Ansible Playbooks

The roles will be called by our Ansible Playbooks. There is a create and a delete set of tasks for the cfn_network and cfn_compute roles. Either create or delete tasks are accessible through the role, using the main.yml file and referencing the create or delete Ansible Tags.

---
- import_tasks: create.yml
  tags:
    - create

- import_tasks: delete.yml
  tags:
    - delete

Below, we see the create tasks for the cfn_network role, create.yml, referenced above by main.yml. The use of the cloudformation module in the first task allows us to create or delete AWS CloudFormation stacks and demonstrates the real power of Ansible: the ability to execute complex AWS resource provisioning by extending its core functionality via a module. By switching the Cloud module, we could just as easily provision resources on Google Cloud, Azure, AliCloud, OpenStack, or VMware, to name but a few.

---
- name: create a stack, pass in the template via an S3 URL
  cloudformation:
    stack_name: "{{ stack_name }}"
    state: present
    region: "{{ lookup('env','AWS_REGION') }}"
    disable_rollback: false
    template_url: "{{ lookup('env','TEMPLATE_URL') }}"
    template_parameters:
      VpcCIDR: "{{ lookup('env','VPC_CIDR') }}"
      PublicSubnet1CIDR: "{{ lookup('env','PUBLIC_SUBNET_1_CIDR') }}"
      PublicSubnet2CIDR: "{{ lookup('env','PUBLIC_SUBNET_2_CIDR') }}"
      PrivateSubnet1CIDR: "{{ lookup('env','PRIVATE_SUBNET_1_CIDR') }}"
      PrivateSubnet2CIDR: "{{ lookup('env','PRIVATE_SUBNET_2_CIDR') }}"
      TagEnv: "{{ lookup('env','TAG_ENVIRONMENT') }}"
    tags:
      Stack: "{{ stack_name }}"

The CloudFormation parameters in the above task are mainly derived from environment variables, whose values were retrieved from the Parameter Store by CodeBuild and set in the environment. We obtain these external values using Ansible’s Lookup Plugins. The stack_name variable’s value is derived from the role’s defaults/main.yml file. The task variables use the Python Jinja2 templating system style of encoding.

variables
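The mapping from environment variables to template parameters can be mirrored in Python, which is handy for checking a build environment before running the playbook. The sketch below is an assumption for illustration; the variable and parameter names come from the task above, while the template_parameters helper and the sample values are hypothetical.

```python
import os

# CloudFormation parameter name -> environment variable, as used in create.yml
PARAMETER_ENV_VARS = {
    "VpcCIDR": "VPC_CIDR",
    "PublicSubnet1CIDR": "PUBLIC_SUBNET_1_CIDR",
    "PublicSubnet2CIDR": "PUBLIC_SUBNET_2_CIDR",
    "PrivateSubnet1CIDR": "PRIVATE_SUBNET_1_CIDR",
    "PrivateSubnet2CIDR": "PRIVATE_SUBNET_2_CIDR",
    "TagEnv": "TAG_ENVIRONMENT",
}


def template_parameters(environ=os.environ):
    """Build the template_parameters mapping, failing fast on unset variables."""
    missing = [var for var in PARAMETER_ENV_VARS.values() if not environ.get(var)]
    if missing:
        raise KeyError(f"unset environment variables: {', '.join(missing)}")
    return {param: environ[var] for param, var in PARAMETER_ENV_VARS.items()}


# Invented sample values standing in for what CodeBuild would export
sample_env = {
    "VPC_CIDR": "10.0.0.0/16",
    "PUBLIC_SUBNET_1_CIDR": "10.0.0.0/24",
    "PUBLIC_SUBNET_2_CIDR": "10.0.1.0/24",
    "PRIVATE_SUBNET_1_CIDR": "10.0.2.0/24",
    "PRIVATE_SUBNET_2_CIDR": "10.0.3.0/24",
    "TAG_ENVIRONMENT": "demo",
}
print(template_parameters(sample_env))
```

The strict check is a design choice for this sketch. My understanding is that Ansible’s env lookup yields an empty string for an unset variable unless a default is supplied, so a missing value may otherwise surface only later as a CloudFormation validation error.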

The associated Ansible Playbooks, which call the tasks, are located in the playbooks directory, as shown previously in the tree view. The playbooks define a few required parameters, such as where the list of hosts will be derived from, and call the appropriate roles. For our simple demonstration, only a single role is called per playbook. Typically, in a larger project, you would call multiple roles from a single playbook. Below, we see the Network playbook, playbooks/10_cfn_network.yml, which calls the cfn_network role.

---
- name: Provision VPC and Subnets
  hosts: localhost
  connection: local
  gather_facts: False

  roles:
    - role: cfn_network

Dynamic Inventory

Another principal feature of Ansible is demonstrated in the Web Server Configuration playbook, playbooks/30_web_config.yml, shown below. Note that the hosts on which we want to deploy and configure Apache HTTP Server are selected by an AWS tag value, indicated by the reference to tag_Group_webservers. This indirectly refers to an AWS tag, named Group, with the value of webservers, which was applied to our EC2 hosts by CloudFormation. The ability to generate a Dynamic Inventory, using a dynamic external inventory system, is a key feature of Ansible.

---
- name: Configure Apache Web Servers
  hosts: tag_Group_webservers
  gather_facts: False
  become: yes
  become_method: sudo

  roles:
    - role: httpd

To generate a dynamic inventory of EC2 hosts, we are using the Ansible AWS EC2 Dynamic Inventory script, inventories/ec2.py and inventories/ec2.ini files. The script dynamically queries AWS for all the EC2 hosts containing specific AWS tags, belonging to a particular Security Group, Region, Availability Zone, and so forth.

I have customized the AWS EC2 Dynamic Inventory script’s configuration in the inventories/aws_ec2.yml file. Amongst other configuration items, the file defines keyed_groups. This instructs the script to inventory EC2 hosts according to their unique AWS tags and tag values.

plugin: aws_ec2
remote_user: ec2-user
private_key_file: ~/.ssh/ansible
regions:
  - us-east-1
keyed_groups:
  - key: tags.Name
    prefix: tag_Name_
    separator: ''
  - key: tags.Group
    prefix: tag_Group_
    separator: ''
hostnames:
  - dns-name
  - ip-address
  - private-dns-name
  - private-ip-address
compose:
  ansible_host: ip_address

Once you have built the CloudFormation compute stack, in a later section of the demonstration, you would use the following command to build the dynamic EC2 inventory of hosts.

ansible-inventory -i inventories/aws_ec2.yml --graph

You would then see an inventory of all your EC2 hosts, resembling the following.

@all:
  |--@aws_ec2:
  |  |--ec2-18-234-137-73.compute-1.amazonaws.com
  |  |--ec2-3-95-215-112.compute-1.amazonaws.com
  |--@tag_Group_webservers:
  |  |--ec2-18-234-137-73.compute-1.amazonaws.com
  |  |--ec2-3-95-215-112.compute-1.amazonaws.com
  |--@tag_Name_Apache_Web_Server:
  |  |--ec2-18-234-137-73.compute-1.amazonaws.com
  |  |--ec2-3-95-215-112.compute-1.amazonaws.com
  |--@ungrouped:

Note the two EC2 web server instances, listed under tag_Group_webservers. They represent the target inventory onto which we will install Apache HTTP Server. We could also target the same hosts using the Name tag, through the tag_Name_Apache_Web_Server group.
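The grouping shown in the inventory above can be approximated with a short Python sketch. It is a simplified model of keyed_groups, not the plugin's actual implementation. In particular, the sanitize function assumes that characters which are not valid in a group name are replaced with underscores, which is consistent with the tag value Apache Web Server appearing as tag_Name_Apache_Web_Server.

```python
import re
from collections import defaultdict

# The two keyed_groups rules from inventories/aws_ec2.yml.
KEYED_GROUPS = [
    {"key": "Name", "prefix": "tag_Name_", "separator": ""},
    {"key": "Group", "prefix": "tag_Group_", "separator": ""},
]


def sanitize(name):
    # Assumption: anything other than letters, digits, and underscores
    # becomes an underscore in the resulting group name.
    return re.sub(r"[^A-Za-z0-9_]", "_", name)


def build_groups(hosts):
    """Map each generated group name to the hosts carrying the matching tag."""
    groups = defaultdict(list)
    for hostname, tags in hosts.items():
        for rule in KEYED_GROUPS:
            value = tags.get(rule["key"])
            if value is None:
                continue  # host does not carry this tag
            group = sanitize(rule["prefix"] + rule["separator"] + value)
            groups[group].append(hostname)
    return dict(groups)


hosts = {
    "ec2-18-234-137-73.compute-1.amazonaws.com": {
        "Name": "Apache Web Server", "Group": "webservers"},
    "ec2-3-95-215-112.compute-1.amazonaws.com": {
        "Name": "Apache Web Server", "Group": "webservers"},
}
for group, members in sorted(build_groups(hosts).items()):
    print(group, members)
```

Both hosts end up in tag_Group_webservers and tag_Name_Apache_Web_Server, matching the graph output above.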

AWS CodeBuild

Recalling our diagram, you will note the use of CodeBuild is a vital part of each of our five DevOps workflows. CodeBuild is used to 1) validate the CloudFormation templates, 2) provision the network resources, 3) provision the compute resources, 4) install and configure the web servers, and 5) run integration tests.

aws_devops

By splitting these processes into separate workflows, we can redeploy the web servers without impacting the compute resources or redeploy the compute resources without affecting the network resources. Often, different teams within a large enterprise are responsible for each of these resource categories: architecture, security (IAM), network, compute, web servers, and code deployments. Separating concerns makes a shared ownership model easier to manage.

Build Specifications

CodeBuild projects rely on a build specification, or buildspec file, for their configuration, as shown below. CodeBuild’s buildspec file is analogous to Jenkins’ Jenkinsfile. Each of our five workflows will use CodeBuild. Each CodeBuild project references a separate buildspec file, included in the two GitHub projects, which by now you have pushed to your two CodeCommit repositories.

screen_shot_2019-07-26_at_6_10_59_pm

Below we see an example of the buildspec file for the CodeBuild project that deploys our AWS network resources, buildspec_files/buildspec_network.yml.

version: 0.2

env:
  variables:
    TEMPLATE_URL: "https://s3.amazonaws.com/garystafford_cloud_formation/cf_demo/network-stack.template"
    AWS_REGION: "us-east-1"
    TAG_ENVIRONMENT: "ansible-cfn-demo"
  parameter-store:
    VPC_CIDR: "/ansible_demo/vpc_cidr"
    PUBLIC_SUBNET_1_CIDR: "/ansible_demo/public_subnet_1_cidr"
    PUBLIC_SUBNET_2_CIDR: "/ansible_demo/public_subnet_2_cidr"
    PRIVATE_SUBNET_1_CIDR: "/ansible_demo/private_subnet_1_cidr"
    PRIVATE_SUBNET_2_CIDR: "/ansible_demo/private_subnet_2_cidr"

phases:
  install:
    runtime-versions:
      python: 3.7
    commands:
      - pip install -r requirements.txt -q
  build:
    commands:
      - ansible-playbook -i inventories/aws_ec2.yml playbooks/10_cfn_network.yml --tags create  -v
  post_build:
    commands:
      - ansible-playbook -i inventories/aws_ec2.yml roles/cfn_network/tests/test.yml

There are several distinct sections to the buildspec file. First, in the env section, we define our variables. They are a combination of three static variable values and five variable values retrieved from the Parameter Store. Any of these may be overridden at build-time, using the AWS CLI, SDK, or from the CodeBuild management console. You will need to update some of the variables to match your particular environment, such as the TEMPLATE_URL to match your S3 bucket path.
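CodeBuild resolves the parameter-store entries for us, but it can help to see the equivalent lookup spelled out. The sketch below is an illustration only, not how CodeBuild is implemented. It assumes a boto3 SSM client is passed in, and the function name resolve_parameters is hypothetical. The get_parameters call and the shape of its response are part of the boto3 SSM API.

```python
# Environment variable name -> Parameter Store name, copied from the buildspec.
PARAMETER_STORE_VARS = {
    "VPC_CIDR": "/ansible_demo/vpc_cidr",
    "PUBLIC_SUBNET_1_CIDR": "/ansible_demo/public_subnet_1_cidr",
    "PUBLIC_SUBNET_2_CIDR": "/ansible_demo/public_subnet_2_cidr",
    "PRIVATE_SUBNET_1_CIDR": "/ansible_demo/private_subnet_1_cidr",
    "PRIVATE_SUBNET_2_CIDR": "/ansible_demo/private_subnet_2_cidr",
}


def resolve_parameters(ssm_client, mapping=None):
    """Hypothetical helper: fetch each parameter and key it by env var name."""
    mapping = PARAMETER_STORE_VARS if mapping is None else mapping
    # get_parameters accepts up to ten names per call; we only need five.
    response = ssm_client.get_parameters(
        Names=list(mapping.values()), WithDecryption=True)
    invalid = response.get("InvalidParameters", [])
    if invalid:
        raise KeyError("Parameters not found: " + ", ".join(invalid))
    values = {p["Name"]: p["Value"] for p in response["Parameters"]}
    return {env_name: values[name] for env_name, name in mapping.items()}
```

With boto3 installed and AWS credentials configured, calling resolve_parameters(boto3.client("ssm")) would return a dictionary keyed by the same environment variable names the playbooks read.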

Next, the phases of our build. Again, if you are familiar with Jenkins, think of these as Stages with multiple Steps. The first phase, install, builds a Docker container, in which the build process is executed. Here we are using Python 3.7. We also run a pip command to install the required Python packages from our requirements.txt file. Next, we perform our build phase by executing an Ansible command.

ansible-playbook \
  -i inventories/aws_ec2.yml \
  playbooks/10_cfn_network.yml --tags create -v

The command calls our playbook, playbooks/10_cfn_network.yml. The command references the create tag. This causes the playbook to run the cfn_network role’s create tasks (roles/cfn_network/tasks/create.yml), as defined in the main.yml file (roles/cfn_network/tasks/main.yml). Lastly, in our post_build phase, we execute our role’s unit tests (roles/cfn_network/tests/test.yml), using a second Ansible command.

CodeBuild Projects

Next, we need to create CodeBuild projects. You can do this using the AWS CLI or from the CodeBuild management console (shown below). I have included individual templates and a creation script in each project, in the codebuild_projects directory, which you could use to build the projects, using the AWS CLI. You would have to modify the JSON templates, replacing all references to my specific, unique AWS resources, with your own. For the demonstration, I suggest creating the five projects manually in the CodeBuild management console, using the supplied CodeBuild project templates as a guide.

screen_shot_2019-07-26_at_6_10_12_pm

CodeBuild IAM Role

To execute our CodeBuild projects, we need an IAM Role, or Roles, granting CodeBuild permission to access resources such as CodeCommit, S3, and CloudWatch. For this demonstration, I chose to create a single IAM Role for all workflows. I then allowed CodeBuild to assign the required policies to the Role as needed, which is a feature of CodeBuild.

screen_shot_2019-07-26_at_6_52_23_pm

CodePipeline Pipeline

In addition to CodeBuild, we are using CodePipeline for our first of five workflows. CodePipeline validates the CloudFormation templates and pushes them to our S3 bucket. The pipeline calls the corresponding CodeBuild project to validate each template, then deploys the valid CloudFormation templates to S3.

codepipeline

In true CI/CD fashion, the pipeline is automatically executed every time source code from the CloudFormation project is committed to the CodeCommit repository.

screen_shot_2019-07-26_at_6_12_51_pm

CodePipeline calls CodeBuild, which performs a build, based on its buildspec file. This particular CodeBuild buildspec file also demonstrates another ability of CodeBuild, executing an external script. When we have a complex build phase, we may choose to call an external script, such as a Bash or Python script, versus embedding the commands in the buildspec.

version: 0.2

phases:
  install:
    runtime-versions:
      python: 3.7
  pre_build:
    commands:
      - pip install -r requirements.txt -q
      - cfn-lint -v
  build:
    commands:
      - sh buildspec_files/build.sh

artifacts:
  files:
    - '**/*'
  base-directory: 'cfn_templates'
  discard-paths: yes

Below, we see the script that is called. Here we are using both the CloudFormation Linter, cfn-lint, and the cloudformation validate-template command to validate our templates for comparison. The two tools give slightly different, yet relevant, linting results.

#!/usr/bin/env bash

set -e

for filename in cfn_templates/*.*; do
    cfn-lint -t "${filename}"
    aws cloudformation validate-template \
      --template-body "file://${filename}"
done
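Since the buildspec could call a Python script just as easily as a Bash script, here is a rough Python equivalent of the loop above. It is a sketch, not part of the project. The function names are hypothetical, and it assumes the cfn-lint and aws executables are on the PATH, as they are in the Bash version.

```python
import glob
import subprocess


def validation_commands(template_path):
    """Return the two validation commands the Bash script runs per template."""
    return [
        ["cfn-lint", "-t", template_path],
        ["aws", "cloudformation", "validate-template",
         "--template-body", "file://" + template_path],
    ]


def validate_templates(pattern="cfn_templates/*.*", run=subprocess.run):
    """Run both validators against every template matching the pattern."""
    for template_path in sorted(glob.glob(pattern)):
        for command in validation_commands(template_path):
            # check=True raises CalledProcessError on a non-zero exit code,
            # which stops the build much like `set -e` does in the Bash script.
            run(command, check=True)
```

Passing the commands as lists, rather than as one shell string, avoids quoting problems with unusual file names.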

Similar to the CodeBuild project templates, I have included a CodePipeline template, in the codepipeline_pipelines directory, which you could modify and create using the AWS CLI. Alternatively, I suggest using the CodePipeline management console to create the pipeline for the demo, using the supplied CodePipeline template as a guide.

screen_shot_2019-07-26_at_6_11_51_pm

Below, the stage view of the final CodePipeline pipeline.

screen_shot_2019-07-26_at_6_12_26_pm

Build the Platform

With all the resources, code, and DevOps workflows in place, we should be ready to build our platform on AWS. The CodePipeline project comes first, to validate the CloudFormation templates and place them into your S3 bucket. Since you are probably not committing new code to the CloudFormation project’s CodeCommit repository, which would trigger the pipeline, you can start the pipeline using the AWS CLI (shown below) or via the management console.

# list names of pipelines
aws codepipeline list-pipelines

# execute the validation pipeline
aws codepipeline start-pipeline-execution --name cfn-validate-s3

screen_shot_2019-07-26_at_6_08_03_pm

The pipeline should complete within a few seconds.

screen_shot_2019-07-26_at_10_12_53_pm.png

Next, execute each of the four CodeBuild projects in the following order.

# list the names of the projects
aws codebuild list-projects

# execute the builds in order
aws codebuild start-build --project-name cfn-network
aws codebuild start-build --project-name cfn-compute

# ensure EC2 instance checks are complete before starting
# the ansible-web-config build!
aws codebuild start-build --project-name ansible-web-config
aws codebuild start-build --project-name ansible-test

As the code comment above states, be careful not to start the ansible-web-config build until you have confirmed the EC2 instance Status Checks have completed and have passed, as shown below. The previous, cfn-compute build will complete when CloudFormation finishes building the new compute stack. However, the fact CloudFormation finished does not indicate that the EC2 instances are fully up and running. Failure to wait will result in a failed build of the ansible-web-config CodeBuild project, which installs and configures the Apache HTTP Servers.

screen_shot_2019-07-26_at_6_27_52_pm
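Rather than checking the console by hand, the wait described above could be scripted. The following is a minimal sketch, assuming boto3 EC2 and CodeBuild clients are passed in. The function names are hypothetical, and the sketch ignores pagination of describe_instances for brevity. The instance_status_ok waiter and the start_build call are part of boto3.

```python
def find_web_server_ids(ec2_client, group_tag="webservers"):
    """Return the IDs of running instances tagged Group=<group_tag>."""
    response = ec2_client.describe_instances(
        Filters=[
            {"Name": "tag:Group", "Values": [group_tag]},
            {"Name": "instance-state-name", "Values": ["running"]},
        ]
    )
    return [
        instance["InstanceId"]
        for reservation in response["Reservations"]
        for instance in reservation["Instances"]
    ]


def start_build_when_ready(ec2_client, codebuild_client,
                           project_name="ansible-web-config"):
    """Block until the instances pass their status checks, then start the build."""
    instance_ids = find_web_server_ids(ec2_client)
    if not instance_ids:
        raise RuntimeError("No running web server instances found")
    # The waiter polls until both system and instance status checks report ok.
    ec2_client.get_waiter("instance_status_ok").wait(InstanceIds=instance_ids)
    response = codebuild_client.start_build(projectName=project_name)
    return response["build"]["id"]
```

Called with boto3.client("ec2") and boto3.client("codebuild"), this would replace the manual pause between the cfn-compute and ansible-web-config builds.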

Below, we see the cfn_network CodeBuild project first building a Python-based Docker container, within which to perform the build. Each build is executed in a fresh, separate Docker container, something that can trip you up if you are expecting a previous cache of Ansible Facts or previously defined environment variables, persisted across multiple builds.

screen_shot_2019-07-26_at_6_15_12_pm

Below, we see the two completed CloudFormation Stacks, a result of our CodeBuild projects and Ansible.

screen_shot_2019-07-26_at_6_44_43_pm

The fifth and final CodeBuild build tests our platform by attempting to hit the Apache HTTP Server’s default home page, using the Application Load Balancer’s public DNS name.

screen_shot_2019-07-26_at_6_32_09_pm

Below, we see an example of what happens when a build fails. In this case, one of the final integration tests failed to return the expected results from the ALB endpoint.

screen_shot_2019-07-26_at_6_40_37_pm

Below, with the bug fixed, we rerun the build, which re-executes the tests successfully.

screen_shot_2019-07-26_at_6_38_21_pm

We can manually confirm the platform is working by hitting the same public DNS name of the ALB used by our tests, in our browser. The ALB should load-balance our request to the default home page of one of the two running web servers. Normally, at this point, you would deploy your application to Apache, using a software continuous deployment tool, such as Jenkins, CodeDeploy, Travis CI, TeamCity, or Bamboo.

screen_shot_2019-07-26_at_6_39_26_pm

Cleaning Up

To clean up the running AWS resources from the demonstration, first delete the CloudFormation compute stack, then delete the network stack. To do so, execute the following commands, one at a time. The commands call the same playbooks we called to create the stacks, except this time, we use the delete tag, as opposed to the create tag.

# first delete cfn compute stack
ansible-playbook \
  -i inventories/aws_ec2.yml \
  playbooks/20_cfn_compute.yml -t delete -v

# then delete cfn network stack
ansible-playbook \
  -i inventories/aws_ec2.yml \
  playbooks/10_cfn_network.yml -t delete -v

You should observe the following output, indicating both CloudFormation stacks have been deleted.

screen_shot_2019-07-26_at_7_12_38_pm

Confirm the stacks were deleted from the CloudFormation management console or from the AWS CLI.

 

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas

Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

Introduction

Most of the dominant Cloud providers offer IoT (Internet of Things) and IIoT (Industrial IoT) integrated services. Amazon has AWS IoT, Microsoft Azure has multiple offerings, including IoT Central, IBM’s offerings include the IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers the Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly-performant, scalable technology stacks. They are capable of scaling to tens-of-thousands of IoT devices or more and massive amounts of streaming telemetry.

In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.

Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIoT industry-standard data and transport protocols, such as MQTT (Message Queuing Telemetry Transport), which runs over TCP, or CoAP (Constrained Application Protocol), which runs over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).

Demonstration

In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

iot_3.jpg

Prototype IoT Devices used in this Demonstration

We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.

Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.

JSON IoT Basic Icons.png

For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.

Demo IoT Diagram Icons.png

Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.

Complex IoT Diagram Icons.png

Source Code

All source code is available on GitHub. Use the following command to clone the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git

You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.

screen_shot_2019-05-21_at_12_55_25_pm

Technologies

Protocol Buffers

According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML.

Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes.

Google Cloud Functions

Cloud-Functions.png

According to Google, Cloud Functions is Google’s event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high-availability, fault-tolerance, no servers to provision, manage, patch or update, only pay while your code runs, and they easily connect and extend other cloud services. Cloud Functions natively support multiple event-types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.

Google Cloud Pub/Sub

According to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.

MongoDB Atlas

MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

Cost Effectiveness of Cloud Functions

At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google’s pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system is comprised of tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.

However, with thousands of devices or more, each transmitting data multiple times per minute, you could quickly outgrow the cost-effectiveness of Google Cloud Functions. In that case, you might look to Google’s Cloud IoT platform. Alternately, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly-released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.
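To put rough numbers on the invocation pricing quoted above, here is a small back-of-the-envelope calculation in Python. It is only a sketch. It covers invocation charges alone, using the two million free invocations and USD $0.40 per additional million mentioned above, and it leaves out memory, compute time, and outbound data transfer. The 30-day month and the function names are assumptions.

```python
FREE_INVOCATIONS_PER_MONTH = 2_000_000
PRICE_PER_MILLION_USD = 0.40


def monthly_invocations(devices, seconds_between_messages, days=30):
    """Total function invocations if every message triggers one invocation."""
    messages_per_day = (24 * 60 * 60) // seconds_between_messages
    return devices * messages_per_day * days


def invocation_cost_usd(invocations):
    """Invocation charges only; the free tier is subtracted first."""
    billable = max(0, invocations - FREE_INVOCATIONS_PER_MONTH)
    return billable / 1_000_000 * PRICE_PER_MILLION_USD


for devices in (10, 100, 5000):
    total = monthly_invocations(devices, seconds_between_messages=15)
    print(devices, total, round(invocation_cost_usd(total), 2))
```

At one message every 15 seconds, 10 devices stay within the free tier, 100 devices generate 17,280,000 invocations (about $6.11), and 5,000 devices generate 864,000,000 invocations (about $344.80), before any of the other charges are added.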

Sensor Scripts

For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.

JSON over HTTPS

Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.

import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

    try:
        requests.post(URL, json=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Telemetry Frequency

Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to only being transmitted every 15 seconds. To reduce system complexity, potential latency, back-pressure, and cost, in my opinion, you should only produce telemetry data at the frequency your requirements dictate.

JSON Web Tokens

For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry-standard (RFC 7519) method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, all signed with a secret key, which is known to each Cloud Function, in order to verify the IoT device’s identity. Note that the claims in a signed JWT are encoded, not encrypted, so anyone holding the token can read them. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT’s payload data.

{
  "sub": "IoT Protobuf Serverless Demo",
  "id": "iot-demo-key",
  "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ",
  "iat": 1557407124,
  "exp": 1564664724
}

For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions use PyJWT, a Python library for encoding and decoding JWTs, to decode and validate the JWT (Bearer Token) from the incoming request’s Authorization header. The JWT is stored as an environment variable. Deployment instructions are included in the GitHub project.
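To illustrate what a library such as PyJWT does when it validates a token, here is a minimal, standard-library-only sketch of HS256 signing and verification. It is not the code used by the Cloud Functions, which rely on PyJWT, and it assumes the HS256 algorithm. The function names and the sample secret are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data):
    # JWTs use URL-safe base64 without the trailing '=' padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=")


def _b64url_decode(segment):
    return base64.urlsafe_b64decode(segment + b"=" * (-len(segment) % 4))


def _signature(signing_input, secret):
    return hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()


def sign_token(claims, secret):
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = b".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return (signing_input + b"." + _b64url(_signature(signing_input, secret))).decode("ascii")


def verify_token(token, secret, now=None):
    """Return the claims if the signature and expiration are valid, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.encode("ascii").split(b".")
        header = json.loads(_b64url_decode(header_b64))
        expected = _signature(header_b64 + b"." + payload_b64, secret)
        # compare_digest avoids leaking information through timing differences.
        if header.get("alg") != "HS256" or not hmac.compare_digest(
                expected, _b64url_decode(signature_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None  # malformed token, bad base64, or invalid JSON
    now = time.time() if now is None else now
    if claims.get("exp", float("inf")) < now:
        return None  # token has expired
    return claims


token = sign_token({"id": "iot-demo-key", "exp": int(time.time()) + 3600}, "example-secret")
print(verify_token(token, "example-secret"))
print(verify_token(token, "wrong-secret"))
```

The second call prints None, since a token cannot be verified without the secret key that signed it. In a Cloud Function, a None result would translate into the 401 Unauthorized response.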

screen_shot_2019-05-09_at_5_13_28_pm

JSON Payload

Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}
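The 148-byte figure can be reproduced with a few lines of Python, by encoding the payload above with json.dumps and its default separators. The second measurement, using compact separators, is an addition for comparison and is not something the sensor script does.

```python
import json

payload = {
    "device": "rp829c7e0e",
    "type": "DHT22",
    "timestamp": 1557585090.476025,
    "data": {
        "temperature": 17.100000381469727,
        "humidity": 68.0999984741211,
    },
}

# Default separators are ', ' and ': ', so every key and item adds a space.
default_encoding = json.dumps(payload).encode("utf-8")
# Compact separators drop that whitespace without renaming any keys.
compact_encoding = json.dumps(payload, separators=(",", ":")).encode("utf-8")

print(len(default_encoding))  # 148
print(len(compact_encoding))  # 138
```

Dropping the whitespace alone saves 10 bytes, before any of the key names are shortened.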

Protocol Buffers

For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object, within each message type.

It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternately, you could choose Google’s Protobuf Well-Known Type, Timestamp, to store the timestamp.

syntax = "proto3";

package sensors;

// DHT22
message SensorDHT {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDHT data = 4;
}

message DataDHT {
    double temperature = 1;
    double humidity = 2;
}

// Onyehn_PIR
message SensorPIR {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataPIR data = 4;
}

message DataPIR {
    bool motion = 1;
}

// Anmbest_MD46N
message SensorDLI {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDLI data = 4;
}

message DataDLI {
    bool light = 1;
}
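The earlier remark about choosing the correct scalar value type can be demonstrated without Protobuf at all. The sketch below uses Python’s struct module to round-trip values through 4-byte (float) and 8-byte (double) encodings, which are the same sizes Protobuf uses for those two types. The helper names are hypothetical.

```python
import struct


def through_float32(value):
    """Round-trip a Python float through a 4-byte IEEE 754 encoding."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


def through_float64(value):
    """Round-trip a Python float through an 8-byte IEEE 754 encoding."""
    return struct.unpack("<d", struct.pack("<d", value))[0]


reading = 17.1
print(through_float32(reading))  # 17.100000381469727
print(through_float64(reading))  # 17.1

timestamp = 1557585090.476025
print(through_float32(timestamp))  # no longer accurate to the second
print(through_float64(timestamp))  # 1557585090.476025
```

A 4-byte float is adequate for a temperature reading but cannot hold a Unix timestamp to the nearest second, which is one reason a double is the safer choice for the timestamp field. The value 17.100000381469727 also matches the temperature in the JSON payload shown earlier, which suggests, though I have not confirmed it, that the sensor library produces single-precision readings.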

Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file, sensors_pb2.py.

protoc --python_out=. sensors.proto

Protocol Buffers over HTTPS

Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request’s Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf’s SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.

import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests
import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()

            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Protobuf Binary Payload

To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.

message = sensorDHT.SerializeToString()

# SerializeToString() returns bytes, which can be written to disk as-is
with open("./data_binary.txt", "wb") as binary_file_output:
    binary_file_output.write(message)

Then, using the hexdump command, we can view a representation of the binary data file.

> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022

The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (32% the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.

screen_shot_2019-05-14_at_7_00_39_am.png
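To see where a 48-byte message comes from, we can hand-encode a SensorDHT message following the Protobuf wire format, without using the Protobuf library. This is a teaching sketch under stated assumptions: it follows the sensors.proto schema shown earlier, it only handles field numbers below 16 and lengths below 128 (one-byte tags and lengths), and it does not skip default-valued fields the way proto3 does. The function names are hypothetical.

```python
import struct

WIRE_TYPE_64BIT = 1   # fixed 8 bytes, used for double
WIRE_TYPE_LENGTH = 2  # length-delimited, used for strings and nested messages


def tag(field_number, wire_type):
    # A tag packs the field number and wire type into a single varint.
    return bytes([(field_number << 3) | wire_type])


def encode_string(field_number, text):
    data = text.encode("utf-8")
    return tag(field_number, WIRE_TYPE_LENGTH) + bytes([len(data)]) + data


def encode_double(field_number, value):
    # Doubles are always 8 bytes, little-endian.
    return tag(field_number, WIRE_TYPE_64BIT) + struct.pack("<d", value)


def encode_nested(field_number, body):
    return tag(field_number, WIRE_TYPE_LENGTH) + bytes([len(body)]) + body


def encode_sensor_dht(device, sensor_type, timestamp, temperature, humidity):
    data = encode_double(1, temperature) + encode_double(2, humidity)
    return (encode_string(1, device)
            + encode_string(2, sensor_type)
            + encode_double(3, timestamp)
            + encode_nested(4, data))


message = encode_sensor_dht(
    "rp829c7e0e", "DHT22", 1557585090.476025,
    17.100000381469727, 68.0999984741211)
print(len(message))  # 48
```

The 48 bytes break down as 12 for the device string, 7 for the type string, 9 for the timestamp, and 20 for the nested data message. The field names never appear on the wire, only their numbers, which accounts for most of the savings over JSON.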

Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.

message = sensorDHT.SerializeToString()
print(message)

The resulting binary string resembles the following.

b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'

The binary string length of the serialized message, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 111 bytes, as compared to the JSON payload size of 148 bytes (75% the size).

Validate Protobuf Payload

To validate that the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.

from google.protobuf.json_format import MessageToJson

message = sensorDHT.SerializeToString()
message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))

The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}
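The long decimal tails on temperature and humidity are worth a note. One plausible explanation, which is my assumption and not something confirmed by the project, is that the readings originated as 32-bit floats and were later widened to 64-bit doubles. A short sketch using only the standard library reproduces the effect:

```python
import struct


def as_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float, then widen it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


temperature = as_float32(17.1)
humidity = as_float32(68.1)
print(temperature, humidity)
```

The values are still the same sensor readings; only the printed precision differs.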

Google Cloud Functions

There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.

Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.

screen-shot-2019-05-13-at-8_34_49-pm

JSON Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.

import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    # Reject requests that do not carry a valid JWT
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'Content-Type': 'application/json'})

    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'Content-Type': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'Content-Type': 'application/json'})


def validate_token(request):
    # Expect a header of the form "Authorization: Bearer <token>"
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    parts = auth_header.split(" ")
    if len(parts) != 2 or not parts[1]:
        return False
    try:
        # Pin the algorithm; HS256 is assumed here for a shared secret key
        payload = jwt.decode(parts[1], SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which is a subclass
        return False
    # Always return an explicit boolean, including on a claim mismatch
    return payload.get('id') == ID and payload.get('password') == PASSWORD


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    # str() of a dict yields Python repr (single quotes, True/False);
    # the Pub/Sub-triggered function converts it back to JSON
    publisher.publish(topic=TOPIC,
                      data=bytes(str(message), 'utf-8'))

The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.

# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml

Using a .gcloudignore file, the gcloud functions deploy CLI command deploys only three files: the Cloud Function (main.py), the required Python packages file (requirements.txt), and the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.

Protobuf Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.

As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which in this case is stored alongside the Cloud Function's main.py rather than on the IoT device. It is used to parse a serialized message back into its original Protobuf SensorDHT message type.

import logging
import os

import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    # Reject requests that do not carry a valid JWT
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'Content-Type': 'application/json'})

    # Raw request body: the serialized Protobuf message
    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'Content-Type': 'application/json'})

    # Parse the bytes into a SensorDHT message, then convert to a JSON string
    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'Content-Type': 'application/json'})


def validate_token(request):
    # Expect a header of the form "Authorization: Bearer <token>"
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    parts = auth_header.split(" ")
    if len(parts) != 2 or not parts[1]:
        return False
    try:
        # Pin the algorithm; HS256 is assumed here for a shared secret key
        payload = jwt.decode(parts[1], SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which is a subclass
        return False
    # Always return an explicit boolean, including on a claim mismatch
    return payload.get('id') == ID and payload.get('password') == PASSWORD


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))

Cloud Pub/Sub Functions

In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub Triggers. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to iot-data-demo Cloud Pub/Sub Topic.

screen_shot_2019-05-09_at_2_41_17_pm

Sending Telemetry to MongoDB Atlas

The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be compared to binary interchange formats, like Protocol Buffers. BSON is more schema-less than Protocol Buffers, which can give it an advantage in flexibility but also a slight disadvantage in space efficiency (BSON has overhead for field names within the serialized data).

The function uses PyMongo to connect to MongoDB Atlas. According to their website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.

import base64
import json
import logging
import os
import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)
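The string replacements above work for the sample telemetry, but they would corrupt any value containing an apostrophe and do not handle False or None. As a possible alternative, here is a minimal sketch, assuming payloads arrive either as JSON (the Protobuf path) or as the str() of a Python dict (the JSON path). The function name parse_pubsub_payload and the sample payloads are hypothetical.

```python
import ast
import base64
import json


def parse_pubsub_payload(encoded):
    """Decode a base64-encoded Pub/Sub payload into a dict."""
    text = base64.b64decode(encoded).decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back for payloads published as str(dict), i.e. Python repr;
        # literal_eval only evaluates literals, never arbitrary code
        return ast.literal_eval(text)


json_payload = base64.b64encode(b'{"device": "rp829c7e0e", "type": "DHT22"}')
repr_payload = base64.b64encode(b"{'device': \"pi's sensor\", 'active': True}")
print(parse_pubsub_payload(json_payload))
print(parse_pubsub_payload(repr_payload))
```

Publishing with json.dumps on the HTTP Function side would remove the need for the fallback entirely.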

The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.

screen_shot_2019-05-09_at_6_17_18_pm

MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.

screen_shot_2019-05-10_at_10_08_14_pm

Although the Cloud Pub/Sub to Atlas function's execution times are longer than those of the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster in the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.

screen_shot_2019-05-10_at_10_16_49_pm

Aggregating Data with MongoDB Compass

MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.

screen_shot_2019-05-14_at_5_19_17_pm

screen_shot_2019-05-21_at_1_17_40_pm.png

screen_shot_2019-05-21_at_1_15_09_pm

When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and return computed results. This feature saves a ton of time when filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.

screen_shot_2019-05-14_at_5_22_58_pm

Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.

screen_shot_2019-05-14_at_5_23_39_pm

Below, the exported aggregation pipeline code from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device over a fixed time window; the epoch bounds shown span 48 hours (May 12 to May 14, 2019, UTC).

DEVICE_1 = 'rp59adf374'
pipeline = [
    {
        '$match': {
            'type': 'DHT22', 
            'device': DEVICE_1, 
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1, 
            'temperature': '$data.temperature', 
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]
# iot_data is a PyMongo collection object; pd is pandas (import pandas as pd)
aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))
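Rather than hard-coding epoch seconds in the $match stage, the bounds can be derived from a start time and a window length. The following is a small sketch using only the standard library; the helper name epoch_window is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def epoch_window(start, hours):
    """Return (start, end) as epoch seconds for a window of the given length."""
    end = start + timedelta(hours=hours)
    return int(start.timestamp()), int(end.timestamp())


# Midnight UTC on May 12, 2019, plus 48 hours
start = datetime(2019, 5, 12, tzinfo=timezone.utc)
gt, lt = epoch_window(start, 48)

# Drop-in replacement for the 'timestamp' clause of the $match stage
time_filter = {'timestamp': {'$gt': gt, '$lt': lt}}
print(time_filter)
```

Using a timezone-aware datetime avoids the bounds shifting with the local timezone of the machine running the Notebook.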

MongoDB Atlas Performance

In this demonstration, from Python3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.

Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.

screen-shot-2019-05-13-at-9_09_52-pm

GCP Observability

There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.

For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.

For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile average for monitoring. The 95th percentile asserts that 95% of the time, the observed values are below this amount and the remaining 5% of the time, the observed values are above that amount.
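To make the percentile definition concrete, here is a minimal sketch using the nearest-rank method. Stackdriver's own aggregation may differ in detail, and the execution times below are made-up sample values, not measurements from the demonstration.

```python
import math


def percentile_nearest_rank(values, pct):
    """Return the smallest value such that at least pct% of values are <= it."""
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


# Hypothetical function execution times in milliseconds, with one slow outlier
execution_ms = [9, 10, 10, 11, 11, 12, 12, 12, 13, 13,
                14, 14, 15, 15, 16, 16, 17, 18, 18, 240]

print(percentile_nearest_rank(execution_ms, 95))   # typical worst case
print(percentile_nearest_rank(execution_ms, 100))  # the outlier
```

The 95th percentile ignores the single slow execution, which is why it is often a more useful dashboard signal than the maximum.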

screen_shot_2019-05-10_at_10_13_37_pm

Data Analysis using Jupyter Notebooks

According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The widespread use of Jupyter Notebooks has grown significantly, as Big Data, AI, and ML have all experienced explosive growth.

PyCharm

JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.

screen_shot_2019-05-10_at_10_38_42_am

Below, we see the example Analytics Notebook, included in the demonstration’s project, displayed in PyCharm 2019.1.2 (Professional Edition). Working effectively with Notebooks in PyCharm really requires a full-size monitor. Working on a laptop with PyCharm’s crowded Notebook UI is feasible, but certainly not as effective as on a larger monitor.

screen_shot_2019-05-20_at_2_23_46_pm.png

Jupyter Notebook Server

Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.

From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Pandas DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.

screen_shot_2019-05-19_at_9_46_32_am.png

We can then analyze the data using a number of common frameworks, including Pandas, Matplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the effect of one environmental variable on another, such as the impact of light on temperature or humidity.

screen_shot_2019-05-23_at_5_25_44_pm

Below, we can use histograms to visualize temperature frequencies for intervals, over time, for a given device location.

screen-shot-2019-05-13-at-9_13_35-pm

Machine Learning using Jupyter Notebooks

In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry.  Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.

screen_shot_2019-05-19_at_12_25_45_pm

screen_shot_2019-05-19_at_12_26_45_pm

Using scikit-learn, we can build models to predict such things as which IoT device generated a specific temperature and humidity reading, or the temperature and humidity, given the time of day, device location, and external environment variables, or discover anomalies in the sensor telemetry.

Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.

screen_shot_2019-05-19_at_12_27_39_pm

The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.

screen_shot_2019-05-19_at_12_30_38_pm

Having data from multiple sensors, we are able to enrich the ML models by adding categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.

screen_shot_2019-05-19_at_12_29_25_pm

Conclusion

Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Image: everythingpossible © 123RF.com


Apache Solr: Because your Database is not a Search Engine

In this post, we will examine what sets Apache Solr, as a search engine, apart from conventional databases like MongoDB. We will explore the similarities and differences between Solr and MongoDB by analyzing a series of comparative queries. We will then delve into some of Solr’s more advanced search capabilities.

Copyright: Dejan Bozic (123RF)

Why Search?

The ability to search for information is a basic requirement of many applications. Architects and Developers who limit themselves to traditional databases often attempt to meet search requirements by creating unnecessarily complex SQL query-based solutions. They force end-users to search in unnatural or highly-structured ways or provide results that lack a sense of relevancy. End-users are not Database Administrators. They do not understand the nuances of SQL; they simply want relevant responses to their inquiries.

In a scenario where data consumers are arbitrarily searching for relevant information within a distinct domain, implementing a search-optimized, Lucene-based platform, such as Elasticsearch or Apache Solr, for reads, is often an effective solution.

Separating database reads from writes is not uncommon. I’ve worked on many projects where the requirements suggested an architecture in which one type of data storage technology should be implemented to optimize for writes, while a different type or types of data storage technologies should be implemented to optimize for reads. Architectures in which this is common include the following.

  • CQRS (Command Query Responsibility Segregation) and Event Sourcing;
  • Reporting, Data Analytics, and Big Data;
  • ML (Machine Learning) and AI (Artificial Intelligence);
  • Real-Time and Streaming Data (such as IoT);
  • Search: Content, Document, Knowledge;

In this post, we will examine the search capabilities of Apache Solr. We will compare and contrast Solr’s search capabilities to those of MongoDB, the leading NoSQL database. We will consider the differences between querying for data and searching for information.

Apache Lucene

According to Apache, the Apache Lucene project develops open-source search software, including the following sub-projects: Lucene Core, Solr, and PyLucene. The Lucene Core sub-project provides Java-based indexing and search technology, as well as spellchecking, hit highlighting, and advanced analysis/tokenization capabilities.

Apache Lucene 7.7.0 and Apache Solr 7.7.0 were released in February 2019 and were used for all of this post’s examples.

Apache Solr

According to Apache, Apache Solr is the popular, blazing fast, open source, enterprise search platform built on Apache Lucene. Solr powers the search and navigation features of many of the world’s largest internet sites.

Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Referred to as SolrCloud, and backed by Apache Zookeeper, these capabilities provide distributed, sharded, and replicated indexing and search capabilities.

According to Wikipedia, Solr was created at CNET Networks in 2004, donated to the Apache Software Foundation in 2006, and graduated from their incubator in 2007. Solr version 1.3 was released in 2008. In 2010, the Lucene and Solr projects merged; Solr became a Lucene subproject. With Apache Solr 7.7.0 just released, Solr has well over ten years of development and enterprise adoption behind it.

MongoDB

The leading NoSQL database, MongoDB, describes itself as a document database with the scalability and flexibility that you want with the querying and indexing that users need. Mongo features include ad hoc queries, indexing, and real-time aggregation, which provide powerful ways to access and analyze your data.

Released less than a year ago, MongoDB 4.0 added multi-document ACID transactions, data type conversions, non-blocking secondary replica reads, SHA-2 authentication, MongoDB Compass aggregation pipeline builder, Kubernetes integration, and the MongoDB Stitch serverless platform. MongoDB 4.0.6 was released in February 2019 and was used for all of this post’s examples.

Comparing Search Features

Solr and MongoDB appear to have many search-related features in common.

  • Both Solr and MongoDB are document-based data stores;
  • Both Solr and MongoDB use a non-relational data model;
  • Both feature advanced querying and indexing capabilities;
  • Solr implements Lucene-based search capabilities; MongoDB has text-based search capabilities;
  • Solr scores the relevance of search results using the Lucene scoring algorithm; MongoDB has the capability of ranking text search results using the $meta operator;
  • Solr is able to selectively boost the relative importance of search fields and specific values in a field when calculating scores; MongoDB has the capability of boosting the relative importance of fields used in a text search using text indexes;
  • Both Solr and MongoDB are capable of implementing stop words, stemming, and tokenization;

Demonstration

Source Code Examples

All examples shown in this post are available as a series of Python 3 scripts, contained in an open-source project on GitHub, searching-solr-vs-mongodb. The project contains the script, query_mongo.py, which uses the Python driver for MongoDB, pymongo, to execute all the MongoDB queries in this post. The project also contains the script, query_solr.py, which uses the lightweight Python wrapper for Apache Solr, pysolr, to execute all the Solr searches in this post. Both packages, along with ancillary packages, may be installed with pip.

# the bson package, including bson.json_util, is installed as part of pymongo
pip3 install pysolr pymongo requests

MongoDB and Solr Instances

To follow along, you will need your own MongoDB and Solr instances. Both are easily stood up locally with Docker, using the official MongoDB and Solr Docker Hub images. Example docker run commands are shown below.

The second command, the Solr command, also creates a new Solr core. The command also bind-mounts the ‘conf’ directory, within the local project, into the container. This will give us the ability to modify our index’s configuration and to store that configuration in source control. With these particular commands, all data is ephemeral; neither container persists data outside the container.

docker run --name mongo -p 27017:27017 -d mongo:latest

docker run --name solr -d \
  -p 8983:8983 \
  -v $PWD/conf:/conf \
  solr:latest \
  solr-create -c movies -d /conf

screen_shot_2019-02-24_at_1_33_33_am

Environment Variables

The source code expects two environment variables, which contain the connection information for MongoDB and Solr. You will need to replace the values below with your own connection strings if they are different than the examples below, used for Docker.

export SOLR_URL="http://localhost:8983/solr"
export MONOGDB_CONN="mongodb://localhost:27017/movies"

screen_shot_2019-02-23_at_8_57_13_am

Importing Movies to MongoDB

For this post, we will be using a publicly available movie dataset from MongoDB. A copy of the dataset is available in the project, as well as on MongoDB’s website, Setup and Import the Data.

Assuming you have an instance of MongoDB accessible and have set the two environment variables above, import the specially-formatted JSON file for MongoDB, movieDetails_mongo.json, directly into the movies database’s movieDetails collection, using the following mongoimport command.

mongoimport \
  --uri $MONOGDB_CONN \
  --collection "movieDetails" \
  --drop --file "data/movieDetails_mongo.json"

screen_shot_2019-02-23_at_8_57_42_am

Below is a view of the movies database’s movieDetails collection, running in the Docker container, as shown in the MongoDB Compass application.

Screen Shot 2019-02-25 at 7.45.40 AM.png

Indexing Movies to Solr

Assuming you have an instance of Solr accessible and have set the two environment variables above, import the contents of the JSON file, movieDetails.json, by running the Python script, solr_index_movies.py, using the following command.

python3 ./solr_index_movies.py

The command executes a series of HTTP calls to Solr’s exposed RESTful API.

screen_shot_2019-02-23_at_10_28_56_pm.png

Below is a view of the Solr Administration User Interface, running within the Docker container, and showing the new movies core. After running the script, we should have 2,250 movie documents indexed.

screen_shot_2019-02-23_at_9_15_42_am

The Solr Admin UI offers a number of useful tools for examining indexes, reviewing schemas and field types, and creating, analyzing, and debugging Solr queries. Below we see the Query UI with the results of a query displayed.

screen_shot_2019-02-25_at_7_34_22_am

Tuning the Solr Index

The movies index uses a default schema, which was created when the movie documents were indexed. To optimize our query results, we will want to make a few adjustments to the default movies schema. First, we want to ensure that our Solr searches consider the pluralization of words. For example, when we search for the search term ‘Adventure’, we want Solr to also return documents containing terms like adventure, adventures, adventure’s, adventuring, and adventurer, but not misadventure. This is known as Stemming, or reducing words to their word stem. An example is shown below in the Solr Analysis UI.

screen_shot_2019-03-01_at_7_08_37_am.png

The fields that we want to search, such as title, plot, and genres, were all indexed by default as the ‘text_general’ Solr field type. The ‘text_general’ field type does not implement stemming when indexing or querying. We need to switch the title, plot, and genres fields to the ‘text_en’ (English text) field type. The ‘text_en’ field type implements multiple indexing and querying filters, including the PorterStemFilterFactory filter, which removes common endings from words. Similar filters include the English Minimal Stem Filter and the English Possessive Filter.

Additionally, the MultiValued field property is set to true by default for these fields in Solr. Since the title and plot fields, amongst others, were only intended to hold a single text value, as opposed to an array of values, we will switch the MultiValued field property to false. This helps with sorting and filtering, and the correct deserialization of documents.

The solr_index_movies.py script will change the title, plot, and genres fields from ‘text_general’ to ‘text_en’ and change the title and plot fields from multi-valued to single-valued. Since we have changed the index’s schema, the script will re-import all the documents after making the schema changes.

To get a better sense of the field-level modifications we are making, let’s look at the equivalent cURL command to change the schema.
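As a rough Python stand-in for such a request, the sketch below builds a Solr Schema API replace-field payload for the changes described above and shows how it could be posted. It is not the project's actual script: the function names are hypothetical, the core name and URL follow the Docker examples earlier in the post, and keeping genres multi-valued is my assumption.

```python
import json
import urllib.request

SOLR_URL = "http://localhost:8983/solr"  # matches the SOLR_URL example above
CORE = "movies"


def build_schema_changes():
    """Build a Schema API payload switching three fields to the text_en type."""
    return {
        "replace-field": [
            {"name": "title", "type": "text_en", "multiValued": False},
            {"name": "plot", "type": "text_en", "multiValued": False},
            # Assumption: genres remains an array of values
            {"name": "genres", "type": "text_en", "multiValued": True},
        ]
    }


def post_schema_changes(payload):
    """POST the payload to the core's schema endpoint (needs a running Solr)."""
    request = urllib.request.Request(
        f"{SOLR_URL}/{CORE}/schema",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Only print the payload here; call post_schema_changes() against a live core
print(json.dumps(build_schema_changes(), indent=2))
```

Because replace-field replaces the whole field definition, each entry states the multiValued property explicitly rather than relying on defaults.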

You can use the Schema UI to view the results, as shown below. Note the new field types for title, plot, and genres. Also, note the index and query analyzers, including the  PorterStemFilterFactory, used by the ‘text_en’ field type.

screen_shot_2019-02-23_at_10_07_42_pm

Comparative Queries

To demonstrate the similarities and the differences between Solr and MongoDB, we will examine a series of comparative queries, followed by a series of Solr-only searches. Again, all queries and output shown are included in the project’s two Python scripts.

Query 1a: All Documents

To start, we will perform a simple query for all the movie documents in the MongoDB collection, followed by the Solr index. With MongoDB, we use the find method. With Solr, we will use the Standard Query Parser, commonly known as the ‘lucene’ query parser, and the q (query) parameter. The result of the queries should be identical, with all 2,250 documents returned.

MongoDB:

Parameters
----------
query: {}
  
Results
----------
document count: 2250

Solr:

Parameters
----------
q: *:*
kwargs: {}
  
Results
----------
document count: 2250

Query 1b: Count Only

We can alter our first query to limit our response to only the count of documents for a given query in MongoDB; no documents will be returned. Since our query is empty, we will get back a count of all documents in the MongoDB database’s collection.

db.movieDetails.count()

Similarly, in Solr, we can set the rows parameter to zero to return only the document count. For brevity, we can also omit the Solr response header using the omitHeader  parameter.

Parameters
----------
q: *:*
kwargs: {
  'omitHeader': 'true', 
  'rows': '0'
}

Results
----------
document count: 2250
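The q and kwargs values shown above ultimately become query string parameters on Solr's select endpoint. As an illustration only, and not how pysolr is implemented internally, the following sketch builds that URL with the standard library. The helper name solr_select_url is hypothetical; the base URL and core name follow the Docker examples earlier in the post.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def solr_select_url(base_url, core, q, **kwargs):
    """Build a select URL from a query and optional Solr parameters."""
    params = {"q": q, **kwargs}
    return f"{base_url}/{core}/select?{urlencode(params)}"


url = solr_select_url("http://localhost:8983/solr", "movies", "*:*",
                      omitHeader="true", rows="0")
print(url)

# Round-trip the query string to confirm the parameters survive URL-encoding
print(parse_qs(urlsplit(url).query))
```

Characters such as the asterisk and colon in the match-all query are percent-encoded in the URL, which is one reason to let a library build the query string.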

Query 2: Exact Search

Next, we will perform a query for the exact movie title, ‘Star Wars: Episode V - The Empire Strikes Back’, in MongoDB, then Solr. Again, the results of the queries should be identical, with one document returned, matching the title.

screen_shot_2019-02-25_at_7_41_07_am

MongoDB:

Parameters
----------
query: {'title': 'Star Wars: Episode V - The Empire Strikes Back'}
projection: {'_id': 0, 'title': 1}
  
Results
----------
document count: 1
{'title': 'Star Wars: Episode V - The Empire Strikes Back'}

The quotes around the title are key for Solr to view the query as a single phrase as opposed to a series of search terms.

Solr:

Parameters
----------
q: title:"Star Wars: Episode V - The Empire Strikes Back"
kwargs: {
  'defType': 'lucene', 
  'fl': 'title, score'
}
  
Results
----------
document count: 1
{'title': 'Star Wars: Episode V - The Empire Strikes Back', 'score': 29.41}

Note the use of 'defType': 'lucene' is optional. The standard Lucene query parser is the default parser used by Solr. I am merely showing this parameter to improve the reader’s understanding. Later, we will use other query parsers.

Query 3: Search Phrase

Next, we will perform a query for the phrase ‘star wars’. With MongoDB, we will use the $regex and $options Evaluation Query Operators. The results from both the MongoDB and Solr queries should be identical: the six Star Wars movies are returned.

MongoDB:

Parameters
----------
query: {
  'title': {'$regex': '\\bstar wars\\b', 
  '$options': 'i'}
}
projection: {'_id': 0, 'title': 1}
  
Results
----------
document count: 6
{'title': 'Star Wars: Episode I - The Phantom Menace'}
{'title': 'Star Wars: Episode II - Attack of the Clones'}
{'title': 'Star Wars: Episode III - Revenge of the Sith'}
{'title': 'Star Wars: Episode IV - A New Hope'}
{'title': 'Star Wars: Episode V - The Empire Strikes Back'}
{'title': 'Star Wars: Episode VI - Return of the Jedi'}
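
To see what the word boundaries (\b) and the case-insensitive option contribute, here is a small sketch using Python's re module as a stand-in for MongoDB's regular expression engine. The two engines are not identical, but they treat this simple pattern the same way. The last two titles are hypothetical and exist only to show non-matches.

```python
import re

# Same pattern and case-insensitive option as the MongoDB query above.
pattern = re.compile(r"\bstar wars\b", re.IGNORECASE)

titles = [
    "Star Wars: Episode IV - A New Hope",
    "Star Wars: Episode V - The Empire Strikes Back",
    "Superstar Wars",      # hypothetical: no word boundary before 'star'
    "Star Warship Down",   # hypothetical: no word boundary after 'wars'
]

matching = [title for title in titles if pattern.search(title)]
print(matching)
```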

With Solr, wrapping the phrase ‘star wars’ in quotes ensures Solr will treat the query string as an exact phrase, not individual search terms. Solr results are scored, but scores are almost all identical since all six movies contain the exact phrase.

Solr:

Parameters
----------
q: "star wars"
kwargs: {
  'defType': 'lucene', 
  'df': 'title', 
  'fl': 'title, score'
}
  
Results
----------
document count: 6
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'score': 8.21}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'score': 8.21}
{'title': 'Star Wars: Episode IV - A New Hope', 'score': 8.21}
{'title': 'Star Wars: Episode I - The Phantom Menace', 'score': 8.21}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'score': 8.21}
{'title': 'Star Wars: Episode V - The Empire Strikes Back', 'score': 7.55}

Here is the actual Lucene query (q) Solr will run.

title:"star war"

Query 4: Search Terms

Next, we will perform a query for all movies whose title contains either the search terms ‘star’ or ‘wars’, as opposed to the phrase, ‘star wars’. The Solr web console has a very powerful Analysis tool. Using the Analysis tool, we can examine how each filter (abbreviated in the far left column, below), associated with a particular field type, will impact the matching capabilities of Solr. To use the Analysis tool, place your search term(s) or phrase on the right side, an indexed field value on the left, and choose a field or field type from the dropdown.

Below, we see how the search terms ‘Star’ and ‘Wars’ (shown below right) would match a series of variations on the two words (shown below left) if the fields being searched are of the field type, ‘text_en’, as discussed earlier. For example, a query for ‘Star’ would match ‘star’, ‘stars’, ‘star’s’, ‘starring’, ‘starred’, and ‘star-shaped’, but not ‘shaped’, ‘superstars’, or ‘started’.

[Screenshot: Solr Analysis tool results for the search term ‘Star’ against the text_en field type]

Below, we see similar results for the search term, ‘Wars’.

[Screenshot: Solr Analysis tool results for the search term ‘Wars’ against the text_en field type]

MongoDB Text Search

To accomplish the query with MongoDB, we will use MongoDB’s $text Evaluation Query Operator. The MongoDB $text operator is able to perform a text search across multiple fields indexed with a text index. MongoDB’s text indexes support text search queries on string content. The text indexes can include any field whose value is a string or an array of string elements, such as the movieDetails collection’s genres field. Although not as powerful as Solr’s search capabilities, MongoDB’s text search may address many basic search requirements without the need to augment the architecture with a search engine.

For our next query, we will rely on a text index on the title field. When the Python script runs, it creates the following three indexes on the collection, including the title text index.

With the text index in place, the results of the queries should be identical, with 18 documents returned. Both the MongoDB and Solr result sets are scored; however, each is scored differently, using a different algorithm.

MongoDB:

Parameters
----------
query: {
  '$text': {'$search': 'star wars', '$language': 'en', '$caseSensitive': False}, 
  'countries': 'USA'
}
projection: {'score': {'$meta': 'textScore'}, '_id': 0, 'title': 1}
sort: [('score', {'$meta': 'textScore'})]
  
Results
----------
document count: 18
{'title': 'Star Wars: Episode I - The Phantom Menace', 'score': 1.2}
{'title': 'Star Wars: Episode IV - A New Hope', 'score': 1.17}
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'score': 1.17}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'score': 1.17}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'score': 1.17}

Solr:

Parameters
----------
q: star wars
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'df': 'title', 
  'fl': 'title, score', 
  'rows': '5'
}
  
Results
----------
document count: 18
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'score': 8.21}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'score': 8.21}
{'title': 'Star Wars: Episode IV - A New Hope', 'score': 8.21}
{'title': 'Star Wars: Episode I - The Phantom Menace', 'score': 8.21}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'score': 8.21}

Here is the actual Lucene query (q) Solr will run. The countries filter is applied afterward.

title:star title:war
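
To make the difference between a phrase query (Query 3) and a terms query (Query 4) concrete, here is a toy sketch in plain Python. It is not how Solr works internally: the naive plural-stripping function merely stands in for the stemming performed by the text_en field type, and the sample titles are chosen for illustration rather than taken from the result listings.

```python
import re


def naive_stem(token):
    """Strip a trailing 's' from longer words; a crude stand-in for stemming."""
    return token[:-1] if token.endswith("s") and len(token) > 3 else token


def tokenize(text):
    """Lowercase the text, split it into words, and stem each word."""
    return [naive_stem(t) for t in re.findall(r"[a-z0-9]+", text.lower())]


def matches_any_term(query, title):
    """Terms query: at least one query term appears in the title."""
    return bool(set(tokenize(query)) & set(tokenize(title)))


def matches_phrase(query, title):
    """Phrase query: the query terms appear in the title, adjacent and in order."""
    q, t = tokenize(query), tokenize(title)
    return any(t[i:i + len(q)] == q for i in range(len(t) - len(q) + 1))


for title in ["Star Wars: Episode IV - A New Hope", "War of the Worlds",
              "A Star Is Born", "The Godfather"]:
    print(title, matches_any_term("star wars", title),
          matches_phrase("star wars", title))
```

The terms query matches the first three titles, while the phrase query matches only the first, which is why the document count grows when the quotes are removed.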

Find me a Good Western

Query 5a: Multiple Search Terms

Next, we will perform a query for movies, produced in the USA, with the search terms ‘western’, ‘action’, or ‘adventure’ in the movie genres field. The genres field may hold multiple genre values. Although this is a simple query, we can start to see the advantages of Solr’s Lucene scoring capability to provide a way to measure the relevancy of individual results.

Even limited to the USA-based movies, this genres query returns a large number of results, 244 documents. With MongoDB, we have no sense of which documents are more relevant than others. Compared to the Solr results, MongoDB’s top five results include a few of the same movies, but not the most relevant ones, based on matching all or most of the genres.

MongoDB:

Parameters
----------
query: {
  'genres': {'$in': ['Adventure', 'Action', 'Western']}, 
  'countries': 'USA'
}
projection: {'_id': 0, 'genres': 1, 'title': 1}
  
Results
----------
document count: 244
{'title': 'Wild Wild West', 'genres': ['Action', 'Western', 'Comedy']}
{'title': 'A Million Ways to Die in the West', 'genres': ['Comedy', 'Western']}
{'title': 'An American Tail: Fievel Goes West', 'genres': ['Animation', 'Adventure', 'Family']}
{'title': 'Once Upon a Time in the West', 'genres': ['Western']}
{'title': 'How the West Was Won', 'genres': ['Western']}

However, with Solr’s scoring, we see the first (top) result, ‘The Wild Bunch’, has a score of 7.18. Its genres are exactly ‘western’, ‘action’, and ‘adventure’. The last (bottom) result, ‘S.S. Doomtrooper’, has a score of 1.47. The most relevant result scored nearly 5x higher (7.18 / 1.47 ≈ 4.88, or 488%) than the least relevant result. If you were searching for a western action adventure movie, it is pretty apparent the top Solr result, ‘The Wild Bunch’, is a much better choice than the bottom result, ‘S.S. Doomtrooper’. In fact, as shown below, all five top-scoring Solr results look pretty promising based on their score, genres, and title.

Solr:

Parameters
----------
q: adventure action western
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'df': 'genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 244
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 7.18}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 6.26}
{'title': 'The Big Trail', 'genres': ['Adventure', 'Western', 'Romance'], 'score': 5.46}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 5.26}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 5.26}

Here is the actual Lucene query (q) Solr will run. The countries filter is applied afterward.

genres:adventur genres:action genres:western

Query 5b: Required Search Term

There are nearly endless options that can be used with Solr to influence Solr’s results. For example, we could perform the same Solr query above, but this time require that the word ‘western’ is in the genres field, by using the plus symbol (+) boolean operator. The top five results and scores are the same, but the total number of relevant results has decreased from 244 to just 24. That means 220 of the previous results contained ‘action’ and/or ‘adventure’, but not ‘western’. The opposite is also true: using the minus symbol (-) boolean operator will ensure the results do not contain a particular word or phrase.

Solr:

Parameters
----------
q: adventure action +western
kwargs: { 
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'df': 'genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
} 
  
Results
----------
document count: 24
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 7.18}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 6.26}
{'title': 'The Big Trail', 'genres': ['Adventure', 'Western', 'Romance'], 'score': 5.46}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 5.26}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 5.26}

Here is the actual Lucene query (q) Solr will run. The countries filter is applied afterward.

(genres:adventur genres:action) +genres:western
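
The effect of the required (+) and prohibited (-) operators can be sketched with a few lines of plain Python. This is a simplified model of the boolean semantics only, assuming exact, case-insensitive matches on genre values; it ignores stemming and scoring. The genres come from the result listings in this post.

```python
def parse_query(query):
    """Split a query into required (+), prohibited (-), and optional terms."""
    required, prohibited, optional = set(), set(), set()
    for token in query.lower().split():
        if token.startswith("+"):
            required.add(token[1:])
        elif token.startswith("-"):
            prohibited.add(token[1:])
        else:
            optional.add(token)
    return required, prohibited, optional


def matches(query, genres):
    """Apply simplified Lucene-style boolean semantics to a list of genres."""
    required, prohibited, optional = parse_query(query)
    values = {genre.lower() for genre in genres}
    if prohibited & values:
        return False
    if required:
        # Once a required term exists, optional terms only influence scoring.
        return required <= values
    return bool(optional & values)


wild_bunch = ["Action", "Adventure", "Western"]
big_trail = ["Adventure", "Western", "Romance"]
american_tail = ["Animation", "Adventure", "Family"]

print(matches("adventure action western", american_tail))
print(matches("adventure action +western", american_tail))
print(matches("adventure action +western -romance", big_trail))
```

Requiring ‘western’ drops a movie like ‘An American Tail: Fievel Goes West’, which matched the original query only through its ‘Adventure’ genre.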

Query 6a: eDisMax Query

For our next query, we will compare Solr’s eDisMax query parser to MongoDB’s text search capabilities.

Solr Extended DisMax

According to Solr, the DisMax query parser is designed to process simple phrases and to search for individual terms across several fields using different weighting (boosts) based on the significance of each field. Additional options enable users to influence the score based on rules specific to each use case (independent of user input). Solr’s Extended DisMax (eDisMax) query parser is an improved version of the DisMax query parser.

In my opinion, in addition to the Lucene-based scoring, the ability to easily search across multiple fields and selectively boost results with the DisMax and eDisMax query parsers is what starts to differentiate querying data in a database from searching for relevant results with a search engine.

Multi-Field Text Index

For our next query, the Python script will drop the previous MongoDB text index on the title field and create a new compound text index, which will incorporate the title, plot, and genres fields.

Below, we see the new compound text index in the MongoDB Compass application’s Indexes tab.

[Screenshot: the compound text index in the MongoDB Compass Indexes tab]

We will perform a query for movies, produced in the USA, with the search terms ‘western’, ‘action’, or ‘adventure’ in the movie title, plot, or genres fields. The results of the queries should be identical, with 259 documents returned. Both the MongoDB and Solr result sets are scored, but again, the scores and ordering of results are not identical. The two queries have six movies in common in their top ten results.

MongoDB:

Parameters
----------
query: {
  '$text': {'$search': 'western action adventure', '$language': 'en', '$caseSensitive': False}, 
  'countries': 'USA'
}
projection: {'score': {'$meta': 'textScore'}, '_id': 0, 'title': 1}
  
Results
----------
document count: 259
{'title': 'Zathura: A Space Adventure', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 3.3}
{'title': 'The Extraordinary Adventures of Adèle Blanc-Sec', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 3.24}
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 3.2}
{'title': 'The Adventures of Tintin', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 2.85}
{'title': 'Adventures in Babysitting', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 2.85}

Solr:

Parameters
----------
q: western action adventure
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 259
{'title': 'The Secret Life of Walter Mitty', 'genres': ['Adventure', 'Comedy', 'Drama'], 'score': 7.67}
{'title': 'Western Union', 'genres': ['History', 'Western'], 'score': 7.39}
{'title': 'The Adventures of Tintin', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 7.36}
{'title': 'Adventures in Babysitting', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 7.36}
{'title': 'The Poseidon Adventure', 'genres': ['Action', 'Adventure', 'Drama'], 'score': 7.36}

Here is the actual Lucene query Solr will run.

+(
  (title:adventur | plot:adventur | genres:adventur) 
  (title:action | plot:action | genres:action) 
  (title:western | plot:western | genres:western)
)

Query 6b: Boosting Fields

If you really wanted a ‘western action adventure’ movie as opposed to a ‘western’, ‘action’, or ‘adventure’ movie, then neither Solr nor MongoDB probably satisfied you completely with its first five search results. Boosting or weighting fields can often provide more relevant search results if the correct fields are boosted and the amount of positive or negative boost is appropriate.

MongoDB’s text indexes also allow for weighting individual fields. The weight of an indexed field denotes the significance of the field relative to the other indexed fields and directly impacts the text search score. Weighting fields is the equivalent of boosting fields with Solr. Below, we see a modification applied to our previous text index in which the title field is given twice the weight of the plot field (1.0 is the default) and the genres field is given twice the weight of the title field. The Python script also applies this index for you.
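
As a minimal sketch, a weighted compound text index like the one described could be created with PyMongo's create_index method. The index name is an assumption, and the post's own script may build the index differently. So that the example runs without a MongoDB server, a small stand-in class records the call instead of a real collection.

```python
# Weights from the text: plot keeps the default of 1, title is doubled,
# and genres gets twice the weight of title.
TEXT_INDEX_WEIGHTS = {"plot": 1, "title": 2, "genres": 4}


def create_weighted_text_index(collection, weights=TEXT_INDEX_WEIGHTS):
    """Create a compound text index on a PyMongo-style collection.

    The index name used here is hypothetical, not taken from the post's script.
    """
    keys = [(field, "text") for field in weights]
    return collection.create_index(
        keys,
        weights=dict(weights),
        default_language="english",
        name="weighted_text_index",
    )


class RecordingCollection:
    """Stand-in for a PyMongo collection; it only records create_index calls."""

    def __init__(self):
        self.calls = []

    def create_index(self, keys, **kwargs):
        self.calls.append((keys, kwargs))
        return kwargs["name"]


stub = RecordingCollection()
index_name = create_weighted_text_index(stub)
print(index_name, stub.calls[0])
```

With a live connection, the same function would be passed the real collection, for example db.movieDetails, after the earlier text index has been dropped, since MongoDB allows only one text index per collection.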

Likewise, Solr is also capable of boosting fields for both the DisMax and eDisMax query parsers. For our next query, we will repeat the previous query, but boost fields in the eDisMax’s qf (Query Fields) parameter to match the boost in the MongoDB weighted text index, shown above.
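
To keep the two systems in step, the qf string can be derived from the same field weights used for the MongoDB text index. The helper below is a small illustrative sketch; the function name is made up for this example.

```python
def weights_to_qf(weights):
    """Render field weights as an eDisMax qf string, e.g. 'title^2.0'."""
    parts = []
    for field, weight in weights.items():
        # A weight of 1 is the default, so the boost suffix can be omitted.
        parts.append(field if weight == 1 else f"{field}^{float(weight)}")
    return " ".join(parts)


mongo_weights = {"plot": 1, "title": 2, "genres": 4}
qf = weights_to_qf(mongo_weights)
print(qf)
```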

The results of the queries should be identical, with 259 documents returned. MongoDB and Solr’s results are scored and ordered differently. However, compared to the previous, un-weighted/boosted MongoDB and Solr query results above, the relative scores are higher, the order of movies returned is different, and most importantly, the Solr results seem more relevant to the original search intent.

For example, with Solr, take the movie ‘The Secret Life of Walter Mitty’, which previously scored highest at 7.67. In the boosted search results, the movie ‘The Wild Bunch’ is now ranked first with a score of 28.71. The movie ‘The Secret Life of Walter Mitty’ is no longer even in the top 50 Solr results. Comparatively, other movies, like ‘The Adventures of Tintin’ and ‘Adventures in Babysitting’, barely moved in position even though their scores changed proportionally.

MongoDB:

Parameters
----------
query: {
  '$text': {'$search': 'western action adventure', '$language': 'en', '$caseSensitive': False}, 
  'countries': 'USA'
}
projection: {'score': {'$meta': 'textScore'}, '_id': 0, 'title': 1}
  
Results
----------
document count: 259
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 12.8}
{'title': 'Zathura: A Space Adventure', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 10.27}
{'title': 'The Extraordinary Adventures of Adèle Blanc-Sec', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 10.14}
{'title': 'The Adventures of Tintin', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 9.9}
{'title': 'Adventures in Babysitting', 'genres': ['Action', 'Adventure', 'Comedy'], 'score': 9.9}

Solr:

Parameters
----------
q: western action adventure
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title^2.0 genres^4.0', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 259
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 28.71}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 25.05}
{'title': 'The Big Trail', 'genres': ['Adventure', 'Western', 'Romance'], 'score': 21.84}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 21.05}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 21.05}

Here is the actual Lucene query Solr will run.

+(
  ((title:adventur)^2.0 | plot:adventur | (genres:adventur)^4.0) 
  ((title:action)^2.0 | plot:action | (genres:action)^4.0) 
  ((title:western)^2.0 | plot:western | (genres:western)^4.0)
)
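
In the parsed query above, each group of clauses separated by the pipe symbol (|) is a disjunction-max query: for each search term, the best-scoring field wins, and the per-term scores are then added together. The toy function below sketches that combination with field boosts applied. The raw per-field scores are invented for illustration; real scores come from Lucene's similarity calculation.

```python
def dismax_score(term_field_scores, boosts, tie=0.0):
    """Combine per-field scores the way a disjunction-max query does.

    term_field_scores maps each search term to {field: raw_score}.
    For every term, the best boosted field score wins; the other fields
    contribute only through the tie breaker. Term scores are then summed.
    """
    total = 0.0
    for field_scores in term_field_scores.values():
        boosted = [score * boosts.get(field, 1.0)
                   for field, score in field_scores.items()]
        if not boosted:
            continue
        best = max(boosted)
        total += best + tie * (sum(boosted) - best)
    return total


# Hypothetical raw scores for a single document.
scores = {
    "western": {"genres": 1.5, "plot": 2.0},
    "action": {"genres": 1.0},
    "adventure": {"title": 1.2, "genres": 1.0},
}

unboosted = dismax_score(scores, boosts={})
boosted = dismax_score(scores, boosts={"title": 2.0, "genres": 4.0})
print(round(unboosted, 2), round(boosted, 2))
```

With the boosts applied, a match in the genres field dominates every term, which is consistent with movies whose genres match the search terms rising to the top of the boosted results.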

Query 6c: eDisMax Boosted with Required/Prohibited Terms

We can use both the plus (+) and minus (-) boolean operators to obtain more relevant search results. Let’s repeat the last Solr boosted query, but this time, also require any results to contain the search term, ‘western’, and prohibit the responses from containing the search term, ‘romance’. I would consider the new search results, based on these modifications to the Solr query, to be more relevant to the original intent of the search than the previous results. For example, the movie ‘The Big Trail’, a romantic western adventure movie according to its genres, is no longer included in the result set.

Solr:

Parameters
----------
q: adventure action +western -romance
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title^2.0 genres^4.0',
  'fl': 'title, genres, score', 
  'rows': '5'
}

Results
----------
document count: 25
{'title': 'The Wild Bunch', 'genres': ['Action', 'Adventure', 'Western'], 'score': 28.71}
{'title': 'Crossfire Trail', 'genres': ['Action', 'Western'], 'score': 25.05}
{'title': 'Once Upon a Time in the West', 'genres': ['Western'], 'score': 21.05}
{'title': 'How the West Was Won', 'genres': ['Western'], 'score': 21.05}
{'title': 'Cowboy', 'genres': ['Western'], 'score': 21.05}

Here is the actual Lucene query Solr will run.

+(
  ((title:adventur)^2.0 | plot:adventur | (genres:adventur)^4.0) 
  ((title:action)^2.0 | plot:action | (genres:action)^4.0) 
  +((title:western)^2.0 | plot:western | (genres:western)^4.0) 
  -((title:romanc)^2.0 | plot:romanc | (genres:romanc)^4.0)
)

Query 7a: The Movie Dilemma

Frequently, end-users interact with a search engine, such as Google, through a search box. We type something into a search box and get back a list of relevant results. By now, most of us have learned how to phrase our Google search to get optimal results. However, the reality is, people can and will type just about anything into a search box.

To try and improve the average search results for end-users, search engineers will often try to tune query parameters, such as boosting the importance of certain search fields over others, adjusting fuzzy search parameters, or ignoring irrelevant words in the search phrases by adding them to the stop words. Default English stop words in Solr include ‘a’, ‘an’, ‘and’, ‘are’, ‘as’, ‘at’, ‘be’, ‘but’, ‘by’, ‘for’, and so forth.

Take, for example, the word ‘movie’. Someone searching for a movie to watch, using a search box, might enter the phrase ‘A cowboys movie’. The term, ‘A’, is ignored as a stop word. This leaves the search terms ‘cowboys’ and ‘movie’ to be searched for in the title, plot, and genres fields. As we see from the top ten results below, most appear to be about cowboys, having the word ‘cowboy’ in their title or plot. Then there is the result, ‘TV: The Movie’. This does not appear to be a movie about cowboys. The word ‘cowboy’ is not in the title, plot, or genres, yet here it is in third place since it contains the word ‘movie’ in the title, plot, and/or genres.

Similarly, the top movie result, ‘Cowboy Bebop: The Movie’, is probably no more relevant than the second, third, or fourth place movies. However, ‘Cowboy Bebop: The Movie’ has scored significantly higher than even the number two result (11.24 vs. 7.31). This is because the movie’s title contains both search terms, ‘cowboy’ and ‘movie’, even though the word ‘movie’ is irrelevant to the original intent of the search phrase.

Solr:

Parameters
----------
q: A cowboys movie
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, genres, score', 
  'rows': '10'
}
  
Results
----------
document count: 23
{'title': 'Cowboy Bebop: The Movie', 'genres': ['Animation', 'Action', 'Crime'], 'score': 11.24}
{'title': 'Cowboy', 'genres': ['Western'], 'score': 7.31}
{'title': 'TV: The Movie', 'genres': ['Comedy'], 'score': 6.42}
{'title': 'Space Cowboys', 'genres': ['Action', 'Adventure', 'Thriller'], 'score': 6.33}
{'title': 'Midnight Cowboy', 'genres': ['Drama'], 'score': 6.33}
{'title': 'Drugstore Cowboy', 'genres': ['Crime', 'Drama'], 'score': 6.33}
{'title': 'Urban Cowboy', 'genres': ['Drama', 'Romance', 'Western'], 'score': 6.33}
{'title': 'The Cowboy Way', 'genres': ['Action', 'Comedy', 'Drama'], 'score': 6.33}
{'title': 'The Cowboy and the Lady', 'genres': ['Comedy', 'Drama', 'Romance'], 'score': 6.33}
{'title': 'Toy Story', 'genres': ['Animation', 'Adventure', 'Comedy'], 'score': 5.65}

Here is the actual Lucene query Solr will run.

+(
  (title:cowboi | plot:cowboi | genres:cowboi) 
  (title:movi | plot:movi | genres:movi)
)

Query 7b: Stop Words

To solve the movie dilemma, we might consider adding the word ‘movie’ to the stop words, since the word ‘movie’ seems to be irrelevant to the search phrase ‘A cowboys movie’, or to a movie search engine in general. However, this choice will negatively impact other searches. There are 12 movie titles containing the word ‘movie’. If you were searching for ‘The Lego Movie’, ignoring the word ‘movie’ as a stop word would negatively impact the accuracy and relevance of your search results. As shown below, you would end up with only one of the two Lego movies in your search results, the one whose title does not contain the word ‘movie’.

Solr:

Parameters
----------
q: The Lego Movie
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 1
{'title': 'Lego DC Comics Super Heroes: Justice League vs. Bizarro League', 'genres': ['Animation', 'Action', 'Adventure'], 'score': 3.63}

Here is the actual Lucene query Solr will run. Note that neither the term ‘the’ nor the term ‘movie’ is part of the search.

+((title:lego | plot:lego | genres:lego))
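
The effect of stop words on the two search phrases can be sketched in plain Python. The stop word set below is only an illustrative subset of Solr's default English list, and the function does not stem the remaining terms the way Solr's text_en field type does.

```python
import re

# Illustrative subset of English stop words; Solr's real list is longer.
DEFAULT_STOP_WORDS = {"a", "an", "and", "are", "as", "at",
                      "be", "but", "by", "for", "the"}


def search_terms(phrase, stop_words=DEFAULT_STOP_WORDS):
    """Lowercase the phrase, split it into words, and drop stop words."""
    words = re.findall(r"[a-z0-9]+", phrase.lower())
    return [word for word in words if word not in stop_words]


# With the default list, 'movie' survives as a search term.
cowboy_terms = search_terms("A cowboys movie")

# After adding 'movie' as a stop word, only 'lego' remains.
extended_stop_words = DEFAULT_STOP_WORDS | {"movie"}
lego_terms = search_terms("The Lego Movie", extended_stop_words)

print(cowboy_terms, lego_terms)
```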

Query 7c: Negative Boost

A second method to solve the movie dilemma might be to negatively boost the word ‘movie’ when it appears in the title field, thus reducing its relevance. Negatively boosting fields, or more precisely, negatively boosting a specific field value, is possible with both the DisMax and eDisMax query parsers. We can assign a negative boost to the word ‘movie’ when it appears in the title field by using the bq (Boost Query) parameter. According to Solr, the bq parameter specifies an additional, optional, query clause that will be added to the user’s main query to influence the score. As Developers, we could programmatically append the negatively boosted term(s) into the query without directly altering the user’s original search phrase. Again, like stop words, boosting may also negatively impact other searches.

After some experimentation, we will try a boost value of -2. We still get 23 results; however, the top ten results now appear to be more relevant, based on the intent of our search phrase. The movies ‘Cowboy Bebop: The Movie’ and ‘TV: The Movie’ are not present in the top ten results; their scoring was lowered. You can repeat this process for more search terms, like ‘movie’, positively or negatively boosting their scores, to improve the relevancy of the results.

Solr:

Parameters
----------
q: A cowboys movie
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'bq': 'title:movie^-2.0', 
  'fl': 'title, genres, score', 
  'rows': '10'
}
  
Results
----------
document count: 23
{'title': 'Cowboy', 'genres': ['Western'], 'score': 7.31}
{'title': 'Space Cowboys', 'genres': ['Action', 'Adventure', 'Thriller'], 'score': 6.33}
{'title': 'Midnight Cowboy', 'genres': ['Drama'], 'score': 6.33}
{'title': 'Drugstore Cowboy', 'genres': ['Crime', 'Drama'], 'score': 6.33}
{'title': 'Urban Cowboy', 'genres': ['Drama', 'Romance', 'Western'], 'score': 6.33}
{'title': 'The Cowboy Way', 'genres': ['Action', 'Comedy', 'Drama'], 'score': 6.33}
{'title': 'The Cowboy and the Lady', 'genres': ['Comedy', 'Drama', 'Romance'], 'score': 6.33}
{'title': 'Toy Story', 'genres': ['Animation', 'Adventure', 'Comedy'], 'score': 5.65}
{'title': "Ride 'Em Cowboy", 'genres': ['Comedy', 'Western', 'Musical'], 'score': 5.58}
{'title': "G.M. Whiting's Enemy", 'genres': ['Mystery'], 'score': 5.32}

Here is the actual Lucene query Solr will run, accounting for the negative boost.

+((title:cowboi | plot:cowboi | genres:cowboi) 
  (title:movi | plot:movi | genres:movi)) 
(title:movi)^-2.0
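
Appending a negative boost programmatically, without touching the user's search phrase, might look like the following sketch. The helper name is hypothetical, and how a list of bq values is sent to Solr depends on the client library in use.

```python
def with_negative_boost(params, field, term, boost):
    """Return a copy of params with an extra bq clause; q is left untouched."""
    if boost >= 0:
        raise ValueError("expected a negative boost value")
    existing = params.get("bq", [])
    if isinstance(existing, str):
        existing = [existing]
    updated = dict(params)
    # bq may be specified more than once, so keep the clauses in a list.
    updated["bq"] = list(existing) + [f"{field}:{term}^{float(boost)}"]
    return updated


base_params = {
    "defType": "edismax",
    "fq": "countries: USA",
    "qf": "plot title genres",
}
boosted_params = with_negative_boost(base_params, "title", "movie", -2)
print(boosted_params["bq"])
```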

Function Query

Solr’s Lucene scoring is effective, but what if we wanted to use an additional, subjective measure of relevance to enrich our search results? If we examine the movies index schema, we will see there are quite a few rating-related fields that provide a sense of the movie’s quality, as judged by viewers and organizations. For example, there is a nested ‘tomato’ object, containing a number of qualitative data fields, such as the tomato rating and a tomato user rating. Tomato refers to Rotten Tomatoes. According to their site, Rotten Tomatoes provides the world’s most trusted recommendation resources for quality entertainment. Additionally, the movies index schema includes similar ‘awards’ and ‘imdb’ objects and a ‘metacritic’ rating. Here is a snippet of data from the movie, ‘Butch Cassidy and the Sundance Kid’, showing many of those qualitative fields.

  "imdb": {
    "id": "tt0064115",
    "rating": 8.1,
    "votes": 142642
  },
  "tomato": {
    "meter": 89,
    "image": "certified",
    "rating": 8.2,
    "reviews": 46,
    "fresh": 41,
    "consensus": "With its iconic pairing of Paul Newman and Robert Redford, jaunty screenplay and Burt Bacharach score, Butch Cassidy and the Sundance Kid has gone down as among the defining moments in late-'60s American cinema.",
    "userMeter": 93,
    "userRating": 4,
    "userReviews": 70088
  },
  "metacritic": 58,
  "awards": {
    "wins": 16,
    "nominations": 14,
    "text": "Won 4 Oscars. Another 16 wins & 14 nominations."
  }

According to Apache, Lucene scoring is a combination of the Vector Space Model (VSM) of Information Retrieval and the Boolean model. Lucene allows influencing search results by ‘boosting’. Using Solr’s Function Query, we can apply a document-level multiplicative boost function, which will alter the scores of the query’s search results.

Query 8: Boost Function

If you remember, in our previous example we queried for ‘adventure action +western -romance’. If we run it again, without the boosted fields, we get back 25 documents, of which ‘Western Union’ ranks highest, with a score of 7.39.

Solr:

Parameters
----------
q: adventure action +western -romance
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, awards.wins, score', 
  'rows': '5'
}

Results
----------
document count: 25
{'title': 'Western Union', 'awards.wins': [0.0], 'score': 7.39}
{'title': 'The Wild Bunch', 'awards.wins': [5.0], 'score': 7.18}
{'title': 'Western Spaghetti', 'awards.wins': [2.0], 'score': 6.64}
{'title': 'Crossfire Trail', 'awards.wins': [1.0], 'score': 6.26}
{'title': 'Butch Cassidy and the Sundance Kid', 'awards.wins': [16.0], 'score': 6.23}

Here is the actual Lucene query Solr will run.

+(
  (title:adventur | plot:adventur | genres:adventur) 
  (title:action | plot:action | genres:action) 
  +(title:western | plot:western | genres:western) 
  -(title:romanc | plot:romanc | genres:romanc)
)

Now, we will apply a boost function. In the function below, I have arbitrarily taken the number of awards won by each movie and divided it by two. The function is applied to the eDisMax’s boost parameter.

div(field(awards.wins,min),2)

This function has a multiplicative effect on the Lucene scoring of the documents in the result set, by boosting scores in proportion to the number of awards each movie has won. We now get movies that are a blend of both relevant results, based on our search phrase, as well as those movies that are highly acclaimed.

The impact of the multiplicative boost function is immediately apparent with the top result, ‘Butch Cassidy and the Sundance Kid’. This widely acclaimed movie climbed from fifth place in the previous search to first place, using the boost formula. The movie won an amazing 16 awards, compared to none for the previous first place movie, ‘Western Union’, which drops all the way down to 17th place in the boosted results. A movie that wasn’t even in the top five results, ‘Wild Wild West’, is now in second place, having received ten awards.

The impact of the boost function is most apparent in the scores. Previously, the score delta between the first and fifth positions in the results was 1.16. The delta between the first and second position was a mere 0.21. Now, with the boost function applied, the range of scoring, and consequently, the two deltas increased significantly: 36.78 compared to 1.16 and 23.83 compared to 0.21.

Solr:

Parameters
----------
q: adventure action +western -romance
kwargs: {
  'defType': 'edismax', 
  'fq': 'countries: USA', 
  'qf': 'plot title genres', 
  'fl': 'title, awards.wins, score', 
  'boost': 'div(field(awards.wins,min),2)', 
  'rows': '5'
}

Results
----------
document count: 25
{'title': 'Butch Cassidy and the Sundance Kid', 'awards.wins': [16.0], 'score': 49.86}
{'title': 'Wild Wild West', 'awards.wins': [10.0], 'score': 26.03}
{'title': 'How the West Was Won', 'awards.wins': [7.0], 'score': 18.42}
{'title': 'The Wild Bunch', 'awards.wins': [5.0], 'score': 17.95}
{'title': 'All Quiet on the Western Front', 'awards.wins': [5.0], 'score': 13.08}

Here is the actual Lucene query Solr will run, using the boost function.

FunctionScoreQuery(+((title:adventur | plot:adventur | 
  genres:adventur) (title:action | plot:action | genres:action) 
  +(title:western | plot:western | genres:western) 
  -(title:romanc | plot:romanc | genres:romanc)), 
  scored by boost(div(double(awards.wins,MIN),const(2))))
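
The multiplicative effect is easy to verify by hand. The sketch below applies the same arithmetic, the base score multiplied by half the number of awards won, to three movies from the unboosted listing. Small differences from Solr's reported scores are expected, because the scores shown in this post are rounded to two decimal places.

```python
def boosted_score(score, awards_wins):
    """Multiply a base score by div(awards.wins, 2)."""
    return score * (awards_wins / 2)


# (title, unboosted score, awards.wins) taken from the unboosted listing.
unboosted = [
    ("Western Union", 7.39, 0),
    ("The Wild Bunch", 7.18, 5),
    ("Butch Cassidy and the Sundance Kid", 6.23, 16),
]

ranked = sorted(unboosted,
                key=lambda row: boosted_score(row[1], row[2]),
                reverse=True)
for title, score, wins in ranked:
    print(f"{title}: {boosted_score(score, wins):.2f}")
```

Note that a movie with no awards, such as ‘Western Union’, is multiplied by zero, which explains its steep drop in the boosted results.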

Solr’s Function Query offers a large number of mathematical functions, which can be combined into complex formulas. For example, we could also take the square root of the sum of the IMDB rating and the number of awards won.

sqrt(sum(field(imdb.rating,min),field(awards.wins,min)))
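
For a sense of scale, here is the same formula, as written above with the imdb.rating and awards.wins fields, evaluated in Python for ‘Butch Cassidy and the Sundance Kid’, using the values from the data snippet shown earlier.

```python
import math


def sqrt_of_sum(imdb_rating, awards_wins):
    """Python equivalent of sqrt(sum(imdb.rating, awards.wins))."""
    return math.sqrt(imdb_rating + awards_wins)


# imdb.rating is 8.1 and awards.wins is 16 in the data snippet.
boost_value = sqrt_of_sum(8.1, 16)
print(round(boost_value, 2))
```

Taking the square root dampens the effect of a large award count, so a heavily awarded movie is boosted by a factor of roughly five rather than eight.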

More Like This, Please

You’ve found a good movie, and now you want more movies just like that one. Maybe you want more movies by a particular director, or starring a certain actor, or based on the same theme. Solr has a solution for this, the More Like This Query Parser. The MLTQParser, for short, enables retrieving documents that are similar to a given document. It uses Lucene’s existing MoreLikeThis logic. To use the parser, you provide the unique Solr ID of the document you want to find more like and the field(s) to use for the comparison. For example:

q: {!mlt qf="genres"}da54520e-a013-4ea3-9698-230ed02c8cf0

Query 9a: MLT Genres

In the first MLTQParser query, we will select the movie, ‘Star Wars: Episode I – The Phantom Menace’. We will look for more movies, produced in the USA, that are similar to ‘Star Wars: Episode I – The Phantom Menace’, by looking for similarities in the genres field. The MLTQParser’s qf field specifies the fields to use for similarity. We will require a mintf (Minimum Term Frequency) of 1. This is the frequency below which search terms will be ignored in the source document. We will also require a mindf (Minimum Document Frequency) of 1. This is the frequency at which words will be ignored when they do not occur in at least this many documents.

The results of the Solr search appear logical; they are the other five Star Wars movies. Note that since the first five results are exact matches on The Phantom Menace’s three genres, ‘action’, ‘adventure’, and ‘fantasy’, their scores are identical, 6.33.

Solr:

Parameters
----------
q: {!mlt qf="genres" mintf=1 mindf=1}aaf956a5-afa5-4284-91c1-69455142884f
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'fl': 'title, genres, score', 
  'rows': '5'
}
  
Results
----------
document count: 252
{'title': 'Star Wars: Episode IV - A New Hope', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode V - The Empire Strikes Back', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'genres': ['Action', 'Adventure', 'Fantasy'], 'score': 6.33}

Here is the actual Lucene query Solr will run.

+(genres:action genres:adventur genres:fantasi) 
-id:652adcfa-c59c-4fa0-ace5-6345fec3cfff
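The MLTQParser query string is easy to get wrong by hand, so here is a small Python sketch that assembles it, along with the other request parameters from Query 9a. The helper name is my own, and the endpoint URL assumes a local Solr instance with a core named movies; adjust both to your environment. The sketch only builds the request and does not contact Solr.

```python
from urllib.parse import urlencode


def mlt_query(doc_id: str, fields: list[str], mintf: int = 1, mindf: int = 1) -> str:
    """Build the q value for the More Like This query parser."""
    qf = " ".join(fields)
    return f'{{!mlt qf="{qf}" mintf={mintf} mindf={mindf}}}{doc_id}'


# Parameters copied from Query 9a.
q = mlt_query("aaf956a5-afa5-4284-91c1-69455142884f", ["genres"])
params = {
    "q": q,
    "defType": "lucene",
    "fq": "countries: USA",
    "fl": "title, genres, score",
    "rows": "5",
}

# Hypothetical endpoint: a local Solr instance with a core named "movies".
url = "http://localhost:8983/solr/movies/select?" + urlencode(params)
print(q)
print(url)
```

Passing several field names, for example actors, director, and writers, produces the space-separated qf value used in the next query.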

Query 9b: The Problem with George

In the second MLTQParser query example, we will again choose the movie, ‘Star Wars: Episode I – The Phantom Menace’. However, this time we will look for similar movies based on a comparison of the actors, director, and writers fields (shown below). Basically, we are looking for similarities between the people associated with ‘Star Wars: Episode I – The Phantom Menace’ and other movies.

Solr:

Parameters
----------
q: id:"aaf956a5-afa5-4284-91c1-69455142884f"
kwargs: {
  'defType': 'lucene', 
  'fl': 'actors, director, writers'
}
  
Results
----------
document count: 1
{
  'director': ['George Lucas'], 
  'writers': ['George Lucas'], 
  'actors': ['Liam Neeson', 'Ewan McGregor', 'Natalie Portman', 'Jake Lloyd']
}

As you can tell by the results of the MLTQParser query below, the first nine out of ten search results make sense. However, the tenth movie result, ‘New Meet Me on South Street: The Story of JC Dobbs’, has no obvious similarities with ‘Star Wars: Episode I – The Phantom Menace’. The movie does not share a common director, writer, or actor.

Solr:

Parameters
----------
q: {!mlt qf="actors director writers" mintf=1 mindf=1}aaf956a5-afa5-4284-91c1-69455142884f
kwargs: {
  'defType': 'lucene', 
  'fq': 'countries: USA', 
  'fl': 'title, actors, director, writers, score', 
  'rows': '10'
}
  
Results
----------
document count: 55
{'title': 'Star Wars: Episode III - Revenge of the Sith', 'director': ['George Lucas'], 'writers': ['George Lucas'], 'actors': ['Ewan McGregor', 'Natalie Portman', 'Hayden Christensen', 'Ian McDiarmid'], 'score': 44.84}
{'title': 'Star Wars: Episode II - Attack of the Clones', 'director': ['George Lucas'], 'writers': ['George Lucas', 'Jonathan Hales', 'George Lucas'], 'actors': ['Ewan McGregor', 'Natalie Portman', 'Hayden Christensen', 'Christopher Lee'], 'score': 44.58}
{'title': 'Star Wars: Episode IV - A New Hope', 'director': ['George Lucas'], 'writers': ['George Lucas'], 'actors': ['Mark Hamill', 'Harrison Ford', 'Carrie Fisher', 'Peter Cushing'], 'score': 23.51}
{'title': 'Star Wars: Episode VI - Return of the Jedi', 'director': ['Richard Marquand'], 'writers': ['Lawrence Kasdan', 'George Lucas', 'George Lucas'], 'actors': ['Mark Hamill', 'Harrison Ford', 'Carrie Fisher', 'Billy Dee Williams'], 'score': 11.96}
{'title': 'A Million Ways to Die in the West', 'director': ['Seth MacFarlane'], 'writers': ['Seth MacFarlane', 'Alec Sulkin', 'Wellesley Wild'], 'actors': ['Seth MacFarlane', 'Charlize Theron', 'Amanda Seyfried', 'Liam Neeson'], 'score': 11.72}
{'title': 'Run All Night', 'director': ['Jaume Collet-Serra'], 'writers': ['Brad Ingelsby'], 'actors': ['Liam Neeson', 'Ed Harris', 'Joel Kinnaman', 'Boyd Holbrook'], 'score': 11.72}
{'title': 'I Love You Phillip Morris', 'director': ['Glenn Ficarra, John Requa'], 'writers': ['John Requa', 'Glenn Ficarra', 'Steve McVicker'], 'actors': ['Jim Carrey', 'Ewan McGregor', 'Leslie Mann', 'Rodrigo Santoro'], 'score': 10.97}
{'title': 'The Island', 'director': ['Michael Bay'], 'writers': ['Caspian Tredwell-Owen', 'Alex Kurtzman', 'Roberto Orci', 'Caspian Tredwell-Owen'], 'actors': ['Ewan McGregor', 'Scarlett Johansson', 'Djimon Hounsou', 'Sean Bean'], 'score': 10.97}
{'title': 'Big Fish', 'director': ['Tim Burton'], 'writers': ['Daniel Wallace', 'John August'], 'actors': ['Ewan McGregor', 'Albert Finney', 'Billy Crudup', 'Jessica Lange'], 'score': 10.97}
{'title': 'New Meet Me on South Street: The Story of JC Dobbs', 'director': ['George Manney'], 'writers': ['George Manney'], 'actors': ['Tony Bidgood', 'Peter Stone Brown', 'Stephen Caldwell', 'Tommy Conwell'], 'score': 10.5}

Here is the actual Lucene query Solr will run.

+(writers:george director:george actors:natalie writers:lucas 
  actors:jake actors:portman actors:ewan actors:mcgregor 
  actors:liam actors:lloyd director:lucas actors:neeson) 
-id:652adcfa-c59c-4fa0-ace5-6345fec3cfff

Examining the query, we can plainly see the problem with MLTQParser. The MLTQParser query is splitting the first and last names of actors, directors, and writers, then searching for each name individually, but not their whole name. In my opinion, this is a bug with the MLTQParser, since each value in the actors, director, and writers MultiValued fields is wrapped in double quotes. The query should treat each value as an exact phrase, not as individual search terms.
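The effect of splitting names into single terms can be imitated in a few lines of Python. This is only a sketch of the behavior, not Solr's analysis chain: it assumes simple lowercasing and whitespace splitting, and the function name is hypothetical. The people listed are copied from the search results above.

```python
def terms(names: list[str]) -> set[str]:
    """Lowercase each name and split it on whitespace into single terms."""
    return {token for name in names for token in name.lower().split()}


# People associated with each movie, copied from the results above.
phantom_menace = {
    "director": ["George Lucas"],
    "writers": ["George Lucas"],
    "actors": ["Liam Neeson", "Ewan McGregor", "Natalie Portman", "Jake Lloyd"],
}
jc_dobbs = {
    "director": ["George Manney"],
    "writers": ["George Manney"],
    "actors": ["Tony Bidgood", "Peter Stone Brown", "Stephen Caldwell", "Tommy Conwell"],
}

for field in ("director", "writers", "actors"):
    # Term-level comparison, which is what the generated query does.
    shared_terms = terms(phantom_menace[field]) & terms(jc_dobbs[field])
    # Whole-value comparison, which is what we would want for names.
    shared_names = set(phantom_menace[field]) & set(jc_dobbs[field])
    print(field, sorted(shared_terms), sorted(shared_names))
```

Compared term by term, the two movies share ‘george’ in both the director and writers fields; compared as whole names, they share nothing.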

Given the MLTQParser’s query logic, it is now clear why a seemingly irrelevant movie, like ‘New Meet Me on South Street: The Story of JC Dobbs’, was part of the search results. Examining the debug output of the scoring explanation, we see the following.

"544637e5-e96d-4f62-9b22-5174c60ee512":"
10.504457 = sum of:
  10.504457 = sum of:
    5.5608187 = weight(writers:george in 649) [SchemaSimilarity], result of:
      5.5608187 = score(doc=649,freq=1.0 = termFreq=1.0
), product of:
        4.226696 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
          26.0 = docFreq
          1814.0 = docCount
        1.315642 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
          1.0 = termFreq=1.0
          1.2 = parameter k1
          0.75 = parameter b
          4.836273 = avgFieldLength
          2.0 = fieldLength
    4.943639 = weight(director:george in 649) [SchemaSimilarity], result of:
      4.943639 = score(doc=649,freq=1.0 = termFreq=1.0
), product of:
        4.6206594 = idf, computed as log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)) from:
          20.0 = docFreq
          2081.0 = docCount
        1.069899 = tfNorm, computed as (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * fieldLength / avgFieldLength)) from:
          1.0 = termFreq=1.0
          1.2 = parameter k1
          0.75 = parameter b
          2.3801057 = avgFieldLength
          2.0 = fieldLength
"

George Manney was both the Director and Writer of ‘New Meet Me on South Street: The Story of JC Dobbs’. George Manney shares a first name with George Lucas, the Director and Writer of ‘Star Wars: Episode I – The Phantom Menace’. This is the only similarity between people associated with both movies. Therefore, there were two matches, one on the director field and one on the writers field. Unfortunately, having the same first names negatively impacts the results from the MLTQParser.
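As a sanity check, the two weights in the scoring explanation above can be reproduced with a few lines of Python. The sketch simply transcribes the idf and tfNorm formulas printed in the debug output; the function name is my own, and small differences in the last decimal places may arise from rounding.

```python
import math


def bm25_term_score(doc_freq, doc_count, freq, field_length, avg_field_length, k1=1.2, b=0.75):
    """Score one term using the idf and tfNorm formulas printed in the explain output."""
    idf = math.log(1 + (doc_count - doc_freq + 0.5) / (doc_freq + 0.5))
    tf_norm = (freq * (k1 + 1)) / (freq + k1 * (1 - b + b * field_length / avg_field_length))
    return idf * tf_norm


# Values copied from the explain output above.
writers_george = bm25_term_score(26, 1814, 1.0, 2.0, 4.836273)
director_george = bm25_term_score(20, 2081, 1.0, 2.0, 2.3801057)

print(round(writers_george, 2))                    # 5.56
print(round(director_george, 2))                   # 4.94
print(round(writers_george + director_george, 2))  # 10.5
```

Note how much the idf contributes: ‘george’ appears in only 26 writers values and 20 director values, so a single shared first name is enough to earn a score of about 10.5.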

Synonyms

For our last example, we will examine the use of synonyms with Solr. With respect to the movie index, we could perform the following eDisMax search on the title, plot, and genres fields: ‘scary’ OR ‘slasher’ OR ‘spooky’ OR ‘evil’ OR ‘horror’, or more simply ‘scary slasher spooky evil horror’. Based on this search, we would get back a truly gruesome collection of 141 films. The search is effective because it uses multiple, similar search terms to return a larger result set of movies within a similar theme. However, the search relies on each end-user to enter the same relevant search terms, every time.

With Solr’s synonym capability, we can build some intelligence into our index by defining synonymous relationships between terms. There are multiple ways to define synonymous relationships between terms in Solr. Lucidworks has an excellent article, Synonyms Files, on the different synonymous relationships. We will look at three types of relationships, as described by Lucidworks: Replacement Synonyms, Oneway Expansion Synonyms, and Multiway Expansion Synonyms.

I have pre-defined some examples for each of the three types of relationships, in the movies index’s synonyms.txt configuration file. This file is created when a new index is created.

## Custom synonym groups for movies index ##

# Replacement Synonyms examples
scarey => scary
ciborg => cyborg

# Multiway Expansion Synonyms examples
scary,slasher,spooky,evil,horror

# Oneway Expansion Synonyms examples
droid => droid,android,robot,cyborg

There is a copy of the file in this project, which will be used by the movies index, running in the Docker container.


Query 10a: Replacement Synonyms

We will start with a Replacement Synonyms example. I have added the following synonym mapping for a common misspelling of the word ‘cyborg’.

ciborg => cyborg

If we perform a search for the term ‘ciborg’, Solr will substitute it with the term ‘cyborg’. We can confirm this by viewing the query, as shown in the debug snippet below.

+(title:cyborg | plot:cyborg | genres:cyborg)

Performing the query returns two documents, including the most famous cyborg, Arnold Schwarzenegger, the Terminator.

Parameters
----------
q: ciborg
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 2
{'title': 'Terminator 2: Judgment Day', 'score': 8.17}
{'title': "I'm a Cyborg, But That's OK", 'score': 7.13}

Query 10b: Oneway Expansion Synonyms

Next, we will demonstrate Oneway Expansion Synonyms. I have added the following synonym mapping for the word, ‘droid’. Note we must include the word ‘droid’ in the expansion synonyms on the right side of the mapping, as well as on the left.

droid => droid,android,robot,cyborg

When we perform a search on the term ‘droid’, Solr will also search for the synonyms ‘android’, ‘robot’, and ‘cyborg’. We can confirm this by viewing the query Solr constructs for the search term ‘droid’, as shown in the debugger snippet below.

+(
  Synonym(title:android title:cyborg title:droid title:robot) | 
  Synonym(plot:android plot:cyborg plot:droid plot:robot) | 
  Synonym(genres:android genres:cyborg genres:droid genres:robot)
)

Note the converse is not true since this is a one-way relationship. If we search on ‘cyborg’, Solr will not search on ‘droid’, ‘android’, and ‘robot’.

+(title:cyborg | plot:cyborg | genres:cyborg)

Performing the ‘droid’ eDisMax query returns 15 documents.

Parameters
----------
q: droid
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 15
{'title': 'Robo Jî', 'score': 7.67}
{'title': "I'm a Cyborg, But That's OK", 'score': 7.13}
{'title': 'BV-01', 'score': 6.6}
{'title': 'Robot Chicken: DC Comics Special', 'score': 6.44}
{'title': 'Terminator 2: Judgment Day', 'score': 6.23}

Query 10c: Multiway Expansion Synonyms

Lastly, we will demonstrate Multiway Expansion Synonyms. I have added the following synonym mapping for the word, ‘scary’.

scary,slasher,spooky,evil,horror

If we perform a search on the term ‘scary’, Solr will also search for the synonyms ‘slasher’, ‘spooky’, ‘evil’, and ‘horror’. Unlike the previous example, the converse is true since this is a multi-way relationship. If we search on any of the five synonyms, Solr will also search on the other four terms and return identical results. We can confirm this by viewing the query Solr constructs for the term ‘scary’, as shown in the debug snippet below.

+(Synonym(title:evil title:horror title:scari title:slasher 
  title:spooki) | Synonym(plot:evil plot:horror plot:scari 
  plot:slasher plot:spooki) | Synonym(genres:evil genres:horror 
  genres:scari genres:slasher genres:spooki))

Performing the ‘scary’ eDisMax query returns 141 documents.

Parameters
----------
q: scary
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 141
{'title': 'See No Evil, Hear No Evil', 'score': 7.9}
{'title': 'The Evil Dead', 'score': 7.23}
{'title': 'Evil Dead', 'score': 7.23}
{'title': 'Evil Ed', 'score': 7.23}
{'title': 'Evil Dead II', 'score': 6.37}

You may have noticed the other replacement synonym mapping I placed in the index’s synonyms.txt configuration file, shown below.

scarey => scary

You might be wondering, if you entered the common misspelling ‘scarey’ as a search term, would Solr replace the term with ‘scary’ and then also search on its four synonyms, ‘slasher’, ‘spooky’, ‘evil’, and ‘horror’? The answer is no; Solr will only search on ‘scary’. Solr does not create indirect or secondary relationships between synonym mappings. You would have to correct the spelling and input the term correctly to take advantage of the multi-way relationship.
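The three relationship types, and the lack of chaining between mappings, can be modeled with a toy Python sketch. This is not how Solr implements synonyms internally; it ignores stemming and multi-word phrases, and the function names are hypothetical. It only mirrors the behavior observed in the queries above, using the same rules from the synonyms file.

```python
def parse_synonyms(lines):
    """Toy parser for the two rule shapes used above: 'a => b,c' and 'a,b,c'."""
    mapping = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "=>" in line:
            # Explicit mapping: only the left-hand terms are rewritten.
            left, right = line.split("=>")
            targets = [t.strip() for t in right.split(",")]
            for source in left.split(","):
                mapping[source.strip()] = targets
        else:
            # Equivalent group: every term expands to the whole group.
            group = [t.strip() for t in line.split(",")]
            for term in group:
                mapping[term] = group
    return mapping


def expand(term, mapping):
    """Apply a single pass of synonym lookup; rules are never chained."""
    return sorted(mapping.get(term, [term]))


rules = parse_synonyms([
    "scarey => scary",
    "ciborg => cyborg",
    "scary,slasher,spooky,evil,horror",
    "droid => droid,android,robot,cyborg",
])

print(expand("ciborg", rules))  # ['cyborg']
print(expand("droid", rules))   # ['android', 'cyborg', 'droid', 'robot']
print(expand("cyborg", rules))  # ['cyborg']
print(expand("evil", rules))    # ['evil', 'horror', 'scary', 'slasher', 'spooky']
print(expand("scarey", rules))  # ['scary']
```

The last line shows the single-pass behavior: ‘scarey’ is replaced with ‘scary’, but the result is not fed back through the multi-way group.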

Managing Unique Vocabulary with Synonyms

Large corporations, industry verticals, government agencies, and other entities often use a unique lexicon. Their vocabulary includes uncommon terms, phrases, acronyms, abbreviations, and other idioms. These commonly represent products and services, organizational structures, and technical jargon. Synonyms are an excellent way to support domain-specific vocabulary within indexes. We see this in the examples Solr includes in the synonyms.txt configuration file. The two examples, shown below, demonstrate how to associate multiple abbreviations and technical terms with the same unit of data size.

# Some synonym groups specific to this example
GB,gib,gigabyte,gigabytes
MB,mib,megabyte,megabytes

Query 10d: Synonymous Phrases

A largely undocumented feature of synonyms is the ability to create synonymous relationships between common acronyms, abbreviations, terms, and phrases. Below I have provided a few examples of how to create these relationships. The only apparent limitation is that you cannot use stop words in the phrases; a good reason not to overuse stop words.

ai,artificial intelligence
cia,central intelligence agency
fbi,federal bureau investigation
lol,laughing out loud,league legends

For example, if we include the full phrase ‘league of legends’ in the synonyms map with ‘lol’, Solr ignores this phrase when we search on ‘LOL’. However, if we remove the stop word, ‘of’, then Solr will create a query that requires the two terms, ‘league’ and ‘legends’, or the three terms ‘laugh’, ‘out’, and ‘loud’, or the single term ‘lol’.

+(
  (((+title:laugh +title:out +title:loud) (+title:leagu +title:legend) title:lol)) | 
  (((+plot:laugh +plot:out +plot:loud) (+plot:leagu +plot:legend) plot:lol)) | 
  (((+genres:laugh +genres:out +genres:loud) (+genres:leagu +genres:legend) genres:lol))
)

Solr:

Parameters
----------
q: lol
kwargs: {
  'defType': 'edismax', 
  'qf': 'title plot genres', 
  'fl': 'title, score', 
  'rows': '5'
}

Results
----------
document count: 1
{'title': 'JK LOL', 'score': 9.05}
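Since a stop word silently disables a synonymous phrase, it may be worth checking synonym entries before adding them to the file. Below is a minimal Python sketch of such a check. The stop word list is a small illustrative assumption, not the contents of the index's actual stop word configuration, and the function name is hypothetical.

```python
# Assumption: a small illustrative stop word list, not Solr's actual stopwords.txt.
STOP_WORDS = {"a", "an", "and", "of", "the", "to"}


def phrases_with_stop_words(synonym_line: str) -> list[str]:
    """Return the comma-separated entries that contain at least one stop word."""
    flagged = []
    for entry in synonym_line.split(","):
        words = entry.strip().lower().split()
        if any(word in STOP_WORDS for word in words):
            flagged.append(entry.strip())
    return flagged


print(phrases_with_stop_words("lol,laughing out loud,league of legends"))
# ['league of legends']
print(phrases_with_stop_words("lol,laughing out loud,league legends"))
# []
```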

Conclusion

By understanding the capabilities of Apache Solr, the characteristics of the data contained in your indexes and the search patterns of your end users, you will be able to craft queries that ensure responses contain high-quality, relevant search results.

The query examples in this post demonstrate only a very small portion of Solr’s vast search capabilities. There are several additional query examples available in each of the two Python scripts, which you can uncomment to explore their results further.

Solr’s documentation is very good; to learn more about Solr’s capabilities, I suggest reviewing the various parsers and their options, in the current Solr version 7.6 documentation.

I also suggest reviewing Solr’s Analyzers, Tokenizers, and Filters, and understanding how they affect the way in which Solr indexes documents and how Solr interprets the content of the indexes when performing a search.

 

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Feature Illustration Copyright: Dejan Bozic (123RF)


Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine

Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, including MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-managed PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.

In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).

Background

In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.

Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders services. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.

The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.


Developing the services, not operationalizing the platform, was the primary objective of the previous post.

Featured Technologies

The following technologies are featured prominently in this post.

Confluent Cloud


In May 2018, Google announced a partnership with Confluent to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for their commercial, Kafka-based streaming platform for the Enterprise.

Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.

Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.

MongoDB Atlas


Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.

Kubernetes Engine

According to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.

A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.

GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.

Demonstration

In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.


Kubernetes and Istio 1.0 will replace Netflix’s Zuul and Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform than previously demonstrated on Docker Swarm.


For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor will we integrate a full lifecycle API management tool, like Google Apigee.

Source Code

The source code for this demonstration is contained in four separate GitHub repositories, storefront-kafka-docker, storefront-demo-accounts, storefront-demo-orders, and storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).

Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

Setup Process

The setup of the Storefront API platform is divided into a few logical steps:

  1. Create the MongoDB Atlas cluster;
  2. Create the Confluent Cloud Kafka cluster;
  3. Create Kafka topics;
  4. Modify the Kubernetes resources;
  5. Modify the microservices to support Confluent Cloud configuration;
  6. Create the GKE cluster with Istio on GCP;
  7. Apply the Kubernetes resources to the GKE cluster;
  8. Test that the Storefront API, Kafka, and MongoDB are functioning properly.

MongoDB Atlas Cluster

This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.

For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.


For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized 3-data node cluster, with shared RAM and 512 MB of storage, and currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0-cluster on GCP. An M0-sized Atlas cluster may take between 7-10 minutes to provision.


MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.


Once the cluster is ready, you can review details about the cluster and each individual cluster node.


In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.


Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open the access to your specific IP address using whatsmyip.com, or much less-securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.
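To illustrate why 0.0.0.0/0 is so permissive, here is a short Python sketch using the standard library's ipaddress module. It checks whether a client address falls inside a whitelisted CIDR block. The addresses are placeholders from the documentation ranges, not values from this demo, and the function name is hypothetical; it does not reflect how Atlas evaluates its whitelist internally.

```python
import ipaddress


def is_allowed(client_ip: str, whitelist: list[str]) -> bool:
    """Return True if client_ip falls inside any whitelisted CIDR block."""
    address = ipaddress.ip_address(client_ip)
    return any(address in ipaddress.ip_network(block) for block in whitelist)


# Documentation-range addresses (RFC 5737), used here as placeholders.
single_host = ["203.0.113.10/32"]
wide_open = ["0.0.0.0/0"]

print(is_allowed("203.0.113.10", single_host))  # True
print(is_allowed("198.51.100.7", single_host))  # False
print(is_allowed("198.51.100.7", wide_open))    # True
```

A /32 block admits exactly one address, while 0.0.0.0/0 admits every IPv4 address.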


The Java Spring Boot storefront services use a Spring Profile, gke. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke Spring Profile’s configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap and Secret resources, shown later.

The first two Spring configuration values we will need are the MongoDB Atlas cluster’s connection string and the demo_user account password. Note these both for later use.
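If the password contains special characters, it must be percent-encoded before it is embedded in a MongoDB connection string. The following Python sketch shows one way to assemble such a string. It assumes the SRV-style (mongodb+srv) format; the host, password, and database name are hypothetical placeholders, and the connection string Atlas gives you for your own cluster should be treated as the source of truth.

```python
from urllib.parse import quote_plus


def atlas_uri(user: str, password: str, host: str, database: str) -> str:
    """Build a mongodb+srv connection string with percent-encoded credentials."""
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}/{database}?retryWrites=true"
    )


# Hypothetical host, password, and database name, for illustration only.
uri = atlas_uri("demo_user", "p@ss/word", "cluster0.example.mongodb.net", "accounts")
print(uri)
# mongodb+srv://demo_user:p%40ss%2Fword@cluster0.example.mongodb.net/accounts?retryWrites=true
```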


Confluent Cloud Kafka Cluster

Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Confluent Cloud account creation does require the use of a Credit Card.

The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and persistent history of cluster activities, which survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based SaaS management tool.


As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.


Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.

Once the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.


As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.


SASL and JAAS

Some users may not be familiar with the terms, SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).

There are numerous SASL mechanisms. The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.

According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses the JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.

Cluster Authentication

Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.
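To show how the SASL, JAAS, and API Key pieces fit together, here is a Python sketch that assembles the standard Kafka client properties for SASL/PLAIN over TLS. It assumes the API Key and Secret are supplied as the SASL username and password; the function name and placeholder values are hypothetical, and the Client Config tab in the Confluent Cloud console remains the authoritative source for your cluster's settings.

```python
def confluent_client_config(bootstrap_servers: str, api_key: str, api_secret: str) -> dict:
    """Assemble Kafka client properties for SASL/PLAIN over TLS."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


# Placeholder values; substitute the ones from your own cluster.
config = confluent_client_config("<bootstrap-servers>", "<api-key>", "<api-secret>")
for key, value in config.items():
    print(f"{key}={value}")
```

Because the JAAS line embeds the secret in clear text, values like these belong in a Kubernetes Secret rather than a ConfigMap.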


Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.


Kafka Topics

With the cluster created, create the storefront service’s three Kafka topics manually, using Confluent Cloud’s ccloud CLI tool. First, configure the Confluent Cloud CLI with the ccloud init command, using your new cluster’s Bootstrap Servers address, API Key, and API Secret. The instructions are shown above, in the cluster’s Client Config tab of the Confluent Cloud web-based management interface.


Create the storefront service’s three Kafka topics using the ccloud topic create command. Use the list command to confirm they are created.

# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill
  
# list kafka topics
ccloud topic list
  
accounts.customer.change
fulfillment.order.change
orders.order.fulfill

Another useful ccloud command, topic describe, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12.

screen_shot_2018-12-26_at_5.03.11_pm

Adding the --verbose flag to the command, ccloud --verbose topic describe, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.

screen_shot_2018-12-26_at_5.07.20_pm

Kubernetes Resources

The deployment of the three storefront microservices to the dev Namespace will minimally require the following Kubernetes configuration resources.

  • (1) Kubernetes Namespace;
  • (3) Kubernetes Deployments;
  • (3) Kubernetes Services;
  • (1) Kubernetes ConfigMap;
  • (2) Kubernetes Secrets;
  • (1) Istio 1.0 Gateway;
  • (1) Istio 1.0 VirtualService;
  • (2) Istio 1.0 ServiceEntry;

The Istio networking.istio.io v1alpha3 API introduced the last three configuration resources in the list, to control traffic routing into, within, and out of the mesh. There are a total of four new networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.

Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.

To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.

Istio Gateway & VirtualService

Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com, along with the sub-domain, api.dev, to host the Storefront API. The domain’s primary A record (‘@’) and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. Removing the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).

Istio ServiceEntry

There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services, and both of their ServiceEntry Location items are set to MESH_INTERNAL. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).

The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).

Both need to have their host items replaced with the appropriate Atlas and Confluent URLs.

Inspecting Istio Resources

The easiest way to view Istio resources is from the command line using the istioctl and kubectl CLI tools.

istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry
  
kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry

Multiple Namespaces

In this demo, we are only deploying to a single Kubernetes Namespace, dev. However, Istio also supports routing traffic to multiple Namespaces. For example, a typical non-prod Kubernetes cluster might support dev, test, and uat, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In the VirtualService, each service’s host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which we change for each VirtualService (gist).
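
To make the FQDN point concrete, here is a minimal Python sketch of how the per-Namespace hosts could be derived. The api.dev sub-domain and the storefront-demo.com domain come from this post; the test and uat sub-domains, the Service names, the helper function names, and the default cluster.local cluster domain are assumptions for illustration, not values taken from the project's resource files.

```python
# Sketch: derive the hosts that change from one Namespace to the next.
# Kubernetes Service FQDNs follow <service>.<namespace>.svc.<cluster-domain>;
# the cluster domain is assumed to be the default, cluster.local.

SERVICES = ["accounts", "orders", "fulfillment"]  # assumed Service names
NAMESPACES = ["dev", "test", "uat"]
DOMAIN = "storefront-demo.com"


def service_fqdn(service: str, namespace: str) -> str:
    """Return the in-cluster FQDN a VirtualService would route to."""
    return f"{service}.{namespace}.svc.cluster.local"


def gateway_host(namespace: str, domain: str = DOMAIN) -> str:
    """Return the external host added to the Gateway for a Namespace."""
    return f"api.{namespace}.{domain}"


for namespace in NAMESPACES:
    print(gateway_host(namespace))
    for service in SERVICES:
        print("  ", service_fqdn(service, namespace))
```

Only the Namespace segment differs between the three VirtualServices, which is why a single Gateway can front all of them.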

MongoDB Atlas Secret

There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.

The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection string, with the demo_user username and password, one for each of the storefront service’s databases (gist).

Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64 program. The base64 program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64 program using echo -n.

MONGODB_URI='mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true'
echo -n "$MONGODB_URI" | base64

bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl

Repeat this process for the three MongoDB connection strings.
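
If you would rather script the encoding, the same result can be produced with Python's standard library. This is only a sketch: the connection strings are the placeholders used above, and the orders and fulfillment database names are assumed to mirror the service names.

```python
import base64


def encode_secret_value(value: str) -> str:
    """Base64-encode a string without a trailing newline or line wrapping,
    matching the output of: echo -n "$VALUE" | base64 (unwrapped)."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


# Placeholder values; substitute your own password and cluster address.
# The database names other than accounts are assumptions.
DATABASES = ["accounts", "orders", "fulfillment"]

for database in DATABASES:
    uri = (
        "mongodb+srv://demo_user:your_password@your_cluster_address/"
        f"{database}?retryWrites=true"
    )
    print(database, encode_secret_value(uri))
```

One practical difference to keep in mind: GNU base64 wraps its output at 76 characters by default (use -w 0 to disable wrapping), whereas b64encode always returns a single line, which is what the Secret's data fields expect.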

screen_shot_2018-12-26_at_2.15.21_pm

Confluent Cloud Secret

The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret’s data map, bootstrap.servers and sasl.jaas.config. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the base64 encoding process for these two data fields (gist).
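
As a rough illustration of what goes into those two data fields, the sketch below assembles a SASL/PLAIN JAAS configuration string from an API Key and Secret and Base64-encodes both values. The key, secret, and bootstrap address are placeholders, and the function names are mine; compare the result against the values shown in your own Client Config tab rather than treating this as the project's exact configuration.

```python
import base64


def build_sasl_jaas_config(api_key: str, api_secret: str) -> str:
    """Build a JAAS entry for Kafka's PlainLoginModule (SASL/PLAIN)."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )


def b64(value: str) -> str:
    """Base64-encode a string for a Kubernetes Secret data field."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


# Placeholder values only; never commit real credentials.
secret_data = {
    "bootstrap.servers": b64("your_cluster_address:9092"),
    "sasl.jaas.config": b64(
        build_sasl_jaas_config("your_api_key", "your_api_secret")
    ),
}

for field, encoded in secret_data.items():
    print(f"{field}: {encoded}")
```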

Confluent Cloud ConfigMap

The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive and, therefore, may be placed in a Kubernetes ConfigMap, confluent-cloud-kafka-configmap.yaml (gist).

Accounts Deployment Resource

To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note the environment variables section, on lines 44–90, is a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).

Modify Microservices for Confluent Cloud

As explained earlier, Confluent Cloud’s Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service’s code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note the following changes have already been made to the source code in the gke git branch, so you do not need to make them yourself for this demo.

The previous Kafka SenderConfig and ReceiverConfig Java classes have been converted to Java interfaces. There are four new classes, SenderConfigConfluent, SenderConfigNonConfluent, ReceiverConfigConfluent, and ReceiverConfigNonConfluent, each of which implements one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes is assigned the @Profile("gke") annotation, and the other, the @Profile("!gke") annotation. When the services start, one of the two class implementations is loaded, depending on the Active Spring Profile, gke or not gke. To understand the changes better, examine the Account service’s SenderConfigConfluent.java file (gist).

Line 20: Designates this class as belonging to the gke Spring Profile.

Line 23: The class now implements an interface.

Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.

Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.

Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine).

Google Kubernetes Engine (GKE) with Istio

Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.

For the sake of brevity, we will manually create the cluster and deploy the resources, using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available release of Kubernetes on GKE at the time of this post was 1.11.5-gke.5, with Istio 1.0.3 (Istio on GKE was still considered beta). Note that Kubernetes and Istio are evolving rapidly, so the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create command format (gist).

Executing these commands successfully will build the cluster and the dev Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.

screen_shot_2018-12-26_at_2.00.56_pm

We can also observe the new GKE cluster from the GKE Clusters Details tab.

screen_shot_2018-12-26_at_2.18.32_pm

Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the GKE cluster’s two VM instances, and one IP address associated with the frontend of the load balancer.

screen_shot_2018-12-26_at_2.59.38_pm

While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.

screen_shot_2018-12-26_at_2.58.42_pm

A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.

screen_shot_2018-12-26_at_2.59.59_pm

Below, we see the details of the load balancer’s target pool, containing the GKE cluster’s two VMs.

screen_shot_2018-12-26_at_3.57.03_pm.png

As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.

ConfluentCloudRouting

Apply Kubernetes Resources

Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev Namespace since their role is to control ingress and route traffic to the dev Namespace and the services within it (gist).

Once these commands complete successfully, on the Workloads tab, we should observe two Pods of each of the three storefront service Kubernetes Deployments deployed to the dev Namespace, all six Pods with a Status of ‘OK’. A Deployment controller provides declarative updates for Pods and ReplicaSets.

screen_shot_2018-12-26_at_2.51.01_pm

On the Services tab, we should observe the three storefront services’ Kubernetes Services. A Service in Kubernetes is a REST object.

screen_shot_2018-12-26_at_2.51.16_pm

On the Configuration tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Namespace.

screen_shot_2018-12-26_at_2.51.36_pm

Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.54.51_pm

Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.

screen_shot_2018-12-23_at_10.55.17_pm

Test the Storefront API

If you recall from part two of the previous post, there is a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.

  1. Sample Customer: accounts/customers/sample
  2. Sample Orders: orders/customers/sample/orders
  3. Sample Fulfillment Requests: orders/customers/sample/fulfill
  4. Sample Processed Order Event: fulfillment/fulfillment/sample/process
  5. Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
  6. Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
  7. Sample Received Order Event: fulfillment/fulfillment/sample/receive

There is a wide variety of tools for interacting with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.

screen_shot_2018-12-31_at_12.19.50_pm.png
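
For readers who want to see the shape of such a script, here is a minimal sketch using only the standard library. It is not the project's sample_data.py: the function names are hypothetical, the health check is omitted because the health endpoint path is not given in this post, and the base URL is whatever host or load balancer IP address you configured earlier.

```python
import urllib.request
from typing import Callable, List

# The seven sample-data endpoints listed above, called in order.
ENDPOINTS = [
    "accounts/customers/sample",
    "orders/customers/sample/orders",
    "orders/customers/sample/fulfill",
    "fulfillment/fulfillment/sample/process",
    "fulfillment/fulfillment/sample/ship",
    "fulfillment/fulfillment/sample/in-transit",
    "fulfillment/fulfillment/sample/receive",
]


def http_get_status(url: str, timeout: float = 10.0) -> int:
    """Issue an HTTP GET and return the response status code."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.status


def create_sample_data(
    base_url: str, get_status: Callable[[str], int] = http_get_status
) -> List[str]:
    """Call each endpoint in order; stop at the first non-200 response."""
    called = []
    for endpoint in ENDPOINTS:
        url = f"{base_url.rstrip('/')}/{endpoint}"
        status = get_status(url)
        if status != 200:
            raise RuntimeError(f"{url} returned HTTP {status}")
        called.append(url)
    return called
```

Calling create_sample_data("http://api.dev.storefront-demo.com") would exercise all three services in sequence. The get_status parameter exists only so the loop can be tested without a live cluster.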

Postman

Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of the HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers endpoint.

screen_shot_2018-12-26_at_5.48.34_pm

Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.

screen_shot_2018-12-26_at_5.47.57_pm

Google Stackdriver

If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener handler firing on an onAfterSave event, which sends a CustomerChangeEvent payload to the accounts.customer.change Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.

screen_shot_2018-12-26_at_8.05.50_pm.png

MongoDB Atlas Collection View

Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connect to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests have completed successfully, you should observe the three Storefront API databases, each with collections of documents.

screen_shot_2018-12-26_at_4.56.25_pm

MongoDB Compass

In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests collection schema.

Screen Shot 2019-01-20 at 10.21.54 AM.png

Confluent Control Center

Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since the Control Center is provided at an additional fee, and I found it difficult to configure for Confluent Cloud clusters based on Confluent’s documentation, I chose not to cover it in detail in this post.

screen_shot_2018-12-23_at_10.21.41_pm

screen_shot_2018-12-23_at_10.48.49_pm

Tear Down Cluster

Delete your Confluent Cloud and MongoDB clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the cluster delete command. Also, double-check that the external IP addresses and load balancer, associated with the cluster, were also deleted as part of the cluster deletion (gist).

Conclusion

In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.

In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0).

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


Using the Google Cloud Dataproc WorkflowTemplates API to Automate Spark and Hadoop Workloads on GCP

In the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we explored Google Cloud Dataproc using the Google Cloud Console as well as the Google Cloud SDK and Cloud Dataproc API. We created clusters, then uploaded and ran Spark and PySpark jobs, then deleted clusters, each as discrete tasks. Although each task could be done via the Dataproc API, and was therefore automatable, they were independent tasks, without awareness of the previous task’s state.

Screen Shot 2018-12-15 at 11.39.26 PM.png

In this brief follow-up post, we will examine the Cloud Dataproc WorkflowTemplates API to more efficiently and effectively automate Spark and Hadoop workloads. According to Google, the Cloud Dataproc WorkflowTemplates API provides a flexible and easy-to-use mechanism for managing and executing Dataproc workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster. Shown below, we see one of the Workflows that will be demonstrated in this post, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.07.29-am.png

Here we see a four-stage DAG of one of the three jobs in the workflow, displayed in Spark History Server Web UI.

screen-shot-2018-12-16-at-11.18.45-am

Workflows are ideal for automating large batches of dynamic Spark and Hadoop jobs, and for long-running and unattended job execution, such as overnight.
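
To illustrate what running a DAG of jobs means in practice, the sketch below orders a set of steps so that each one runs only after its prerequisites. It uses Python's graphlib module, not the Dataproc API. The step ids are borrowed from the jobs added later in this post, but the dependency between them is hypothetical: the workflow built here declares no prerequisites, so its three jobs are free to run concurrently.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each step id to the step ids that must finish before it starts.
# The single dependency below is hypothetical, added only to show ordering.
prerequisites = {
    "ibrd-small-spark": [],
    "ibrd-large-spark": [],
    "ibrd-large-pyspark": ["ibrd-large-spark"],
}


def execution_order(graph):
    """Return one valid order in which the steps could run.

    Raises graphlib.CycleError if the steps do not form a DAG.
    """
    return list(TopologicalSorter(graph).static_order())


order = execution_order(prerequisites)
print(order)
```

As I understand the WorkflowTemplate resource, a job expresses this kind of dependency through its prerequisiteStepIds field; steps without prerequisites are eligible to start as soon as the cluster is ready.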

Demonstration

Using the Python and Java projects from the previous post, we will first create workflow templates using just the WorkflowTemplates API. We will create the template, set a managed cluster, add jobs to the template, and instantiate the workflow. Next, we will further optimize and simplify our workflow by using a YAML-based workflow template file. The YAML-based template file eliminates the need to make API calls to set the template’s cluster and add the jobs to the template. Finally, to further enhance the workflow and promote re-use of the template, we will incorporate parameterization. Parameters will allow us to pass key/value pairs from the command line to the workflow template, and on to the Python script as input arguments.

It is not necessary to use the Google Cloud Console for this post. All steps will be done using Google Cloud SDK shell commands. This means all steps may be automated using CI/CD DevOps tools, like Jenkins and Spinnaker on GKE.

Source Code

All open-sourced code for this post can be found on GitHub within three repositories: dataproc-java-demo, dataproc-python-demo, and dataproc-workflow-templates. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.

WorkflowTemplates API

Always start by ensuring you have the latest Google Cloud SDK updates and are working within the correct Google Cloud project.

gcloud components update

export PROJECT_ID=your-project-id
gcloud config set project $PROJECT_ID

Set the following variables based on your Google environment. The variables will be reused throughout the post for multiple commands.

export REGION=your-region
export ZONE=your-zone
export BUCKET_NAME=gs://your-bucket

The post assumes you still have the Cloud Storage bucket we created in the previous post. In the bucket, you will need the two Kaggle IBRD CSV files, available on Kaggle, the compiled Java JAR file from the dataproc-java-demo project, and a new Python script, international_loans_dataproc.py, from the dataproc-python-demo project.

screen-shot-2018-12-16-at-12.03.51-pm

Use gsutil with the copy (cp) command to upload the four files to your Storage bucket.

gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc.py $BUCKET_NAME

Following Google’s suggested process, we create a workflow template using the workflow-templates create command.

export TEMPLATE_ID=template-demo-1
  
gcloud dataproc workflow-templates create \
  $TEMPLATE_ID --region $REGION

Adding a Cluster

Next, we need to set a cluster for the workflow to use, in order to run the jobs. Cloud Dataproc will create and use a Managed Cluster for your workflow or use an existing cluster. If the workflow uses a managed cluster, it creates the cluster, runs the jobs, and then deletes the cluster when the jobs are finished. This means, for many use cases, there is no need to maintain long-lived clusters; they become just an ephemeral part of the workflow.

We set a managed cluster for our Workflow using the workflow-templates set-managed-cluster command. We will re-use the same cluster specifications we used in the previous post, the Standard, 1 master node and 2 worker nodes, cluster type.

gcloud dataproc workflow-templates set-managed-cluster \
  $TEMPLATE_ID \
  --region $REGION \
  --zone $ZONE \
  --cluster-name three-node-cluster \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --worker-boot-disk-size 500 \
  --num-workers 2 \
  --image-version 1.3-deb9

Alternatively, if we already had an existing cluster, we would use the workflow-templates set-cluster-selector command, to associate that cluster with the workflow template.

gcloud dataproc workflow-templates set-cluster-selector \
  $TEMPLATE_ID \
  --region $REGION \
  --cluster-labels goog-dataproc-cluster-uuid=$CLUSTER_UUID

To get the existing cluster’s UUID label value, you could use a command similar to the following.

CLUSTER_UUID=$(gcloud dataproc clusters describe $CLUSTER_2 \
  --region $REGION \
  | grep 'goog-dataproc-cluster-uuid:' \
  | sed 's/.* //')

echo $CLUSTER_UUID

1c27efd2-f296-466e-b14e-c4263d0d7e19
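
The grep and sed pipeline above can also be expressed in Python, which is handy if the surrounding automation is already a script. This is a sketch under assumptions: the sample text is an abridged, hand-written stand-in for the describe command's YAML output, not captured output, and the function name is mine.

```python
import re
from typing import Optional

UUID_LABEL = "goog-dataproc-cluster-uuid"


def extract_cluster_uuid(describe_output: str) -> Optional[str]:
    """Return the value of the cluster UUID label, or None if it is absent."""
    pattern = re.compile(
        rf"^\s*{re.escape(UUID_LABEL)}:\s*(\S+)\s*$", re.MULTILINE
    )
    match = pattern.search(describe_output)
    return match.group(1) if match else None


# Abridged, assumed shape of the YAML printed by `clusters describe`.
sample_output = """\
clusterName: your-cluster-name
labels:
  goog-dataproc-cluster-name: your-cluster-name
  goog-dataproc-cluster-uuid: 1c27efd2-f296-466e-b14e-c4263d0d7e19
"""

print(extract_cluster_uuid(sample_output))
```

Returning None when the label is missing makes it easy to fail early, before an empty value reaches the set-cluster-selector command.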

Adding Jobs

Next, we add the jobs we want to run to the template. Each job is considered a step in the template, and each step requires a unique step id. We will add three jobs to the template: two Java-based Spark jobs from the previous post, and a new Python-based PySpark job.

First, we add the two Java-based Spark jobs, using the workflow-templates add-job spark command. This command’s flags are nearly identical to the dataproc jobs submit spark command, used in the previous post.

export STEP_ID=ibrd-small-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocSmall \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

export STEP_ID=ibrd-large-spark
  
gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

Next, we add the Python-based PySpark job, international_loans_dataproc.py, as the third job in the template. This Python script requires three input arguments, on lines 15–17, which are the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed (gist).

We pass the arguments to the Python script as part of the PySpark job, using the workflow-templates add-job pyspark command.

export STEP_ID=ibrd-large-pyspark
  
gcloud dataproc workflow-templates add-job pyspark \
  $BUCKET_NAME/international_loans_dataproc.py \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --region $REGION \
  -- $BUCKET_NAME \
     ibrd-statement-of-loans-historical-data.csv \
     ibrd-summary-large-python
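
Everything after the bare -- in the command above is handed to the Python script as positional arguments. The sketch below shows one way a script could receive those three values with argparse. It is not the code from international_loans_dataproc.py: the argument names and the way the paths are joined are assumptions for illustration.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the three positional arguments passed after `--`."""
    parser = argparse.ArgumentParser(description="PySpark job arguments")
    parser.add_argument("storage_bucket", help="bucket holding data and results")
    parser.add_argument("data_file", help="name of the CSV data file")
    parser.add_argument("results_directory", help="directory for the results")
    return parser.parse_args(argv)


# Simulate the arguments the workflow step would pass to the script.
args = parse_args(
    [
        "gs://your-bucket",
        "ibrd-statement-of-loans-historical-data.csv",
        "ibrd-summary-large-python",
    ]
)

input_path = f"{args.storage_bucket}/{args.data_file}"
output_path = f"{args.storage_bucket}/{args.results_directory}"
print(input_path)
print(output_path)
```

Keeping the bucket, file name, and results directory as separate arguments is what later makes it possible to swap each one independently when the template is parameterized.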

That’s it; we have created our first Cloud Dataproc Workflow Template using the Dataproc WorkflowTemplates API. To view our template, we can use the following two commands. First, use the workflow-templates list command to display a list of available templates. The list command output displays the version of the workflow template and how many jobs are in the template.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Then, we use the workflow-templates describe command to show the details of a specific template.

gcloud dataproc workflow-templates describe \
  $TEMPLATE_ID --region $REGION

Using the workflow-templates describe command, we should see output similar to the following (gist).

In the template description, notice the template’s id, the managed cluster in the placement section, and the three jobs, all of which we added using the above series of workflow-templates commands. Also, notice the creation and update timestamps and version number, which were automatically generated by Dataproc. Lastly, notice the name, which refers to the GCP project and region where this copy of the template is located. Had we used an existing cluster with our workflow, as opposed to a managed cluster, the placement section would have looked as follows.

placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-uuid: your_clusters_uuid_label_value

To instantiate the workflow, we use the workflow-templates instantiate command. This command will create the managed cluster, run all the steps (jobs), then delete the cluster. I have added the time command to see how long the workflow takes to complete.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION #--async

We can observe the progress from the Google Cloud Dataproc Console, or from the command line by omitting the --async flag. Below we see the three jobs completed successfully on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/e720bb96-9c87-330e-b1cd-efa4612b3c57].
WorkflowTemplate [template-demo-1] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/e1fe53de-92f2-4f8c-8b3a-fda5e13829b6].
Created cluster: three-node-cluster-ugdo4ygpl52bo.
Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING
Job ID ibrd-small-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/f2a40c33-3cdf-47f5-92d6-345463fbd404].
WorkflowTemplate [template-demo-1] DONE
Deleted cluster: three-node-cluster-ugdo4ygpl52bo.

1.02s user 0.35s system 0% cpu 5:03.55 total

In the output, you see the creation of the cluster, the three jobs running and completing successfully, and finally the cluster deletion. The entire workflow took approximately 5 minutes to complete. Below is the view of the workflow’s results from the Dataproc Clusters Console Jobs tab.

screen_shot_2018-12-15_at_11.42.44_am

Below we see the output from the PySpark job, run as part of the workflow template, shown in the Dataproc Clusters Console Output tab. Notice the three input arguments we passed to the Python script from the workflow template, listed in the output.

screen_shot_2018-12-15_at_11.43.56_am

We see the arguments passed to the job, from the Jobs Configuration tab.

screen_shot_2018-12-15_at_1.11.11_pm.png

Examining the Google Cloud Dataproc Jobs Console, we will observe that the WorkflowTemplates API automatically adds a unique alphanumeric extension to the name of each managed cluster we create, as well as to the name of each job that is run. The extension on the cluster name matches the extension on the jobs run on that cluster.

screen_shot_2018-12-15_at_1.05.41_pm
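
The matching extension is easy to confirm from the command-line output shown earlier. The sketch below is a small helper of my own (not part of any Google tooling) that pulls the extension from the cluster name and each job ID in a few of those log lines and collects the distinct values.

```python
import re
from typing import List, Set


def workflow_suffixes(
    log_lines: List[str], cluster_name: str, step_ids: List[str]
) -> Set[str]:
    """Collect the distinct extensions appended to the cluster and job names."""
    names = [cluster_name] + step_ids
    # Try longer names first so a shorter name can never shadow a longer one.
    ordered = sorted(names, key=len, reverse=True)
    alternatives = "|".join(re.escape(name) for name in ordered)
    pattern = re.compile(rf"\b(?:{alternatives})-([a-z0-9]+)\b")
    return {
        match.group(1)
        for line in log_lines
        for match in pattern.finditer(line)
    }


# A few lines copied from the workflow output shown earlier in this post.
log_lines = [
    "Created cluster: three-node-cluster-ugdo4ygpl52bo.",
    "Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING",
    "Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING",
    "Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED",
]
step_ids = ["ibrd-small-spark", "ibrd-large-spark", "ibrd-large-pyspark"]

print(workflow_suffixes(log_lines, "three-node-cluster", step_ids))
```

A result containing exactly one value means every job in the log ran on the cluster created for that workflow run.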

YAML-based Workflow Template

The above WorkflowTemplates API-based workflow was certainly more convenient than using the individual Cloud Dataproc API commands. At a minimum, we don’t have to remember to delete our cluster when the jobs are complete, which I often forget to do. To further optimize the workflow, we will introduce a YAML-based Workflow Template. According to Google, you can define a workflow template in a YAML file, then instantiate the template to run the workflow. You can also import and export a workflow template YAML file to create and update a Cloud Dataproc workflow template resource.

We can export our first workflow template to create our YAML-based template file.

gcloud dataproc workflow-templates export template-demo-1 \
  --destination template-demo-2.yaml \
  --region $REGION

Below is our first YAML-based template, template-demo-2.yaml. You will need to replace the values in the template with your own values, based on your environment (gist).

Note the template looks nearly identical to the template we created previously using the WorkflowTemplates API. The YAML-based template requires the placement and jobs fields. All the available fields are detailed here.
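
As a quick sanity check on those two required fields, the sketch below models a heavily abridged template as a Python dictionary and reports which required top-level fields are missing. The field names follow the WorkflowTemplate resource as I understand it (stepId, sparkJob, pysparkJob, managedCluster); verify them against your exported file. The bucket name is a placeholder, and the helper function is hypothetical.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("placement", "jobs")


def missing_required_fields(template: Dict[str, Any]) -> List[str]:
    """Return the required top-level fields that are absent or empty."""
    return [field for field in REQUIRED_FIELDS if not template.get(field)]


# Heavily abridged template; most cluster configuration is omitted.
template = {
    "placement": {
        "managedCluster": {
            "clusterName": "three-node-cluster",
            "config": {"softwareConfig": {"imageVersion": "1.3-deb9"}},
        }
    },
    "jobs": [
        {
            "stepId": "ibrd-small-spark",
            "sparkJob": {
                "mainClass": "org.example.dataproc.InternationalLoansAppDataprocSmall",
                "jarFileUris": ["gs://your-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar"],
            },
        },
        {
            "stepId": "ibrd-large-pyspark",
            "pysparkJob": {
                "mainPythonFileUri": "gs://your-bucket/international_loans_dataproc.py",
                "args": [
                    "gs://your-bucket",
                    "ibrd-statement-of-loans-historical-data.csv",
                    "ibrd-summary-large-python",
                ],
            },
        },
    ],
}

print(missing_required_fields(template))      # → []
print(missing_required_fields({"jobs": []}))  # → ['placement', 'jobs']
```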

To run the template we use the workflow-templates instantiate-from-file command. Again, I will use the time command to measure performance.

time gcloud dataproc workflow-templates instantiate-from-file \
  --file template-demo-2.yaml \
  --region $REGION

Running the workflow-templates instantiate-from-file command will run a workflow, nearly identical to the workflow we ran in the previous example, with a similar timing. Below we see the three jobs completed successfully on the managed cluster, in approximately the same time as the previous workflow.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/7ba3c28e-ebfa-32e7-9dd6-d938a1cfe23b].
WorkflowTemplate RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/8d05199f-ed36-4787-8a28-ae784c5bc8ae].
Created cluster: three-node-cluster-5k3bdmmvnna2y.
Job ID ibrd-small-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-pyspark-5k3bdmmvnna2y RUNNING
Job ID ibrd-small-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-pyspark-5k3bdmmvnna2y COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/a436ae82-f171-4b0a-9b36-5e16406c75d5].
WorkflowTemplate DONE
Deleted cluster: three-node-cluster-5k3bdmmvnna2y.

1.16s user 0.44s system 0% cpu 4:48.84 total

Parameterization of Templates

To further optimize the workflow template process for re-use, we have the option of passing parameters to our template. Imagine you now receive new loan snapshot data files every night. Imagine you need to run the same data analysis on the financial transactions of thousands of your customers, nightly. Parameterizing templates makes them more flexible and reusable. By removing hard-coded values, such as Storage bucket paths and data file names, a single template may be re-used for multiple variations of the same job. Parameterization allows you to automate hundreds or thousands of Spark and Hadoop jobs in a workflow or workflows, each with different parameters, programmatically.

To demonstrate the parameterization of a workflow template, we create another YAML-based template with just the Python/PySpark job, template-demo-3.yaml. If you recall from our first example, the Python script, international_loans_dataproc.py, requires three input arguments: the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed.

We will replace four of the values in the template with parameters. We will inject those parameters’ values when we instantiate the workflow. Below is the new parameterized template. The template now has a parameters section on lines 26–46. These parameters replace the four values on lines 3–7 (gist).

Note the PySpark job’s three arguments and the location of the Python script have been parameterized. Parameters may include validation. As an example of validation, the template uses regex to validate the format of the Storage bucket path. The regex follows Google’s RE2 regular expression library syntax. If you need help with regex, the Regex Tester – Golang website is a convenient way to test your parameters’ regex validations.
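For a quick local check of a validation pattern, a minimal Python sketch follows. The pattern gs://.* is an assumed example, not necessarily the one in the template gist, and matches_entirely is a hypothetical helper. Python’s re module is not RE2, although a pattern this simple should behave the same in both. The sketch also assumes Dataproc requires the whole parameter value to match, which is why it uses re.fullmatch rather than re.search.

```python
import re

# Assumed validation pattern for the Storage bucket parameter (hypothetical).
BUCKET_REGEX = r"gs://.*"


def matches_entirely(pattern, value):
    """Return True only if the whole value matches the pattern."""
    return re.fullmatch(pattern, value) is not None


print(matches_entirely(BUCKET_REGEX, "gs://your-bucket"))  # True
print(matches_entirely(BUCKET_REGEX, "your-bucket"))       # False
```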

First, we import the new parameterized YAML-based workflow template, using the workflow-templates import command. Then, we instantiate the template using the workflow-templates instantiate command. The workflow-templates instantiate command will run the single PySpark job, analyzing the smaller IBRD data file, and placing the resulting Parquet-format file in a directory within the Storage bucket. We pass the Python script location, bucket link, smaller IBRD data file name, and output directory as parameters to the template. The last three of these are, in turn, passed as input arguments to the Python script.

export TEMPLATE_ID=template-demo-3

gcloud dataproc workflow-templates import $TEMPLATE_ID \
  --region $REGION --source template-demo-3.yaml

gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION --async \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-latest-available-snapshot.csv",RESULTS_DIRECTORY="ibrd-summary-small-python"

Next, we will analyze the larger historic data file, using the same parameterized YAML-based workflow template, but changing two of the four parameters we are passing to the template with the workflow-templates instantiate command. This will run a single PySpark job on the larger IBRD data file and place the resulting Parquet-format file in a different directory within the Storage bucket.

time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-historical-data.csv",RESULTS_DIRECTORY="ibrd-summary-large-python"

This is the power of parameterization—one workflow template and one job script, but two different datasets and two different results.
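To make the re-use concrete, here is a small Python sketch that builds the instantiate command for each dataset from one list. The helper build_instantiate_command and the placeholder bucket value are assumptions for illustration; the parameter names, flags, file names, and results directories come from the two commands above. The sketch only prints the commands and does not call gcloud.

```python
def build_instantiate_command(template_id, region, parameters):
    """Build the argument list for the workflow-templates instantiate command."""
    # The --parameters flag takes KEY=VALUE pairs separated by commas.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataproc", "workflow-templates", "instantiate",
        template_id, "--region", region, "--parameters", joined,
    ]


# Placeholder values; replace with your own bucket and region.
BUCKET_NAME = "gs://your-bucket"
REGION = "us-east1"

# (data file, results directory) pairs from the two runs above.
datasets = [
    ("ibrd-statement-of-loans-latest-available-snapshot.csv", "ibrd-summary-small-python"),
    ("ibrd-statement-of-loans-historical-data.csv", "ibrd-summary-large-python"),
]

for data_file, results_dir in datasets:
    command = build_instantiate_command(
        "template-demo-3",
        REGION,
        {
            "MAIN_PYTHON_FILE": f"{BUCKET_NAME}/international_loans_dataproc.py",
            "STORAGE_BUCKET": BUCKET_NAME,
            "IBRD_DATA_FILE": data_file,
            "RESULTS_DIRECTORY": results_dir,
        },
    )
    print(" ".join(command))
```

To actually run a workflow, the argument list could be handed to subprocess.run(command, check=True). Parameter values that themselves contain commas would need different handling than this simple join.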

Below, we see that the single PySpark job ran on the managed cluster.

Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/b3c5063f-e3cf-3833-b613-83db12b82f32].
WorkflowTemplate [template-demo-3] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105].
Created cluster: three-node-cluster-j6q2al2mkkqck.
Job ID ibrd-pyspark-j6q2al2mkkqck RUNNING
Job ID ibrd-pyspark-j6q2al2mkkqck COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/fe4a263e-7c6d-466e-a6e2-52292cbbdc9b].
WorkflowTemplate [template-demo-3] DONE
Deleted cluster: three-node-cluster-j6q2al2mkkqck.

0.98s user 0.40s system 0% cpu 4:19.42 total

Running the workflow-templates list command again should display a list of two workflow templates.

gcloud dataproc workflow-templates list --region $REGION
  
ID               JOBS  UPDATE_TIME               VERSION
template-demo-3  1     2018-12-15T17:04:39.064Z  2
template-demo-1  3     2018-12-15T16:32:06.508Z  5

Looking within the Google Cloud Storage bucket, we should now see four different folders, the results of the workflows.

Job Results and Testing

To check on the status of a job, we use the dataproc jobs wait command. This returns the standard output (stdout) and standard error (stderr) for that specific job.

export SET_ID=ibrd-large-dataset-pyspark-cxzzhr2ro3i54
  
gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION

The dataproc jobs wait command is frequently used for automated testing of jobs, often within a CI/CD pipeline. Assume part of the expected job output indicates success, such as a string, boolean, or numeric value. We could use any number of test frameworks or other methods to confirm the existence of that expected value or values. Below is a simple example of using the grep command to check for the existence of the expected line ‘ state: FINISHED’ in the standard output of the dataproc jobs wait command.

# Capture stdout of the wait command; discard anything written to stderr
command=$(gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION 2>/dev/null)

# -F fixed string, -q quiet, -x match the whole line
if grep -Fqx "  state: FINISHED" <<< "$command"; then
  echo "Job Success!"
else
  echo "Job Failure?"
fi

# single line alternative
if grep -Fqx "  state: FINISHED" <<< "$command"; then echo "Job Success!"; else echo "Job Failure?"; fi

Job Success!
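The same whole-line check can be written as a plain Python function, which may be easier to call from a test framework. This is a minimal sketch: job_succeeded is a hypothetical helper, and the sample text is a made-up stand-in for captured command output, not real gcloud output.

```python
EXPECTED_LINE = "  state: FINISHED"


def job_succeeded(output, expected_line=EXPECTED_LINE):
    """Return True if any line of the output equals the expected line exactly."""
    # Mirrors grep -Fx: fixed string, whole-line match.
    return expected_line in output.splitlines()


# Hypothetical fragment standing in for captured command output.
sample_output = "jobId: example\n  state: FINISHED\n"

print(job_succeeded(sample_output))                   # True
print(job_succeeded("jobId: example\n  state: X\n"))  # False
```

Because the match is on the whole line, leading whitespace matters: a line with different indentation will not count as success.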

Individual Operations

To view individual workflow operations, use the operations list and operations describe commands. The operations list command will list all operations.

Notice the three distinct types of operations within each workflow, shown with the operations list command: WORKFLOW, CREATE, and DELETE. In the example below, I’ve separated the operations by workflow, for better clarity.

gcloud dataproc operations list --region $REGION

NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
---
be0e5293-275f-46ad-b1f4-696ba44c222e  2018-12-15T17:07:26.305Z  DELETE    DONE
6784078c-cbe3-4c1e-a56e-217149f555a4  2018-12-15T17:04:40.613Z  CREATE    DONE
fcd8039e-a260-3ab3-ad31-01abc1a524b4  2018-12-15T17:04:40.007Z  WORKFLOW  DONE
---
b4b23ca6-9442-4ffb-8aaf-460bac144dd8  2018-12-15T17:02:16.744Z  DELETE    DONE
89ef9c7c-f3c9-4d01-9091-61ed9e1f085d  2018-12-15T17:01:45.514Z  CREATE    DONE
243fa7c1-502d-3d7a-aaee-b372fe317570  2018-12-15T17:01:44.895Z  WORKFLOW  DONE

We use the results of the operations list command to execute the operations describe command to describe a specific operation.

gcloud dataproc operations describe \
  projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105

Each type of operation contains different details. Note the fine-grained detail we get from Dataproc using the operations describe command for a CREATE operation (gist).
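Moving from the operations list output to an operations describe call can also be scripted. The Python sketch below parses a listing like the one above and builds the full resource name for each CREATE operation. The helpers parse_operations and operation_path are hypothetical, and the parsing assumes whitespace-separated columns in the order shown.

```python
def parse_operations(listing):
    """Parse operations list output, skipping the header and separator lines."""
    operations = []
    for line in listing.splitlines():
        parts = line.split()
        # Data rows have at least four columns: NAME, TIMESTAMP, TYPE, STATE.
        if len(parts) < 4 or parts[0] == "NAME":
            continue
        name, timestamp, op_type, state = parts[:4]
        operations.append(
            {"name": name, "timestamp": timestamp, "type": op_type, "state": state}
        )
    return operations


def operation_path(project_id, region, name):
    """Build the full resource name passed to operations describe."""
    return f"projects/{project_id}/regions/{region}/operations/{name}"


listing = """\
NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
---
"""

for op in parse_operations(listing):
    if op["type"] == "CREATE":
        print(operation_path("dataproc-demo-224523", "us-east1", op["name"]))
```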

Conclusion

In this brief follow-up to the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we have seen how easy the WorkflowTemplates API and YAML-based workflow templates make automating our analytics jobs. This post only scratched the surface of the complete functionality of the WorkflowTemplates API and parameterization of templates.

In a future post, we will leverage the automation capabilities of the Google Cloud Platform, the WorkflowTemplates API, YAML-based workflow templates, and parameterization to develop a fully-automated DevOps for Big Data workflow, capable of running hundreds of Spark and Hadoop jobs.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
