Collecting and Analyzing IoT Data in Near Real-Time with AWS IoT, LoRa, and LoRaWAN

Introduction

In a recent post published on ITNEXT, LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, we explored the use of the LoRa (Long Range) and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing several embedded sensors, and an IoT gateway. In this post, we will extend that architecture to the Cloud, using AWS IoT, a broad and deep set of IoT services, from the edge to the Cloud. We will securely collect, transmit, and analyze IoT data using the AWS cloud platform.

LoRa and LoRaWAN

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.

AWS IoT

AWS describes AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics.

In this post, we will focus on three AWS IoT services, one from each category: AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics. According to AWS, the AWS IoT Device SDKs include open-source libraries and developer and porting guides with samples to help you build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Core is a managed cloud service that lets connected devices easily and securely interact with cloud applications and other devices. AWS IoT Core can process and route messages to AWS endpoints and other devices reliably and securely. Finally, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

To learn more about AWS IoT, specifically the AWS IoT services we will be exploring within this post, I recommend reading my recent post published on Towards Data Science, Getting Started with IoT Analytics on AWS.

Hardware Selection

In this post, we will use the following hardware.

IoT Device with Embedded Sensors

An Arduino single-board microcontroller will serve as our IoT device. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductor, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.

The Sense contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital
    angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.
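One detail worth noting about the barometric sensor: its range is quoted in hectopascals, while the sample readings later in this post (for example, 99.38) are on a kilopascal scale. The short sketch below shows the conversion and a range check. It assumes the readings are in kilopascals, which is my understanding of the default unit of the Arduino_LPS22HB library's readPressure() call; the function and constant names are hypothetical.

```python
# Measurement range quoted above for the LPS22HB, in hectopascals (hPa)
LPS22HB_MIN_HPA = 260.0
LPS22HB_MAX_HPA = 1260.0


def kpa_to_hpa(pressure_kpa: float) -> float:
    """Convert kilopascals to hectopascals (1 kPa = 10 hPa)."""
    return pressure_kpa * 10.0


def in_sensor_range(pressure_kpa: float) -> bool:
    """Check a kPa reading against the sensor's hPa measurement range."""
    return LPS22HB_MIN_HPA <= kpa_to_hpa(pressure_kpa) <= LPS22HB_MAX_HPA


# Assumption: sample value is in kPa, like the readings logged later in the post
reading_kpa = 99.38
print(round(kpa_to_hpa(reading_kpa), 1), in_sensor_range(reading_kpa))
```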

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa, goes through the gateway, which can be either a dedicated hardware appliance or software program.

LoRa Gateways, to paraphrase The Things Network, form the bridge between devices and the Cloud. Devices use low power networks like LoRaWAN to connect to the Gateway, while the Gateway uses high bandwidth networks like WiFi, Ethernet, or Cellular to connect to the Cloud.

A third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) will serve as our LoRa IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). The transceiver modules communicate with their host over a universal asynchronous receiver-transmitter (UART) interface. A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.

According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. Each RYLR896 module contains a small, PCB integrated, helical antenna. This transceiver operates at both the 868 and 915 MHz frequency ranges. In this demonstration, we will be transmitting at 915 MHz for North America.

The Arduino Sense (IoT device) transmits data, using one of the RYLR896 modules (shown below front). The Raspberry Pi (IoT Gateway), connected to the other RYLR896 module (shown below rear), receives the data.

LoRaWAN Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits / 4 bits/hex digit).
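A password of the right shape can be generated rather than typed by hand. The following is a minimal sketch, not part of the original project: it uses Python's standard secrets module to draw 16 random bytes and render them as 32 uppercase hex digits. The helper name generate_lora_password is hypothetical, and the AT+CPIN command shown is the one the sketch and gateway script later in this post use to set the password.

```python
import secrets


def generate_lora_password() -> str:
    """Return a random 128-bit key rendered as 32 uppercase hex digits."""
    # 16 random bytes * 8 bits = 128 bits; each byte becomes 2 hex digits
    return secrets.token_hex(16).upper()


password = generate_lora_password()
# The same value must be configured on both the transmitter and the receiver
print("AT+CPIN=" + password)
```

Whatever value is generated would replace the PASSWORD constant in both the Arduino sketch and the gateway script.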

Provisioning AWS Resources

To start, we will create the necessary AWS IoT and associated resources on the AWS cloud platform. Once these resources are in place, we can then proceed to configure the IoT device and IoT gateway to securely transmit the sensor data to the Cloud.

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/aws-iot-analytics-demo.git

AWS CloudFormation

The CloudFormation template, iot-analytics.yaml, will create an AWS IoT CloudFormation stack containing the following resources.

  • AWS IoT Thing
  • AWS IoT Thing Policy
  • AWS IoT Core Topic Rule
  • AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • AWS Lambda and Lambda Permission
  • Amazon S3 Bucket
  • Amazon SageMaker Notebook Instance
  • AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To create the AWS CloudFormation stack from the included CloudFormation template, execute the following AWS CLI command.

aws cloudformation create-stack \
  --stack-name lora-iot-demo \
  --template-body file://cloudformation/iot-analytics.yaml \
  --parameters ParameterKey=ProjectName,ParameterValue=lora-iot-demo \
    ParameterKey=IoTTopicName,ParameterValue=lora-iot-demo \
  --capabilities CAPABILITY_NAMED_IAM

The resulting CloudFormation stack should contain 16 AWS resources.

Additional Resources

Unfortunately, AWS CloudFormation cannot create all the AWS IoT resources we require for this demonstration. To complete the AWS provisioning process, execute the following series of AWS CLI commands, aws_cli_commands.md. These commands will create the remaining resources, including an AWS IoT Thing Type, Thing Group, Thing Billing Group, and an X.509 Certificate.

# LoRaWAN / AWS IoT Demo
# Author: Gary Stafford
# Run AWS CLI commands after CloudFormation stack completes successfully
# variables
thingName=lora-iot-gateway-01
thingPolicy=LoRaDevicePolicy
thingType=LoRaIoTGateway
thingGroup=LoRaIoTGateways
thingBillingGroup=LoRaIoTGateways
mkdir ${thingName}
aws iot create-keys-and-certificate \
  --certificate-pem-outfile "${thingName}/${thingName}.cert.pem" \
  --public-key-outfile "${thingName}/${thingName}.public.key" \
  --private-key-outfile "${thingName}/${thingName}.private.key" \
  --set-as-active
# assuming you only have one certificate registered
# (-r strips the JSON quotes so the ARN can be passed as an argument)
certificate=$(aws iot list-certificates | jq -r '.[][] | .certificateArn')
## alternately, for a specific certificate if you have more than one
# aws iot list-certificates
## then change the value below
# certificate=arn:aws:iot:us-east-1:123456789012:cert/<certificate>
aws iot attach-policy \
  --policy-name $thingPolicy \
  --target $certificate
aws iot attach-thing-principal \
  --thing-name $thingName \
  --principal $certificate
aws iot create-thing-type \
  --thing-type-name $thingType \
  --thing-type-properties "thingTypeDescription=LoRaWAN IoT Gateway"
aws iot create-thing-group \
  --thing-group-name $thingGroup \
  --thing-group-properties "thingGroupDescription=\"LoRaWAN IoT Gateway Thing Group\", attributePayload={attributes={Manufacturer=RaspberryPiFoundation}}"
aws iot add-thing-to-thing-group \
  --thing-name $thingName \
  --thing-group-name $thingGroup
aws iot create-billing-group \
  --billing-group-name $thingBillingGroup \
  --billing-group-properties "billingGroupDescription=\"Gateway Billing Group\""
aws iot add-thing-to-billing-group \
  --thing-name $thingName \
  --billing-group-name $thingBillingGroup
aws iot update-thing \
  --thing-name $thingName \
  --thing-type-name $thingType \
  --attribute-payload "{\"attributes\": {\"GatewayMfr\":\"RaspberryPiFoundation\", \"LoRaMfr\":\"REYAX\", \"LoRaModel\":\"RYLR896\"}}"
aws iot describe-thing \
  --thing-name $thingName

IoT Device Configuration

With the AWS resources deployed, we can configure the IoT device and IoT Gateway.

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo_aws.ino, contains the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, Red, Green, and Blue (RGB) color, and ambient light intensity, using the LoRaWAN protocol.

/*
Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
including temperature, humidity, barometric pressure, and color,
using REYAX RYLR896 transceiver modules
http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
Author: Gary Stafford
*/
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 5000; // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";
String uid = "";
void setup()
{
Serial.begin(9600);
Serial1.begin(115200); // default baud rate of module is 115200
delay(1000); // wait for LoRa module to be ready
// get unique transceiver id to identify iot device on network
Serial1.print((String)"AT+UID?\r\n");
uid = Serial1.readString();
uid.replace("+UID=", ""); // trim off '+UID=' at start of line
uid.replace("\r\n", ""); // trim off CR/LF at end of line
// these settings all need to be the same for receiver and transmitter
Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
delay(200);
Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
delay(200);
Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
delay(200);
Serial1.print("AT+CPIN?\r\n"); // confirm password is set
if (!HTS.begin())
{ // initialize HTS221 sensor
Serial.println("Failed to initialize humidity temperature sensor!");
while (1);
}
if (!BARO.begin())
{ // initialize LPS22HB sensor
Serial.println("Failed to initialize pressure sensor!");
while (1);
}
// avoid bad readings to start bug
// https://forum.arduino.cc/index.php?topic=660360.0
BARO.readPressure();
delay(1000);
if (!APDS.begin())
{ // initialize APDS9960 sensor
Serial.println("Failed to initialize color sensor!");
while (1);
}
}
void loop()
{
updateReadings();
delay(UPDATE_FREQUENCY);
}
void updateReadings()
{
float temperature = getTemperature(CALIBRATION_FACTOR);
float humidity = getHumidity();
float pressure = getPressure();
int colors[4];
getColor(colors);
String payload = buildPayload(temperature, humidity, pressure, colors);
Serial.println("Payload: " + payload); // display the payload for debugging
Serial1.print(payload); // send the payload over LoRa via the RYLR896 module
displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}
float getTemperature(float calibration)
{
return HTS.readTemperature() + calibration;
}
float getHumidity()
{
return HTS.readHumidity();
}
float getPressure()
{
return BARO.readPressure();
}
void getColor(int c[])
{
// check if a color reading is available
while (!APDS.colorAvailable())
{
delay(5);
}
int r, g, b, a;
APDS.readColor(r, g, b, a);
c[0] = r;
c[1] = g;
c[2] = b;
c[3] = a;
}
// display for debugging purposes
void displayResults(float t, float h, float p, int c[])
{
Serial.println((String)"UID: " + uid);
Serial.print("Temperature: ");
Serial.println(t);
Serial.print("Humidity: ");
Serial.println(h);
Serial.print("Pressure: ");
Serial.println(p);
Serial.print("Color (r, g, b, a): ");
Serial.print(c[0]);
Serial.print(", ");
Serial.print(c[1]);
Serial.print(", ");
Serial.print(c[2]);
Serial.print(", ");
Serial.println(c[3]);
Serial.println("----------");
}
String buildPayload(float t, float h, float p, int c[])
{
String readings = "";
readings += uid;
readings += DELIMITER;
readings += t;
readings += DELIMITER;
readings += h;
readings += DELIMITER;
readings += p;
readings += DELIMITER;
readings += c[0];
readings += DELIMITER;
readings += c[1];
readings += DELIMITER;
readings += c[2];
readings += DELIMITER;
readings += c[3];
String payload = "";
payload += "AT+SEND=";
payload += ADDRESS;
payload += ",";
payload += readings.length();
payload += ",";
payload += readings;
payload += "\r\n";
return payload;
}

AT Commands

Communication with the RYLR896’s long-range modem is done using AT commands. AT commands are instructions used to control a modem. AT is the abbreviation of ATtention. Every command line starts with AT, which is why modem commands are called AT commands, according to Developer’s Home. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor ID and the sensor values together in a single string. The string will be incorporated into an AT command, sent to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, we will delimit the sensor values with a pipe (|), as opposed to a comma. According to REYAX, the maximum length of the LoRa payload is approximately 330 bytes.
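To make the command format concrete, here is a small Python sketch that mirrors what the sketch's buildPayload() function does on the Arduino. It is an illustration rather than part of the project: the function name build_send_command is hypothetical, and the sample values are taken from the log output later in this post. Note that Python's str() does not pad floats to two decimal places the way the Arduino String class does, so the two are only equivalent for values like the ones shown.

```python
DELIMITER = "|"


def build_send_command(address: int, uid: str, readings: list) -> str:
    """Build an AT+SEND command carrying pipe-delimited sensor values."""
    # Sensor ID first, then each reading, joined with the pipe delimiter
    data = DELIMITER.join([uid] + [str(value) for value in readings])
    # Format: AT+SEND=<address>,<payload length>,<data>, terminated by CR/LF
    return "AT+SEND={},{},{}\r\n".format(address, len(data), data)


command = build_send_command(
    116,
    "0447383033363932003C0034",
    [23.46, 41.89, 99.38, 230, 692, 833, 1116],
)
print(repr(command))
```

Because the AT command itself is comma-separated, using a pipe inside the data keeps the two levels of structure easy to tell apart.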

Below, we see an example of an AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both contain the LoRa transmitter Address ID, payload length (62 bytes in the example), and the payload. The data received by the IoT gateway also has the Received signal strength indicator (RSSI), and Signal-to-noise ratio (SNR).
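The received line can be split into those parts with a few lines of Python. This is a hedged sketch of one way to do it, separate from the parse_payload() function in the gateway script shown later; the LoRaMessage type and parse_rcv name are hypothetical. The sample line is copied from the log output later in this post.

```python
from typing import List, NamedTuple


class LoRaMessage(NamedTuple):
    address: int       # transmitter address ID
    length: int        # payload length in bytes
    fields: List[str]  # pipe-delimited sensor ID and values
    rssi: int          # received signal strength indicator
    snr: int           # signal-to-noise ratio


def parse_rcv(line: str) -> LoRaMessage:
    """Parse a '+RCV=<address>,<length>,<data>,<RSSI>,<SNR>' line."""
    body = line.strip()
    if not body.startswith("+RCV="):
        raise ValueError("not a +RCV line: {!r}".format(line))
    body = body[len("+RCV="):]
    # Take address and length from the left, RSSI and SNR from the right,
    # so the data part stays whole even if it were to contain a comma.
    address, length, rest = body.split(",", 2)
    data, rssi, snr = rest.rsplit(",", 2)
    if len(data) != int(length):
        raise ValueError("payload length mismatch")
    return LoRaMessage(int(address), int(length), data.split("|"),
                       int(rssi), int(snr))


sample = ("+RCV=116,59,0447383033363932003C0034"
          "|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n")
message = parse_rcv(sample)
print(message.fields[0], message.rssi, message.snr)
```

Checking the declared length against the actual data is a cheap way to reject truncated or corrupted lines before they reach the Cloud.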

Receiving Data on IoT Gateway

The Raspberry Pi will act as a LoRa IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino, and sending the data to AWS. The Raspberry Pi runs a Python script, rasppi_lora_receiver_aws.py, which receives the decrypted data from the Arduino Sense by way of the RYLR896 module, parses the sensor values, serializes the data to a JSON payload, and finally transmits the payload in an MQTT-protocol message to AWS. The script uses pyserial, the Python Serial Port Extension, which encapsulates access to the serial port for communication with the RYLR896 module. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS.

import json
import logging
import sys
import threading
import time
from argparse import ArgumentParser
import serial
from awscrt import io, mqtt, auth, http, exceptions
from awsiot import mqtt_connection_builder
# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# a1d0wxnxn1hs7m-ats.iot.us-east-1.amazonaws.com
# constants
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"
# global variables
count = 0 # from args
received_count = 0
received_all_event = threading.Event()
def main():
    # log to a file
    logging.basicConfig(filename='output.log',
                        filemode='w', level=logging.DEBUG)
    args = get_args()  # get args
    payload = ""
    lora_payload = {}
    # set log level
    io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')
    # spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    # set MQTT connection
    mqtt_connection = set_mqtt_connection(args, client_bootstrap)
    logging.debug("Connecting to {} with client ID '{}'...".format(
        args.endpoint, args.client_id))
    connect_future = mqtt_connection.connect()
    # future.result() waits until a result is available
    connect_future.result()
    logging.debug("Connecting to REYAX RYLR896 transceiver module...")
    serial_conn = serial.Serial(
        port=args.tty,
        baudrate=int(args.baud_rate),
        timeout=5,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS
    )
    if serial_conn.isOpen():
        logging.debug("Connected!")
        set_lora_config(serial_conn)
        check_lora_config(serial_conn)
        while True:
            # read data from serial port
            serial_payload = serial_conn.readline()
            logging.debug(serial_payload)
            if len(serial_payload) >= 1:
                payload = serial_payload.decode(encoding="utf-8")
                payload = payload[:-2]  # trim off CR/LF at end of line
                try:
                    data = parse_payload(payload)
                    lora_payload = {
                        "ts": time.time(),
                        "data": {
                            "device_id": str(data[0]),
                            "gateway_id": str(args.gateway_id),
                            "temperature": float(data[1]),
                            "humidity": float(data[2]),
                            "pressure": float(data[3]),
                            "color": {
                                "red": float(data[4]),
                                "green": float(data[5]),
                                "blue": float(data[6]),
                                "ambient": float(data[7])
                            }
                        }
                    }
                    logging.debug(lora_payload)
                except IndexError:
                    logging.error("IndexError: {}".format(payload))
                except ValueError:
                    logging.error("ValueError: {}".format(payload))
                # publish mqtt message
                message_json = json.dumps(
                    lora_payload,
                    sort_keys=True,
                    indent=None,
                    separators=(',', ':'))
                try:
                    mqtt_connection.publish(
                        topic=args.topic,
                        payload=message_json,
                        qos=mqtt.QoS.AT_LEAST_ONCE)
                except mqtt.SubscribeError as err:
                    logging.error(".SubscribeError: {}".format(err))
                except exceptions.AwsCrtError as err:
                    logging.error("AwsCrtError: {}".format(err))
def set_mqtt_connection(args, client_bootstrap):
    if args.use_websocket:
        proxy_options = None
        if args.proxy_host:
            proxy_options = http.HttpProxyOptions(
                host_name=args.proxy_host, port=args.proxy_port)
        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
            client_bootstrap)
        mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
            endpoint=args.endpoint,
            client_bootstrap=client_bootstrap,
            region=args.signing_region,
            credentials_provider=credentials_provider,
            websocket_proxy_options=proxy_options,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    else:
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=args.endpoint,
            cert_filepath=args.cert,
            pri_key_filepath=args.key,
            client_bootstrap=client_bootstrap,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    return mqtt_connection
def get_args():
    parser = ArgumentParser(
        description="Send and receive messages through an MQTT connection.")
    parser.add_argument("--tty", required=True,
                        help="serial tty", default="/dev/ttyAMA0")
    parser.add_argument("--baud-rate", required=True,
                        help="serial baud rate", default=115200)
    parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
                        "Ex: \"abcd123456wxyz-ats.iot.us-east-1.amazonaws.com\"")
    parser.add_argument('--cert', help="File path to your client certificate, in PEM format.")
    parser.add_argument('--key', help="File path to your private key, in PEM format.")
    parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
                        "Necessary if MQTT server uses a certificate that's not already in " +
                        "your trust store.")
    parser.add_argument('--client-id', default='samples-client-id',
                        help="Client ID for MQTT connection.")
    parser.add_argument('--topic', default="samples/test",
                        help="Topic to subscribe to, and publish messages to.")
    parser.add_argument('--message', default="Hello World!", help="Message to publish. " +
                        "Specify empty string to publish nothing.")
    parser.add_argument('--count', default=0, type=int, help="Number of messages to publish/receive before exiting. " +
                        "Specify 0 to run forever.")
    parser.add_argument('--use-websocket', default=False, action='store_true',
                        help="To use a websocket instead of raw mqtt. If you specify this option you must "
                        "specify a region for signing, you can also enable proxy mode.")
    parser.add_argument('--signing-region', default='us-east-1',
                        help="If you specify --use-websocket, this is the region that will be used for computing "
                        "the Sigv4 signature")
    parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
                        "you will likely need to set --root-ca to the ca for your proxy.")
    parser.add_argument('--proxy-port', type=int, default=8080,
                        help="Port for proxy to connect to.")
    parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
                        help='Logging level')
    parser.add_argument("--gateway-id", help="IoT Gateway serial number")
    args = parser.parse_args()
    return args
def parse_payload(payload):
    # input: +RCV=116,29,0447383033363932003C0034|23.94|37.71|99.89|16|38|53|80,-61,56
    # output (list of strings): [0447383033363932003C0034, 23.94, 37.71, 99.89, 16, 38, 53, 80]
    payload = payload.split(",")
    payload = payload[2].split("|")
    payload = [i for i in payload]
    return payload
def set_lora_config(serial_conn):
    # configures the REYAX RYLR896 transceiver module
    # each response has its trailing CR/LF trimmed with [:-2]
    serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network Id set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
    time.sleep(1)
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES-128 password set? {}".format(serial_payload.decode(encoding="utf-8")))
def check_lora_config(serial_conn):
    # queries the module and logs its current configuration
    serial_conn.write(str.encode("AT?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Module responding? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+ADDRESS?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network id: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+IPR?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("UART baud rate: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+BAND?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF frequency: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CRFOP?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF output power: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+MODE?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Work mode: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+PARAMETER?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF parameters: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES128 password of the network: {}".format(serial_payload.decode(encoding="utf-8")))
# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    logging.error("Connection interrupted. error: {}".format(error))
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    logging.warning("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))
    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        logging.warning("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()
        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    logging.warning("Resubscribe results: {}".format(resubscribe_results))
    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    logging.debug("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1
    if received_count == count:
        received_all_event.set()
if __name__ == "__main__":
    sys.exit(main())

Running the IoT Gateway Python Script

To run the Python script on the Raspberry Pi, we will use a helper shell script, rasppi_lora_receiver_aws.sh. The shell script helps construct the arguments required to execute the Python script.

#!/bin/bash
# Author: Gary A. Stafford
# Start IoT data collector script and tails output
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
# POSIX test syntax, so the check also works when the script is run with sh
if [ "$#" -ne 1 ]; then
  echo "Script requires 1 parameter!"
  exit 1
fi
# input parameters
ENDPOINT=$1 # e.g. a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
DEVICE="lora-iot-gateway-01" # matches CloudFormation thing name
CERTIFICATE="${DEVICE}-certificate.pem.crt" # e.g. lora-iot-gateway-01-certificate.pem.crt
KEY="${DEVICE}-private.pem.key" # e.g. lora-iot-gateway-01-private.pem.key
GATEWAY_ID=$(< /proc/cpuinfo grep Serial | grep -oh "[a-z0-9]*$") # e.g. 00000000f62051ce
# output for debugging
echo "DEVICE: ${DEVICE}"
echo "ENDPOINT: ${ENDPOINT}"
echo "CERTIFICATE: ${CERTIFICATE}"
echo "KEY: ${KEY}"
echo "GATEWAY_ID: ${GATEWAY_ID}"
# call the python script
nohup python3 rasppi_lora_receiver_aws.py \
  --endpoint "${ENDPOINT}" \
  --cert "${DEVICE}-creds/${CERTIFICATE}" \
  --key "${DEVICE}-creds/${KEY}" \
  --root-ca "${DEVICE}-creds/AmazonRootCA1.pem" \
  --client-id "${DEVICE}" \
  --topic "lora-iot-demo" \
  --gateway-id "${GATEWAY_ID}" \
  --verbosity "Info" \
  --tty "/dev/ttyAMA0" \
  --baud-rate 115200 \
  >collector.log 2>&1 </dev/null &
sleep 2
# tail the log (Control-C to exit)
tail -f collector.log

To run the helper script, we execute the following command, substituting the input parameter, the AWS IoT endpoint, with your endpoint.

sh ./rasppi_lora_receiver_aws.sh \
  a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com

You should see the console output, similar to the following.

The script starts by configuring the RYLR896 module and outputting that configuration to a log file, output.log. If successful, we should see the following debug information logged.

DEBUG:root:Connecting to a1b2c3d4e5f6-ats.iot.us-east-1.amazonaws.com with client ID 'lora-iot-gateway-01'
DEBUG:root:Connecting to REYAX RYLR896 transceiver module
DEBUG:root:Connected!
DEBUG:root:Address set? +OK
DEBUG:root:Network Id set? +OK
DEBUG:root:AES-128 password set? +OK
DEBUG:root:Module responding? +OK
DEBUG:root:Address: +ADDRESS=116
DEBUG:root:Network id: +NETWORKID=6
DEBUG:root:UART baud rate: +IPR=115200
DEBUG:root:RF frequency: +BAND=915000000
DEBUG:root:RF output power: +CRFOP=15
DEBUG:root:Work mode: +MODE=0
DEBUG:root:RF parameters: +PARAMETER=12,7,1,4
DEBUG:root:AES128 password of the network: +CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF

The sensor data is also written to the log file for debugging purposes. The first line of each pair in the log (shown below) is the raw decrypted data received from the IoT device via LoRaWAN. The second line is the JSON-serialized payload, sent securely to AWS, using the MQTT protocol.

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n'

DEBUG:root:{'ts': 1598305503.7041512, 'data': {'humidity': 41.89, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 230.0, 'blue': 833.0, 'ambient': 1116.0, 'green': 692.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.63|99.38|236|696|837|1127,-49,35\r\n'

DEBUG:root:{'ts': 1598305513.7918658, 'data': {'humidity': 41.63, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 236.0, 'blue': 837.0, 'ambient': 1127.0, 'green': 696.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.44|41.57|99.38|232|686|830|1113,-48,32\r\n'

DEBUG:root:{'ts': 1598305523.8556132, 'data': {'humidity': 41.57, 'temperature': 23.44, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 232.0, 'blue': 830.0, 'ambient': 1113.0, 'green': 686.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.51|41.44|99.38|205|658|802|1040,-48,36\r\n'

DEBUG:root:{'ts': 1598305528.8890748, 'data': {'humidity': 41.44, 'temperature': 23.51, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 205.0, 'blue': 802.0, 'ambient': 1040.0, 'green': 658.0}}}
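The ts field in each payload comes from Python's time.time(), so it is a Unix epoch timestamp in seconds. When inspecting the log, it can be handy to convert it to a readable date. The snippet below is a small illustration using only the standard library; the value is copied from the first payload above.

```python
from datetime import datetime, timezone

# 'ts' value copied from the first logged payload above (seconds since epoch)
ts = 1598305503.7041512

# Interpret the timestamp explicitly as UTC rather than local time
reading_time = datetime.fromtimestamp(ts, tz=timezone.utc)
print(reading_time.isoformat())
```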

AWS IoT Core

The Raspberry Pi-based IoT gateway will be registered with AWS IoT Core. IoT Core allows users to connect devices quickly and securely to AWS.

Things

According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

Below, we see an example of the Thing created by CloudFormation. The Thing, lora-iot-gateway-01, represents the physical IoT gateway. We have assigned the IoT gateway a Thing Type, LoRaIoTGateway, a Thing Group, LoRaIoTGateways, and a Thing Billing Group, LoRaIoTGateways.

In a real IoT environment, containing hundreds, thousands, even millions of IoT devices, gateways, and sensors, these classification mechanisms, Thing Type, Thing Group, and Thing Billing Group, will help to organize IoT assets.

Device Gateway and Message Broker

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture containing multiple Topics, Rules, and Actions.

AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring all data exchanged between AWS and the devices is secure by default. In the demonstration, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Below, we see an example of an X.509 certificate assigned to the Thing, lora-iot-gateway-01, which represents the physical IoT gateway. The X.509 certificate and the private key, previously generated using the AWS CLI, are installed on the IoT gateway.

Authorization of the device to access any resource on AWS is controlled by AWS IoT Core Policies. These policies are similar to AWS IAM Policies. Below, we see an example of an AWS IoT Core Policy, LoRaDevicePolicy, which is assigned to the IoT gateway.

AWS IoT Core Rules

Once an MQTT message is received from the IoT gateway (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to an IoT Analytics Channel.

Rule query statements are written in standard Structured Query Language (SQL). The datasource for the Rule query is an IoT Topic.

SELECT
    data.device_id,
    data.gateway_id,
    data.temperature,
    data.humidity,
    data.pressure,
    data.color.red,
    data.color.green,
    data.color.blue,
    data.color.ambient,
    ts,
    clientid() AS device,
    parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp(), "UTC") AS msg_received
FROM
    "${IoTTopicName}"

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

Below, we see a typical AWS IoT Analytics architecture. IoT messages are received from AWS IoT Core, through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 21 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an ordered list of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.

In our demonstration’s Pipeline, iot_analytics_pipeline, we have specified three additional activities, including DeviceRegistryEnrich, Filter, and Lambda. Other activity types include Math, SelectAttributes, RemoveAttributes, and AddAttributes.
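To make the shape of the pipelineActivities array more concrete, here is a rough sketch that builds such a list in Python and chains the activities together. It makes no AWS calls. The activity names, the filter expression, the batch size, and the order of the three optional activities are all assumptions made for illustration; the demonstration's actual values are defined in its CloudFormation template.

```python
def build_pipeline_activities(channel, datastore, lambda_name, role_arn):
    """Return an ordered activity list: Channel first, Datastore last."""
    activities = [
        {"channel": {"name": "channel_activity", "channelName": channel}},
        {"deviceRegistryEnrich": {
            "name": "enrich_activity",
            "attribute": "metadata",   # key that receives the registry data
            "thingName": "device",     # assumed: message field holding the thing name
            "roleArn": role_arn,
        }},
        {"filter": {
            "name": "filter_activity",
            # Hypothetical expression; messages that do not match are dropped.
            "filter": "humidity >= 0 AND pressure > 0",
        }},
        {"lambda": {
            "name": "lambda_activity",
            "lambdaName": lambda_name,
            "batchSize": 10,           # assumed batch size
        }},
        {"datastore": {"name": "datastore_activity", "datastoreName": datastore}},
    ]
    # Each activity points to the one after it through 'next';
    # the final Datastore activity has no 'next'.
    for current, following in zip(activities, activities[1:]):
        (current_body,) = current.values()
        (following_body,) = following.values()
        current_body["next"] = following_body["name"]
    return activities
```

With a Channel and a Datastore activity always present, the three optional activities here leave room for up to 20 more.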

The Filter activity checks that the sensor values are not Null or otherwise erroneous; if they are, the message is dropped. The Lambda Pipeline activity executes an AWS Lambda function to transform the messages in the pipeline. Messages are sent in an event object to the Lambda. The message is modified, and the event object is returned to the activity.

The Python-based Lambda function easily handles typical IoT data transformation tasks, including converting the temperature from Celsius to Fahrenheit, pressure from kilopascals (kPa) to inches of Mercury (inHg), and 12-bit RGBA values to 8-bit color values (0–255). The Lambda function also rounds all the values to between 0 and 2 decimal places of precision.

def lambda_handler(event, context):
    for e in event:
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        e['red'] = int(round(e['red'] / (4097 / 255), 0))
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))
    return event
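As a quick local sanity check of these conversions, the handler can be called directly with a list containing one message. The sketch below repeats the function so that it runs on its own, with the divisor 4097 kept unchanged from the post's code; the sample values are the sensor readings from the message payload shown later in this post, and passing None for the context is an assumption that works only because the handler never uses it.

```python
def lambda_handler(event, context):
    for e in event:
        # Celsius to Fahrenheit
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        # kPa to inHg
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        # Scale raw color readings into the 0-255 range
        e['red'] = int(round(e['red'] / (4097 / 255), 0))
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))
    return event


# One flattened message, as it would arrive from the Channel.
sample = [{
    'temperature': 23.6, 'humidity': 45.73, 'pressure': 98.65,
    'red': 281, 'green': 667, 'blue': 650, 'ambient': 1057,
}]
converted = lambda_handler(sample, None)[0]
print(converted)
```

The converted values agree with the final message structure shown near the end of the post: 74.48 °F, 29.13 inHg, and color values of 17, 42, 40, and 66.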

The demonstration’s Pipeline also enriches the IoT data with metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the IoT data, including the custom attributes such as LoRa transceiver manufacturer and model, and the IoT gateway manufacturer.

A notable feature of Pipelines is the ability to reprocess messages. If you make changes to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all the IoT data in the associated Channel, and overwrite the IoT data in the Data set.

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline, in a fully-managed database. Both Channels and Data stores support storing IoT data in your own Amazon S3 bucket or an IoT Analytics managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store the IoT data in our Data store, iot_analytics_data_store.

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Periodic updates are implemented using a cron expression. For the demonstration, we are updating our Data set, iot_analytics_data_set, at a 15-minute interval. The time interval can be increased or reduced, depending on the desired ‘near real-time’ nature of the IoT data being analyzed.

Below, we see messages in the Result preview pane of the Data set. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

The CloudFormation stack created an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the cron expression runs the scheduled update.

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebook Instance is cloned from our project’s GitHub repository.

The repository contains a sample Jupyter Notebook, LoRa_IoT_Analytics_Demo.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3.

The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT data stored in the IoT Analytics Data set.

The Notebook can be modified, and the changes pushed back to GitHub. You could easily fork the demonstration’s GitHub repository and modify the CloudFormation template to point to your source code repository.

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization.

Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Although there is an Amazon QuickSight API, Amazon QuickSight is not automatically enabled or configured with CloudFormation or using the AWS CLI in this demonstration.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets; they are two different service features.

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set, iot_analytics_data_set.

Amazon QuickSight gives you the ability to view and modify QuickSight Data sets before visualizing. QuickSight even provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. For this demonstration, we will leave the data unchanged since all transformations were already completed in the IoT Analytics Pipeline.

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis console is shown below. An Analysis is primarily a collection of Visuals (aka Visual types). QuickSight provides several Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or each visual within the Analysis can be filtered. For the demo, I have created a simple QuickSight Analysis, including a few typical QuickSight visuals.

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. Although viewers of the Dashboard cannot edit the visuals, they can apply filtering and interactively drill down into data in the Visuals.

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS’s machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight’s ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight’s ML-powered Forecasting can be used to predict your business metrics accurately, and perform interactive what-if analysis with point-and-click simplicity. QuickSight’s built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab (left) in the demonstration’s QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, like Visuals, and configured to tune the detection parameters. Observe the temperature, humidity, and barometric pressure anomalies, identified by ML Insights, based on their Anomaly Score, which is higher or lower, given a minimum delta of five percent. These anomalies accurately reflected an actual failure of the IoT device, caused by overheating during testing, which resulted in abnormal sensor readings.

Receiving the Messages on AWS

To confirm the IoT gateway is sending messages, we can use a packet analyzer, like tcpdump, on the IoT gateway. Running tcpdump on the IoT gateway, below, we see outbound encrypted MQTT messages being sent to AWS on port 443.

To confirm those messages are being received from the IoT gateway on AWS, we can use the AWS IoT Core Test feature and subscribe to the lora-iot-demo topic. We should see messages flowing in from the IoT gateway at approximately 5-second intervals.

The JSON payload structure of the incoming MQTT messages will look similar to the below example. The device_id is the unique id of the IoT device that transmitted the message using LoRaWAN. The gateway_id is the unique id of the IoT gateway that received the message using LoRaWAN and sent it to AWS. A single IoT gateway would usually manage messages from multiple IoT devices, each with a unique id.

{
    "data": {
        "color": {
            "ambient": 1057,
            "blue": 650,
            "green": 667,
            "red": 281
        },
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6
    },
    "ts": 1598543131.9861386
}

The SQL query used by the AWS IoT Rule, described earlier, transforms and flattens the nested JSON payload structure before passing it to the AWS IoT Analytics Channel, as shown below.

{
    "ambient": 1057,
    "blue": 650,
    "green": 667,
    "red": 281,
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "humidity": 45.73,
    "pressure": 98.65,
    "temperature": 23.6,
    "ts": 1598543131.9861386,
    "msg_received": "2020-08-27T11:45:32.074+0000",
    "device": "lora-iot-gateway-01"
}
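The flattening performed by the Rule can be imitated in a few lines of Python, which may help when testing downstream code without a live topic. This is only an approximation of what the Rule's SQL does, not how AWS IoT evaluates it: the function name apply_rule is hypothetical, and the client id and receive time, which AWS supplies through clientid() and timestamp(), are passed in as plain arguments.

```python
from datetime import datetime, timezone


def apply_rule(payload: dict, client_id: str, received: datetime) -> dict:
    """Imitate the Rule's SELECT: lift the nested fields to the top level."""
    data = payload["data"]
    # data.device_id, data.temperature, ... become top-level keys.
    flat = {key: value for key, value in data.items() if key != "color"}
    # data.color.red, data.color.green, ... lose both levels of nesting.
    flat.update(data["color"])
    flat["ts"] = payload["ts"]
    # clientid() AS device
    flat["device"] = client_id
    # msg_received, formatted like yyyy-MM-dd'T'HH:mm:ss.SSSZ
    millis = received.microsecond // 1000
    flat["msg_received"] = (
        received.strftime("%Y-%m-%dT%H:%M:%S")
        + f".{millis:03d}"
        + received.strftime("%z")
    )
    return flat


payload = {
    "data": {
        "color": {"ambient": 1057, "blue": 650, "green": 667, "red": 281},
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6,
    },
    "ts": 1598543131.9861386,
}
# Hypothetical receive time, used only to exercise the formatting.
received_at = datetime(2020, 8, 27, 15, 45, 32, 74000, tzinfo=timezone.utc)
flat_message = apply_rule(payload, "lora-iot-gateway-01", received_at)
```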

We can measure the near real-time nature of the IoT data using the ts and msg_received data fields. The ts data field is the date and time when the sensor reading occurred on the IoT device, while the msg_received data field is the date and time when the message was received on AWS. The delta between the two values is a measure of how near real-time the sensor readings are being streamed to the AWS IoT Analytics Channel. In the example above, the difference between ts (2020-08-27T11:45:31.986) and msg_received (2020-08-27T11:45:32.074) is 88 ms.
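The 88 ms figure is simple timestamp arithmetic, sketched below using the two date-time strings quoted in the paragraph above. The function name latency_ms is hypothetical. The result is only as meaningful as the clock synchronization between the sending side and AWS.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%dT%H:%M:%S.%f"


def latency_ms(ts_iso: str, received_iso: str) -> int:
    """Whole milliseconds between the sensor reading and receipt on AWS."""
    delta = datetime.strptime(received_iso, FMT) - datetime.strptime(ts_iso, FMT)
    # Integer division by a 1 ms timedelta avoids floating-point rounding.
    return delta // timedelta(milliseconds=1)


print(latency_ms("2020-08-27T11:45:31.986", "2020-08-27T11:45:32.074"))  # 88
```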

Final IoT Data Message Structure

Once the message payload passes through the AWS IoT Analytics Pipeline and lands in the AWS IoT Analytics Data set, its final data structure looks as follows. Note that the device’s attribute metadata has been added from the AWS IoT Core device registry. Regrettably, the metadata is not well-formatted JSON and will require additional transformation to be usable.

{
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "temperature": 74.48,
    "humidity": 45.73,
    "pressure": 29.13,
    "red": 17,
    "green": 42,
    "blue": 40,
    "ambient": 66,
    "ts": 1598543131.9861386,
    "device": "lora-iot-gateway-01",
    "msg_received": "2020-08-27T15:45:32.024+0000",
    "metadata": {
        "defaultclientid": "lora-iot-gateway-01",
        "thingname": "lora-iot-gateway-01",
        "thingid": "017db4b8-7fca-4617-aa58-7125dd94ab36",
        "thingarn": "arn:aws:iot:us-east-1:123456789012:thing/lora-iot-gateway-01",
        "thingtypename": "LoRaIoTGateway",
        "attributes": {
            "loramfr": "REYAX",
            "gatewaymfr": "RaspberryPiFoundation",
            "loramodel": "RYLR896"
        },
        "version": "2",
        "billinggroupname": "LoRaIoTGateways"
    },
    "__dt": "2020-08-27 00:00:00.000"
}

A set of sample messages is included in the GitHub project’s sample_messages directory.

Conclusion

In this post, we explored the use of the LoRa and LoRaWAN protocols to transmit environmental sensor data from an IoT device to an IoT gateway. Given its low energy consumption, long-distance transmission capabilities, and well-developed protocols, LoRaWAN is an ideal long-range wireless protocol for IoT devices. We then demonstrated how to use AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics to securely collect, analyze, and visualize streaming messages from the IoT device, in near real-time.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1

Introduction

According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusion, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.

Data Analysis—discovering useful information, informing conclusion, and supporting decision-making. –Wikipedia

In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.

Demonstration

In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.

Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

Preview of post’s data in Amazon QuickSight.

The original concept for the Smart Hub was developed as part of a multi-day training and hackathon I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.

From left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.

This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.

High-level AWS architecture diagram of the demonstration.

Featured Technologies

The following AWS services and open-source technologies are featured prominently in this post.


Amazon S3-based Data Lake

An Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.

AWS Glue

AWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

AWS Glue Data Catalog

The AWS Glue Data Catalog is an Apache Hive Metastore compatible, central repository to store structural and operational metadata for data assets. For a given data set, you can store its table definition and physical location, add business-relevant attributes, and track how the data has changed over time.

AWS Glue Crawler

An AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.

AWS Glue ETL Job

An AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.

Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.

Amazon QuickSight

Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.

AWS Lambda

AWS Lambda automatically runs code without requiring the provisioning or management of servers. AWS Lambda automatically scales applications by running code in response to triggers. Lambda code runs in parallel. With AWS Lambda, you are charged for every 100ms your code executes and the number of times your code is triggered. You pay only for the compute time you consume.

Smart Hub Data

Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data. Those data categories include Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.

To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.

Smart Hub Electrical Usage Data

The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kWh) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days’ worth of usage data for the demonstration, for a total of 5,760 data points. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).

{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04167}}
{"loc_id":"b6a8d42425fde548","ts":1576915500,"data":{"s_01":0,"s_02":0.00552,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04147}}
{"loc_id":"b6a8d42425fde548","ts":1576915800,"data":{"s_01":0.29267,"s_02":0.00642,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04207}}
{"loc_id":"b6a8d42425fde548","ts":1576916100,"data":{"s_01":0.29207,"s_02":0.00592,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04137}}
{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04157}}
{"loc_id":"b6a8d42425fde548","ts":1576916700,"data":{"s_01":0,"s_02":0.00562,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04197}}
{"loc_id":"b6a8d42425fde548","ts":1576917000,"data":{"s_01":0,"s_02":0.00512,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04257}}
{"loc_id":"b6a8d42425fde548","ts":1576917300,"data":{"s_01":0,"s_02":0.00522,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04177}}
{"loc_id":"b6a8d42425fde548","ts":1576917600,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04267}}
{"loc_id":"b6a8d42425fde548","ts":1576917900,"data":{"s_01":0,"s_02":0.00612,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04237}}


Note the electrical usage data contains nested data. The electrical usage for each of the ten sensors is contained in a nested JSON object, data, within each time series entry. The object contains ten numeric values of type double.

{
    "loc_id": "b6a8d42425fde548",
    "ts": 1576916400,
    "data": {
        "s_01": 0.29217,
        "s_02": 0.00622,
        "s_03": 0,
        "s_04": 0,
        "s_05": 0,
        "s_06": 0,
        "s_07": 0,
        "s_08": 0,
        "s_09": 0,
        "s_10": 0.04157
    }
}
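To tie the numbers and the record layout together, the following sketch reproduces the data point arithmetic, parses one JSON Lines record, and derives a date partition from its ts value. The function names are hypothetical, and deriving the ‘dt=’ value in UTC is an assumption; the post does not state which timezone the partitioning uses.

```python
import json
from datetime import datetime, timezone

SENSORS_PER_HUB = 10
INTERVAL_MINUTES = 5


def readings_per_day() -> int:
    """Number of five-minute intervals in a 24-hour day."""
    return (24 * 60) // INTERVAL_MINUTES


def partition_key(ts: int) -> str:
    """Build a 'dt=YYYY-MM-DD' partition name from an epoch timestamp (UTC assumed)."""
    day = datetime.fromtimestamp(ts, tz=timezone.utc)
    return f"dt={day:%Y-%m-%d}"


def total_usage(line: str) -> float:
    """Sum the ten sensor values nested under 'data' in one JSON Lines record."""
    record = json.loads(line)
    return sum(record["data"].values())


# First record from the sample usage data.
line = (
    '{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,'
    '"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,'
    '"s_08":0,"s_09":0,"s_10":0.04167}}'
)

points_per_day = readings_per_day() * SENSORS_PER_HUB  # 288 * 10
points_two_days = points_per_day * 2
print(points_per_day, points_two_days, partition_key(json.loads(line)["ts"]))
```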

Real data is often complex and deeply nested. Later in the post, we will see that AWS Glue can map many common data types, including nested data objects, as illustrated below.


Smart Hub Sensor Mappings

The Smart Hub sensor mappings data maps a sensor column in the usage data (e.g., ‘s_01’) to the corresponding actual device (e.g., ‘Central Air Conditioner’). The data contains the device location, wattage, and the last time the record was modified. The data is also stored in JSON Lines format. The sensor mappings data will be partitioned in the Amazon S3-based data lake by the state of the residence (e.g., ‘state=or’ for Oregon).

{"loc_id":"b6a8d42425fde548","id":"s_01","description":"Central Air Conditioner","location":"N/A","watts":3500,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_02","description":"Ceiling Fan","location":"Master Bedroom","watts":65,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_03","description":"Clothes Dryer","location":"Basement","watts":5000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_04","description":"Clothes Washer","location":"Basement","watts":1800,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_05","description":"Dishwasher","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_06","description":"Flat Screen TV","location":"Living Room","watts":120,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_07","description":"Microwave Oven","location":"Kitchen","watts":1000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_08","description":"Coffee Maker","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_09","description":"Hair Dryer","location":"Master Bathroom","watts":2000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_10","description":"Refrigerator","location":"Kitchen","watts":500,"last_modified":1559347200}


Smart Hub Locations

The Smart Hub locations data contains the geospatial coordinates, home address, and timezone for each residential Smart Hub. The data is stored in CSV format. The data for the four cities included in this demonstration originated from OpenAddresses, ‘the free and open global address collection.’ There are approximately 4k location records. The location data will be partitioned in the Amazon S3-based data lake by the state of the residence where the Smart Hub is installed (e.g., ‘state=or’ for Oregon).


lon,lat,number,street,unit,city,district,region,postcode,id,hash,tz
-122.8077278,45.4715614,6635,SW JUNIPER TER,,,,,97008,,b6a8d42425fde548,America/Los_Angeles
-122.8356634,45.4385864,11225,SW PINTAIL LOOP,,,,,97007,,08ae3df798df8b90,America/Los_Angeles
-122.8252379,45.4481709,9930,SW WRANGLER PL,,,,,97008,,1c7e1f7df752663e,America/Los_Angeles
-122.8354211,45.4535977,9174,SW PLATINUM PL,,,,,97007,,b364854408ee431e,America/Los_Angeles
-122.8315771,45.4949449,15040,SW MILLIKAN WAY,# 233,,,,97003,,0e97796ba31ba3b4,America/Los_Angeles
-122.7950339,45.4470259,10006,SW CONESTOGA DR,# 113,,,,97008,,2b5307be5bfeb026,America/Los_Angeles
-122.8072836,45.4908594,12600,SW CRESCENT ST,# 126,,,,97005,,4d74167f00f63f50,America/Los_Angeles
-122.8211801,45.4689303,7100,SW 140TH PL,,,,,97008,,c5568631f0b9de9c,America/Los_Angeles
-122.831154,45.4317057,15050,SW MALLARD DR,# 101,,,,97007,,dbd1321080ce9682,America/Los_Angeles
-122.8162856,45.4442878,10460,SW 136TH PL,,,,,97008,,008faab8a9a3e519,America/Los_Angeles

Electrical Rates

Lastly, the electrical rate data contains the cost of electricity. In this demonstration, the assumption is that the rate varies by state, by month, and by the hour of the day. The data is stored in XML, a data export format still common to older, legacy systems. The electrical rate data will not be partitioned in the Amazon S3-based data lake.

<?xml version="1.0" encoding="UTF-8"?>
<root>
    <row>
        <state>or</state>
        <year>2019</year>
        <month>12</month>
        <from>19:00:00</from>
        <to>19:59:59</to>
        <type>peak</type>
        <rate>12.623</rate>
    </row>
    <row>
        <state>or</state>
        <year>2019</year>
        <month>12</month>
        <from>20:00:00</from>
        <to>20:59:59</to>
        <type>partial-peak</type>
        <rate>7.232</rate>
    </row>
    <row>
        <state>or</state>
        <year>2019</year>
        <month>12</month>
        <from>21:00:00</from>
        <to>21:59:59</to>
        <type>partial-peak</type>
        <rate>7.232</rate>
    </row>
    <row>
        <state>or</state>
        <year>2019</year>
        <month>12</month>
        <from>22:00:00</from>
        <to>22:59:59</to>
        <type>off-peak</type>
        <rate>4.209</rate>
    </row>
</root>

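
As a rough illustration of how these rate records are structured, the following Python sketch parses a two-row excerpt of the XML above with the standard library and looks up the rate in effect at a given time of day. The function name `rate_for` and the inlined excerpt are illustrative assumptions, not part of the project's source code.

```python
import xml.etree.ElementTree as ET
from datetime import time

# Two-row excerpt of rates.xml, inlined so the sketch is self-contained
RATES_XML = """<root>
  <row><state>or</state><year>2019</year><month>12</month>
       <from>19:00:00</from><to>19:59:59</to>
       <type>peak</type><rate>12.623</rate></row>
  <row><state>or</state><year>2019</year><month>12</month>
       <from>20:00:00</from><to>20:59:59</to>
       <type>partial-peak</type><rate>7.232</rate></row>
</root>"""


def rate_for(xml_text, state, year, month, at):
    """Return (type, rate) for the row whose from/to window contains `at`, else None."""
    for row in ET.fromstring(xml_text).iter("row"):
        start = time.fromisoformat(row.findtext("from"))
        end = time.fromisoformat(row.findtext("to"))
        if (row.findtext("state") == state
                and int(row.findtext("year")) == year
                and int(row.findtext("month")) == month
                and start <= at <= end):
            return row.findtext("type"), float(row.findtext("rate"))
    return None


print(rate_for(RATES_XML, "or", 2019, 12, time(19, 50)))
```

The lookup keys (state, year, month, and time window) mirror the assumption stated above that the rate varies by state, by month, and by the hour of the day.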

Data Analysis Process

Due to the number of steps involved in the data analysis process in the demonstration, I have divided the process into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).

Full data analysis workflow diagram

Raw Data Ingestion

In the Raw Data Ingestion stage, semi-structured CSV-, XML-, and JSON-format data files are copied to a secure Amazon Simple Storage Service (S3) bucket. Within the bucket, data files are organized into folders based on their physical data structure (schema). Due to the potentially unlimited number of data files, files are further organized (partitioned) into subfolders. Organizational strategies for data files are based on date, time, geographic location, customer id, or other common data characteristics.

This collection of semi-structured data files, S3 buckets, and partitions forms what is referred to as a data lake. According to AWS, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.

A series of AWS Glue Crawlers process the raw CSV-, XML-, and JSON-format files, extracting metadata, and creating table definitions in the AWS Glue Data Catalog. According to AWS, an AWS Glue Data Catalog contains metadata tables, where each table specifies a single data store.


Data Transformation

In the Data Transformation stage, the raw data in the previous stage is transformed. Data transformation may include both modifying the data and changing the data format. Data modifications include data cleansing, re-casting data types, changing date formats, field-level computations, and field concatenation.

The data is then converted from CSV-, XML-, and JSON-format to Apache Parquet format and written back to the Amazon S3-based data lake. Apache Parquet is a compressed, efficient columnar storage format. Amazon Athena, like many Cloud-based services, charges you by the amount of data scanned per query. Hence, using data partitioning, bucketing, compression, and columnar storage formats, like Parquet, will reduce query cost.

Lastly, the transformed Parquet-format data is cataloged to new tables, alongside the raw CSV, XML, and JSON data, in the Glue Data Catalog.


Data Enrichment

According to ScienceDirect, data enrichment or augmentation is the process of enhancing existing information by supplementing missing or incomplete data. Typically, data enrichment is achieved by using external data sources, but that is not always the case.


In the Data Enrichment stage, the Parquet-format Smart Hub usage data is augmented with related data from the three other data sources: sensor mappings, locations, and electrical rates. The customer’s Smart Hub usage data is enriched with the customer’s device types, the customer’s timezone, and the customer’s electricity cost per monitored period, based on the customer’s geographic location and time of day.


Once the data is enriched, it is converted to Parquet and optimized for query performance, stored in the data lake, and cataloged. At this point, the original CSV-, XML-, and JSON-format raw data files, the transformed Parquet-format data files, and the Parquet-format enriched data files are all stored in the Amazon S3-based data lake and cataloged in the Glue Data Catalog.


Data Visualization

In the final Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for visualization and Business Intelligence, which integrate with Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo. In this demonstration, we will focus on Amazon QuickSight.


Getting Started

Requirements

To follow along with the demonstration, you will need an AWS Account and a current version of the AWS CLI. To get the most from the demonstration, you should also have Python 3 and jq installed in your work environment.

Source Code

All source code for this post can be found on GitHub. Use the following command to clone a copy of the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/athena-glue-quicksight-demo.git


TL;DR?

Just want to jump in without reading the instructions? All the AWS CLI commands found within the post are consolidated in the GitHub project’s README file.

CloudFormation Stack

To start, create the ‘smart-hub-athena-glue-stack’ CloudFormation stack using the smart-hub-athena-glue.yml template. The template will create (3) Amazon S3 buckets, (1) AWS Glue Data Catalog Database, (5) Data Catalog Database Tables, (6) AWS Glue Crawlers, (1) AWS Glue ETL Job, and (1) IAM Service Role for AWS Glue.

Make sure to change the DATA_BUCKET, SCRIPT_BUCKET, and LOG_BUCKET variables, first, to your own unique S3 bucket names. I always suggest using the standard AWS 3-part convention of 1) descriptive name, 2) AWS Account ID or Account Alias, and 3) AWS Region, to name your bucket (e.g. ‘smart-hub-data-123456789012-us-east-1’).

# *** CHANGE ME ***
BUCKET_SUFFIX="123456789012-us-east-1"
DATA_BUCKET="smart-hub-data-${BUCKET_SUFFIX}"
SCRIPT_BUCKET="smart-hub-scripts-${BUCKET_SUFFIX}"
LOG_BUCKET="smart-hub-logs-${BUCKET_SUFFIX}"

aws cloudformation create-stack \
  --stack-name smart-hub-athena-glue-stack \
  --template-body file://cloudformation/smart-hub-athena-glue.yml \
  --parameters ParameterKey=DataBucketName,ParameterValue=${DATA_BUCKET} \
               ParameterKey=ScriptBucketName,ParameterValue=${SCRIPT_BUCKET} \
               ParameterKey=LogBucketName,ParameterValue=${LOG_BUCKET} \
  --capabilities CAPABILITY_NAMED_IAM
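
The three-part naming convention can also be sketched in Python. The helper names `bucket_names` and `is_valid_bucket_name` are hypothetical, and the regular expression checks only a simplified subset of the S3 bucket naming rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit).

```python
import re

# Simplified subset of the S3 bucket naming rules (an assumption for this sketch)
BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def bucket_names(account_id, region):
    """Build the three bucket names, keyed by CloudFormation parameter name."""
    suffix = f"{account_id}-{region}"
    return {
        "DataBucketName": f"smart-hub-data-{suffix}",
        "ScriptBucketName": f"smart-hub-scripts-{suffix}",
        "LogBucketName": f"smart-hub-logs-{suffix}",
    }


def is_valid_bucket_name(name):
    """Return True if the whole name matches the simplified pattern."""
    return BUCKET_RE.fullmatch(name) is not None


names = bucket_names("123456789012", "us-east-1")
for parameter, bucket in names.items():
    print(parameter, bucket, is_valid_bucket_name(bucket))
```

Checking names locally before creating the stack can save a failed deployment, since S3 bucket names must also be globally unique and the stack will fail if a name is rejected.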

Raw Data Files

Next, copy the raw CSV-, XML-, and JSON-format data files from the local project to the DATA_BUCKET S3 bucket (steps 1a-1b in workflow diagram). These files represent the beginnings of the S3-based data lake. Each category of data uses a different strategy for organizing and separating the files. Note the use of the Apache Hive-style partitions (e.g., /smart_hub_data_json/dt=2019-12-21). As discussed earlier, the assumption is that the actual, large volume of data in the data lake would necessitate using partitioning to improve query performance.

# location data
aws s3 cp data/locations/denver_co_1576656000.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=co/
aws s3 cp data/locations/palo_alto_ca_1576742400.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ca/
aws s3 cp data/locations/portland_metro_or_1576742400.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=or/
aws s3 cp data/locations/stamford_ct_1576569600.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ct/

# sensor mapping data
aws s3 cp data/mappings/ \
  s3://${DATA_BUCKET}/sensor_mappings_json/state=or/ \
  --recursive

# electrical usage data
aws s3 cp data/usage/2019-12-21/ \
  s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-21/ \
  --recursive
aws s3 cp data/usage/2019-12-22/ \
  s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-22/ \
  --recursive

# electricity rates data
aws s3 cp data/rates/ \
  s3://${DATA_BUCKET}/electricity_rates_xml/ \
  --recursive

Confirm the contents of the DATA_BUCKET S3 bucket with the following command.

aws s3 ls s3://${DATA_BUCKET}/ \
  --recursive --human-readable --summarize

There should be a total of (14) raw data files in the DATA_BUCKET S3 bucket.

2020-01-04 14:39:51 20.0 KiB electricity_rates_xml/2019_12_1575270000.xml
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/08ae3df798df8b90_1550908800.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/1c7e1f7df752663e_1559347200.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/b6a8d42425fde548_1568314800.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/08ae3df798df8b90_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/1c7e1f7df752663e_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/b6a8d42425fde548_1576915200.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/08ae3df798df8b90_15770016000.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/1c7e1f7df752663e_1577001600.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/b6a8d42425fde548_15770016001.json
2020-01-04 14:39:39 89.7 KiB smart_hub_locations_csv/state=ca/palo_alto_ca_1576742400.csv
2020-01-04 14:39:37 84.2 KiB smart_hub_locations_csv/state=co/denver_co_1576656000.csv
2020-01-04 14:39:44 78.6 KiB smart_hub_locations_csv/state=ct/stamford_ct_1576569600.csv
2020-01-04 14:39:42 91.6 KiB smart_hub_locations_csv/state=or/portland_metro_or_1576742400.csv
Total Objects: 14
Total Size: 636.7 KiB

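
To make the Hive-style partitioning in the listing above more concrete, here is a minimal Python sketch that extracts the partition names and values from an S3 object key. The function name `partitions_from_key` is hypothetical; the keys are taken from the listing.

```python
def partitions_from_key(key):
    """Extract Hive-style name=value partition pairs from an S3 object key."""
    partitions = {}
    # Every path segment except the last (the file name) may be a partition
    for segment in key.split("/")[:-1]:
        if "=" in segment:
            name, _, value = segment.partition("=")
            partitions[name] = value
    return partitions


keys = [
    "smart_hub_data_json/dt=2019-12-21/08ae3df798df8b90_1576915200.json",
    "smart_hub_locations_csv/state=or/portland_metro_or_1576742400.csv",
    "electricity_rates_xml/2019_12_1575270000.xml",
]
for key in keys:
    print(key, partitions_from_key(key))
```

The usage data is partitioned by date (dt), the locations and sensor mappings by state, and the rates data is not partitioned at all, so the last key yields an empty result.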

Lambda Functions

Next, package the (5) Python 3.8-based AWS Lambda functions for deployment.

pushd lambdas/athena-json-to-parquet-data || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-csv-to-parquet-locations || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-json-to-parquet-mappings || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-complex-etl-query || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-parquet-to-parquet-elt-data || exit
zip -r package.zip index.py
popd || exit


Copy the five Lambda packages to the SCRIPT_BUCKET S3 bucket. The ZIP archive Lambda packages are accessed by the second CloudFormation stack, smart-hub-lambda-stack. This CloudFormation stack, which creates the Lambda functions, will fail to deploy if the packages are not found in the SCRIPT_BUCKET S3 bucket.

I have chosen to place the packages in a different S3 bucket than the raw data files. In a real production environment, these two types of files would be separated, minimally, into separate buckets for security. Remember, only data should go into the data lake.

aws s3 cp lambdas/athena-json-to-parquet-data/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_data/
aws s3 cp lambdas/athena-csv-to-parquet-locations/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_csv_to_parquet_locations/
aws s3 cp lambdas/athena-json-to-parquet-mappings/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_mappings/
aws s3 cp lambdas/athena-complex-etl-query/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_complex_etl_query/
aws s3 cp lambdas/athena-parquet-to-parquet-elt-data/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_parquet_to_parquet_elt_data/


Create the second ‘smart-hub-lambda-stack’ CloudFormation stack using the smart-hub-lambda.yml CloudFormation template. The template will create (5) AWS Lambda functions and (1) Lambda execution IAM Service Role.

aws cloudformation create-stack \
  --stack-name smart-hub-lambda-stack \
  --template-body file://cloudformation/smart-hub-lambda.yml \
  --capabilities CAPABILITY_NAMED_IAM

At this point, we have deployed all of the AWS resources required for the demonstration using CloudFormation. We have also copied all of the raw CSV-, XML-, and JSON-format data files into the Amazon S3-based data lake.

AWS Glue Crawlers

If you recall, we created five tables in the Glue Data Catalog database as part of the CloudFormation stack: one table for each of the four raw data types and one table to hold temporary ELT data later in the demonstration. To confirm the five tables were created in the Glue Data Catalog database, use the Glue Data Catalog Console, or run the following AWS CLI / jq command.

aws glue get-tables \
  --database-name smart_hub_data_catalog \
  | jq -r '.TableList[].Name'

The five data catalog tables should be as follows.

electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
smart_hub_data_json
smart_hub_locations_csv


We also created six Glue Crawlers as part of the CloudFormation template. Four of these Crawlers are responsible for cataloging the raw CSV-, XML-, and JSON-format data from S3 into the corresponding, existing Glue Data Catalog database tables. The Crawlers will detect any new partitions and add those to the tables as well. Each Crawler corresponds to one of the four raw data types. Crawlers can be scheduled to run periodically, cataloging new data and updating data partitions. Crawlers can also create Data Catalog database tables; we will use Crawlers to create new tables later in the post.

Run the four Glue Crawlers using the AWS CLI (step 1c in workflow diagram).

aws glue start-crawler --name smart-hub-locations-csv
aws glue start-crawler --name smart-hub-sensor-mappings-json
aws glue start-crawler --name smart-hub-data-json
aws glue start-crawler --name smart-hub-rates-xml

You can check the Glue Crawler Console to ensure the four Crawlers finished successfully.


Alternatively, use another AWS CLI / jq command.

aws glue get-crawler-metrics \
| jq -r '.CrawlerMetricsList[] | "\(.CrawlerName): \(.StillEstimating), \(.TimeLeftSeconds)"' \
| grep "^smart-hub-[A-Za-z-]*"


When complete, all Crawlers should be in a state of ‘StillEstimating = false’ and ‘TimeLeftSeconds = 0’. In my experience, the Crawlers can take up to one minute to start, after the estimation stage, and one minute to stop when complete.

smart-hub-data-json: true, 0
smart-hub-etl-tmp-output-parquet: false, 0
smart-hub-locations-csv: false, 15
smart-hub-rates-parquet: false, 0
smart-hub-rates-xml: false, 15
smart-hub-sensor-mappings-json: false, 15

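
The completion check described above can also be scripted. The following is a minimal Python sketch, assuming the JSON returned by `aws glue get-crawler-metrics` has already been loaded into a dictionary; the function name `crawlers_finished` is hypothetical, and the sample values mirror the in-progress output shown above.

```python
def crawlers_finished(metrics, prefix="smart-hub-"):
    """True when every crawler matching the prefix is done estimating with no time left."""
    relevant = [m for m in metrics["CrawlerMetricsList"]
                if m["CrawlerName"].startswith(prefix)]
    return bool(relevant) and all(
        not m["StillEstimating"] and m["TimeLeftSeconds"] == 0
        for m in relevant
    )


# Sample values copied from the in-progress output above
in_progress = {"CrawlerMetricsList": [
    {"CrawlerName": "smart-hub-data-json", "StillEstimating": True, "TimeLeftSeconds": 0},
    {"CrawlerName": "smart-hub-locations-csv", "StillEstimating": False, "TimeLeftSeconds": 15},
    {"CrawlerName": "smart-hub-rates-xml", "StillEstimating": False, "TimeLeftSeconds": 15},
]}

# The same crawlers once they have all completed
complete = {"CrawlerMetricsList": [
    dict(m, StillEstimating=False, TimeLeftSeconds=0)
    for m in in_progress["CrawlerMetricsList"]
]}

print(crawlers_finished(in_progress), crawlers_finished(complete))
```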

Successfully running the four Crawlers completes the Raw Data Ingestion stage of the demonstration.

Converting to Parquet with CTAS

With the Raw Data Ingestion stage completed, we will now transform the raw Smart Hub usage data, sensor mapping data, and locations data into Parquet-format using three AWS Lambda functions. Each Lambda calls Athena, which executes a CREATE TABLE AS SELECT SQL statement (aka CTAS). Each Lambda executes a similar command, varying only by data source, data destination, and partitioning scheme. Below is an example of the command used for the Smart Hub electrical usage data, taken from the Python-based Lambda, athena-json-to-parquet-data/index.py.

query = \
"CREATE TABLE IF NOT EXISTS " + data_catalog + "." + output_directory + " " \
"WITH ( " \
" format = 'PARQUET', " \
" parquet_compression = 'SNAPPY', " \
" partitioned_by = ARRAY['dt'], " \
" external_location = 's3://" + data_bucket + "/" + output_directory + "' " \
") AS " \
"SELECT * " \
"FROM " + data_catalog + "." + input_directory + ";"


This compact, yet powerful CTAS statement converts a copy of the raw JSON- and CSV-format data files into Parquet-format, and partitions and stores the resulting files back into the S3-based data lake. Additionally, the CTAS SQL statement catalogs the Parquet-format data files into the Glue Data Catalog database, into new tables. Unfortunately, this method will not work for the XML-format raw data files, which we will tackle next.
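
Since the three Lambdas differ only by data source, data destination, and partitioning scheme, the statement can be pictured as a single parameterized builder. The sketch below is illustrative only: the function `build_ctas_query` is hypothetical and is not how the project's Lambdas are organized, and the example call assumes the locations data is partitioned by state, as described earlier.

```python
def build_ctas_query(data_catalog, data_bucket, input_table, output_table, partition_column):
    """Assemble a CTAS statement that writes Snappy-compressed, partitioned Parquet."""
    return (
        f"CREATE TABLE IF NOT EXISTS {data_catalog}.{output_table} "
        "WITH ( "
        "format = 'PARQUET', "
        "parquet_compression = 'SNAPPY', "
        f"partitioned_by = ARRAY['{partition_column}'], "
        f"external_location = 's3://{data_bucket}/{output_table}' "
        ") AS "
        f"SELECT * FROM {data_catalog}.{input_table};"
    )


query = build_ctas_query(
    data_catalog="smart_hub_data_catalog",
    data_bucket="smart-hub-data-123456789012-us-east-1",
    input_table="smart_hub_locations_csv",
    output_table="smart_hub_locations_parquet",
    partition_column="state",
)
print(query)
```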

The five deployed Lambda functions should be visible from the Lambda Console’s Functions tab.


Invoke the three Lambda functions using the AWS CLI (part of step 2a in workflow diagram).

aws lambda invoke \
  --function-name athena-json-to-parquet-data \
  response.json
aws lambda invoke \
  --function-name athena-csv-to-parquet-locations \
  response.json
aws lambda invoke \
  --function-name athena-json-to-parquet-mappings \
  response.json

Here is an example of the same CTAS command, shown above for the Smart Hub electrical usage data, as it was executed successfully by Athena.

CREATE TABLE IF NOT EXISTS smart_hub_data_catalog.smart_hub_data_parquet
WITH (format = 'PARQUET',
parquet_compression = 'SNAPPY',
partitioned_by = ARRAY['dt'],
external_location = 's3://smart-hub-data-demo-account-1-us-east-1/smart_hub_data_parquet')
AS
SELECT *
FROM smart_hub_data_catalog.smart_hub_data_json


We can view any Athena SQL query from the Athena Console’s History tab. Clicking on a query (in pink) will copy it to the Query Editor tab and execute it. Below, we see the three SQL statements executed by the Lambda functions.


AWS Glue ETL Job for XML

If you recall, the electrical rate data is in XML format. The Lambda functions we just executed converted the CSV and JSON data to Parquet using Athena. Currently, unlike CSV, JSON, ORC, Parquet, and Avro, Athena does not support the older XML data format. For the XML data files, we will use an AWS Glue ETL Job to convert the XML data to Parquet. The Glue ETL Job is written in Python and uses Apache Spark, along with several AWS Glue PySpark extensions. For this job, I used an existing script created in the Glue ETL Jobs Console as a base, then modified the script to meet my needs.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
's3_output_path',
'source_glue_database',
'source_glue_table'
])
s3_output_path = args['s3_output_path']
source_glue_database = args['source_glue_database']
source_glue_table = args['source_glue_table']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext. \
create_dynamic_frame. \
from_catalog(database=source_glue_database,
table_name=source_glue_table,
transformation_ctx="datasource0")
applymapping1 = ApplyMapping.apply(
frame=datasource0,
mappings=[("from", "string", "from", "string"),
("to", "string", "to", "string"),
("type", "string", "type", "string"),
("rate", "double", "rate", "double"),
("year", "int", "year", "int"),
("month", "int", "month", "int"),
("state", "string", "state", "string")],
transformation_ctx="applymapping1")
resolvechoice2 = ResolveChoice.apply(
frame=applymapping1,
choice="make_struct",
transformation_ctx="resolvechoice2")
dropnullfields3 = DropNullFields.apply(
frame=resolvechoice2,
transformation_ctx="dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_options(
frame=dropnullfields3,
connection_type="s3",
connection_options={
"path": s3_output_path,
"partitionKeys": ["state"]
},
format="parquet",
transformation_ctx="datasink4")
job.commit()

The three Python command-line arguments the script expects, s3_output_path, source_glue_database, and source_glue_table, are defined in the CloudFormation template, smart-hub-athena-glue.yml. Below, we see them as the first three entries under DefaultArguments in the CloudFormation snippet. They are injected automatically when the job is run and can be overridden from the command line when starting the job.

GlueJobRatesToParquet:
  Type: AWS::Glue::Job
  Properties:
    GlueVersion: 1.0
    Command:
      Name: glueetl
      PythonVersion: 3
      ScriptLocation: !Sub "s3://${ScriptBucketName}/glue_scripts/rates_xml_to_parquet.py"
    DefaultArguments: {
      "--s3_output_path": !Sub "s3://${DataBucketName}/electricity_rates_parquet",
      "--source_glue_database": !Ref GlueDatabase,
      "--source_glue_table": "electricity_rates_xml",
      "--job-bookmark-option": "job-bookmark-enable",
      "--enable-spark-ui": "true",
      "--spark-event-logs-path": !Sub "s3://${LogBucketName}/glue-etl-jobs/"
    }
    Description: "Convert electrical rates XML data to Parquet"
    ExecutionProperty:
      MaxConcurrentRuns: 2
    MaxRetries: 0
    Name: rates-xml-to-parquet
    Role: !GetAtt "CrawlerRole.Arn"
  DependsOn:
    - CrawlerRole
    - GlueDatabase
    - DataBucket
    - ScriptBucket
    - LogBucket

First, copy the Glue ETL Job Python script to the SCRIPT_BUCKET S3 bucket.

aws s3 cp glue-scripts/rates_xml_to_parquet.py \
s3://${SCRIPT_BUCKET}/glue_scripts/


Next, start the Glue ETL Job (part of step 2a in workflow diagram). Although the conversion is a relatively simple set of tasks, creating the Apache Spark environment to execute them takes several minutes. Whereas the Glue Crawlers took about 2 minutes on average, the Glue ETL Job could take 10–15 minutes in my experience, of which only about 1–2 minutes is actual execution time. In my opinion, waiting up to 15 minutes is too long to be viable for ad-hoc jobs against smaller datasets; Glue ETL Jobs are clearly targeted at big data.

aws glue start-job-run --job-name rates-xml-to-parquet

To check on the status of the job, use the Glue ETL Jobs Console, or use the AWS CLI.

# get status of most recent job (the one that is running)
aws glue get-job-run \
  --job-name rates-xml-to-parquet \
  --run-id "$(aws glue get-job-runs \
    --job-name rates-xml-to-parquet \
    | jq -r '.JobRuns[0].Id')"

When complete, you should see results similar to the following. Note the ‘JobRunState’ is ‘SUCCEEDED.’ This particular job ran for a total of 14.92 minutes, while the actual execution time was 2.25 minutes.

{
"JobRun": {
"Id": "jr_f7186b26bf042ea7773ad08704d012d05299f080e7ac9b696ca8dd575f79506b",
"Attempt": 0,
"JobName": "rates-xml-to-parquet",
"StartedOn": 1578022390.301,
"LastModifiedOn": 1578023285.632,
"CompletedOn": 1578023285.632,
"JobRunState": "SUCCEEDED",
"PredecessorRuns": [],
"AllocatedCapacity": 10,
"ExecutionTime": 135,
"Timeout": 2880,
"MaxCapacity": 10.0,
"LogGroupName": "/aws-glue/jobs",
"GlueVersion": "1.0"
}
}

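
The two durations quoted above follow directly from the job run output. As a small worked example in Python, with the function name `run_minutes` chosen only for illustration, the total run time is the difference between CompletedOn and StartedOn, and the execution time is the ExecutionTime field, both converted from seconds to minutes.

```python
# Values copied from the job run output above
job_run = {
    "StartedOn": 1578022390.301,
    "CompletedOn": 1578023285.632,
    "ExecutionTime": 135,
}


def run_minutes(job_run):
    """Return (total minutes, execution minutes), each rounded to two decimals."""
    total = (job_run["CompletedOn"] - job_run["StartedOn"]) / 60
    execution = job_run["ExecutionTime"] / 60
    return round(total, 2), round(execution, 2)


print(run_minutes(job_run))  # (14.92, 2.25)
```

The gap between the two figures, a little over 12 minutes, is the time spent provisioning the Apache Spark environment.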

The job’s progress and the results are also visible in the AWS Glue Console’s ETL Jobs tab.


Detailed Apache Spark logs are also available in the CloudWatch Management Console, which is accessible directly from the Logs link in the AWS Glue Console’s ETL Jobs tab.


The last step in the Data Transformation stage is to catalog the Parquet-format electrical rates data, created with the previous Glue ETL Job, using yet another Glue Crawler (part of step 2b in workflow diagram). Start the following Glue Crawler to catalog the Parquet-format electrical rates data.

aws glue start-crawler --name smart-hub-rates-parquet

This concludes the Data Transformation stage. The raw and transformed data is in the data lake, and the following nine tables should exist in the Glue Data Catalog.

electricity_rates_parquet
electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet

If we examine the tables, we should observe the data partitions we used to organize the data files in the Amazon S3-based data lake are contained in the table metadata. Below, we see the four partitions, based on state, of the Parquet-format locations data.


Data Enrichment

To begin the Data Enrichment stage, we will invoke the AWS Lambda, athena-complex-etl-query/index.py. This Lambda accepts three input parameters (loc_id, date_from, and date_to, below), passed in the Lambda handler’s event parameter: the Smart Hub ID, the start date for the data requested, and the end date for the data requested. The scenario for the demonstration is that a customer with the given location ID, using the electrical provider’s application, has requested data for a particular range of days (start date and end date) to visualize and analyze.

The Lambda executes a series of Athena INSERT INTO SQL statements, one statement for each of the possible Smart Hub connected electrical sensors, s_01 through s_10, for which there are values in the Smart Hub electrical usage data. Amazon released the ability for Athena to INSERT INTO a table using the results of a SELECT query in September 2019, an essential addition to Athena. New Athena features are listed in the release notes.

Here, the SELECT query is actually a series of chained subqueries, using Presto SQL’s WITH clause capability. The queries join the Parquet-format Smart Hub electrical usage data in the S3-based data lake with the other three Parquet-format, S3-based data sources: sensor mappings, locations, and electrical rates. The Parquet-format data is written as individual files to S3 and inserted into the existing ‘etl_tmp_output_parquet’ Glue Data Catalog database table. Compared to traditional relational database-based queries, the capabilities of Glue and Athena to enable complex SQL queries across multiple semi-structured data files, stored in S3, are truly amazing!


Below, we see the SQL statement, assembled inside the athena_query function.

import boto3
import os
import logging
import json
from typing import Dict

# environment variables
data_catalog = os.getenv('DATA_CATALOG')
data_bucket = os.getenv('DATA_BUCKET')

# variables
output_directory = 'etl_tmp_output_parquet'

# uses list comprehension to generate the equivalent of:
# ['s_01', 's_02', …, 's_09', 's_10']
sensors = [f's_{i:02d}' for i in range(1, 11)]

# logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# athena client
athena_client = boto3.client('athena')


def handler(event, context):
    args = {
        "loc_id": event['loc_id'],
        "date_from": event['date_from'],
        "date_to": event['date_to']
    }

    athena_query(args)

    return {
        'statusCode': 200,
        'body': json.dumps("function 'athena-complex-etl-query' complete")
    }


def athena_query(args: Dict[str, str]):
    for sensor in sensors:
        query = \
            "INSERT INTO " + data_catalog + "." + output_directory + " " \
            "WITH " \
            " t1 AS " \
            " (SELECT d.loc_id, d.ts, d.data." + sensor + " AS kwh, l.state, l.tz " \
            " FROM smart_hub_data_catalog.smart_hub_data_parquet d " \
            " LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l " \
            " ON d.loc_id = l.hash " \
            " WHERE d.loc_id = '" + args['loc_id'] + "' " \
            " AND d.dt BETWEEN cast('" + args['date_from'] + \
            "' AS date) AND cast('" + args['date_to'] + "' AS date)), " \
            " t2 AS " \
            " (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts, " \
            " date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period, " \
            " m.description AS device, m.location, t1.loc_id, t1.state, t1.tz, t1.kwh " \
            " FROM t1 LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m " \
            " ON t1.loc_id = m.loc_id " \
            " WHERE t1.loc_id = '" + args['loc_id'] + "' " \
            " AND m.state = t1.state " \
            " AND m.description = (SELECT m2.description " \
            " FROM smart_hub_data_catalog.sensor_mappings_parquet m2 " \
            " WHERE m2.loc_id = '" + args['loc_id'] + "' AND m2.id = '" + sensor + "')), " \
            " t3 AS " \
            " (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state " \
            " FROM smart_hub_data_catalog.electricity_rates_parquet r " \
            " WHERE r.year BETWEEN cast(date_format(cast('" + args['date_from'] + \
            "' AS date), '%Y') AS integer) AND cast(date_format(cast('" + args['date_to'] + \
            "' AS date), '%Y') AS integer)) " \
            "SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts, " \
            " t2.device, t2.location, t3.type, t2.kwh, t3.rate AS cents_per_kwh, " \
            " round(t2.kwh * t3.rate, 4) AS cost, t2.state, t2.loc_id " \
            "FROM t2 LEFT OUTER JOIN t3 " \
            " ON t2.rate_period = t3.rate_period " \
            "WHERE t3.state = t2.state " \
            "ORDER BY t2.ts, t2.device;"

        logger.info(query)

        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': data_catalog
            },
            ResultConfiguration={
                'OutputLocation': 's3://' + data_bucket + '/tmp/' + output_directory
            },
            WorkGroup='primary'
        )

        logger.info(response)
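
Because the handler concatenates the event values directly into the SQL text, it may be worth validating them before the query is built. The following is a minimal sketch of such a check, not part of the project's Lambda. The function name `validate_args` is hypothetical, and the 16-character lowercase hexadecimal pattern for loc_id is an assumption inferred from the sample Smart Hub IDs in this post.

```python
import re
from datetime import date

# Assumption: Smart Hub IDs are 16 lowercase hex characters, as in the samples
LOC_ID_RE = re.compile(r"[0-9a-f]{16}")


def validate_args(event):
    """Return cleaned query arguments, or raise ValueError on malformed input."""
    loc_id = event["loc_id"]
    if not LOC_ID_RE.fullmatch(loc_id):
        raise ValueError("loc_id must be 16 lowercase hexadecimal characters")

    # date.fromisoformat raises ValueError for anything other than YYYY-MM-DD
    date_from = date.fromisoformat(event["date_from"])
    date_to = date.fromisoformat(event["date_to"])
    if date_from > date_to:
        raise ValueError("date_from must not be later than date_to")

    return {
        "loc_id": loc_id,
        "date_from": date_from.isoformat(),
        "date_to": date_to.isoformat(),
    }


print(validate_args({
    "loc_id": "b6a8d42425fde548",
    "date_from": "2019-12-21",
    "date_to": "2019-12-22",
}))
```

In a handler like the one above, the result of such a function could be passed to athena_query in place of the raw event values.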

Below is an example of one of the final queries, for the s_10 sensor, as executed by Athena. All the input parameter values, Python variables, and environment variables have been resolved into the query.

INSERT INTO smart_hub_data_catalog.etl_tmp_output_parquet
WITH t1 AS (SELECT d.loc_id, d.ts, d.data.s_10 AS kwh, l.state, l.tz
FROM smart_hub_data_catalog.smart_hub_data_parquet d
LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l ON d.loc_id = l.hash
WHERE d.loc_id = 'b6a8d42425fde548'
AND d.dt BETWEEN cast('2019-12-21' AS date) AND cast('2019-12-22' AS date)),
t2 AS (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts,
date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period,
m.description AS device,
m.location,
t1.loc_id,
t1.state,
t1.tz,
t1.kwh
FROM t1
LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m ON t1.loc_id = m.loc_id
WHERE t1.loc_id = 'b6a8d42425fde548'
AND m.state = t1.state
AND m.description = (SELECT m2.description
FROM smart_hub_data_catalog.sensor_mappings_parquet m2
WHERE m2.loc_id = 'b6a8d42425fde548'
AND m2.id = 's_10')),
t3 AS (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state
FROM smart_hub_data_catalog.electricity_rates_parquet r
WHERE r.year BETWEEN cast(date_format(cast('2019-12-21' AS date), '%Y') AS integer)
AND cast(date_format(cast('2019-12-22' AS date), '%Y') AS integer))
SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts,
t2.device,
t2.location,
t3.type,
t2.kwh,
t3.rate AS cents_per_kwh,
round(t2.kwh * t3.rate, 4) AS cost,
t2.state,
t2.loc_id
FROM t2
LEFT OUTER JOIN t3 ON t2.rate_period = t3.rate_period
WHERE t3.state = t2.state
ORDER BY t2.ts, t2.device;

Along with enriching the data, the query performs additional data transformation using the other data sources. For example, the Unix timestamp is converted to a localized timestamp containing the date and time, according to the customer’s location (line 7, above). Transforming dates and times is a frequent, often painful, data analysis task. Another example of data enrichment is the augmentation of the data with a new, computed column. The column’s values are calculated using the values of two other columns (line 33, above).
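
To make these two steps concrete outside of SQL, here is a rough Python equivalent for a single reading. It is a sketch under stated assumptions: the function name `enrich` is hypothetical, a fixed UTC-8 offset stands in for America/Los_Angeles in December (a full implementation would resolve the timezone name, for example with `zoneinfo.ZoneInfo`), the rate table is a two-hour excerpt of the rates shown earlier, and the Unix timestamps are chosen to correspond to 19:50 and 20:05 local time on 2019-12-21.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC-8 offset for America/Los_Angeles in December (PST)
PST = timezone(timedelta(hours=-8))

# Excerpt of the Oregon rates for December 2019, keyed by hour of day
RATES = {
    "19": ("peak", 12.623),
    "20": ("partial-peak", 7.232),
}


def enrich(ts, kwh, tz=PST, rates=RATES):
    """Localize a Unix timestamp, look up the hourly rate, and compute the cost."""
    local = datetime.fromtimestamp(ts, tz)
    rate_period = local.strftime("%H")  # same role as date_format(..., '%H')
    rate_type, cents_per_kwh = rates[rate_period]
    return {
        "ts": local.strftime("%Y-%m-%d %H:%M:%S"),
        "type": rate_type,
        "cents_per_kwh": cents_per_kwh,
        "cost": round(kwh * cents_per_kwh, 4),  # same as round(t2.kwh * t3.rate, 4)
    }


print(enrich(1576986600, 0.1501))
print(enrich(1576987500, 0.2248))
```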

Invoke the Lambda with the following three parameters in the payload (step 3a in workflow diagram).

aws lambda invoke \
  --function-name athena-complex-etl-query \
  --payload "{ \"loc_id\": \"b6a8d42425fde548\",
    \"date_from\": \"2019-12-21\", \"date_to\": \"2019-12-22\"}" \
  response.json

The result statuses of the ten INSERT INTO SQL statements (one per device sensor) are visible from the Athena Console’s History tab.


Each Athena query execution saves that query’s results to the S3-based data lake as individual, uncompressed Parquet-format data files. The data is partitioned in the Amazon S3-based data lake by the Smart Hub location ID (e.g., ‘loc_id=b6a8d42425fde548’).

Below is a snippet of the enriched data for a customer’s clothes washer (sensor ‘s_04’). Note the timestamp is now an actual date and time in the local timezone of the customer (e.g., ‘2019-12-21 20:10:00.000’). The sensor ID (‘s_04’) is replaced with the actual device name (‘Clothes Washer’). The location of the device (‘Basement’) and the type of electrical usage period (e.g., ‘peak’ or ‘partial-peak’) have been added. Finally, the cost column has been computed.


ts,device,location,type,kwh,cents_per_kwh,cost,state,loc_id
2019-12-21 19:40:00.000,Clothes Washer,Basement,peak,0.0,12.623,0.0,or,b6a8d42425fde548
2019-12-21 19:45:00.000,Clothes Washer,Basement,peak,0.0,12.623,0.0,or,b6a8d42425fde548
2019-12-21 19:50:00.000,Clothes Washer,Basement,peak,0.1501,12.623,1.8947,or,b6a8d42425fde548
2019-12-21 19:55:00.000,Clothes Washer,Basement,peak,0.1497,12.623,1.8897,or,b6a8d42425fde548
2019-12-21 20:00:00.000,Clothes Washer,Basement,partial-peak,0.1501,7.232,1.0855,or,b6a8d42425fde548
2019-12-21 20:05:00.000,Clothes Washer,Basement,partial-peak,0.2248,7.232,1.6258,or,b6a8d42425fde548
2019-12-21 20:10:00.000,Clothes Washer,Basement,partial-peak,0.2247,7.232,1.625,or,b6a8d42425fde548
2019-12-21 20:15:00.000,Clothes Washer,Basement,partial-peak,0.2248,7.232,1.6258,or,b6a8d42425fde548
2019-12-21 20:20:00.000,Clothes Washer,Basement,partial-peak,0.2253,7.232,1.6294,or,b6a8d42425fde548
2019-12-21 20:25:00.000,Clothes Washer,Basement,partial-peak,0.151,7.232,1.092,or,b6a8d42425fde548


Before optimizing the enriched data, we first need to catalog the Parquet-format results using another Crawler (step 3d in workflow diagram).

aws glue start-crawler --name smart-hub-etl-tmp-output-parquet


Optimizing Enriched Data

The previous step created enriched Parquet-format data. However, this data is not as optimized for query efficiency as it should be. Using the Athena INSERT INTO WITH SQL statement allowed the data to be partitioned. However, that method does not allow the Parquet data to be easily combined into larger files and compressed. To perform both of these optimizations, we will use one last Lambda, athena-parquet-to-parquet-elt-data/index.py. The Lambda will create a new location in the Amazon S3-based data lake, containing all the enriched data in a single file, compressed using Snappy compression.

aws lambda invoke \
  --function-name athena-parquet-to-parquet-elt-data \
  response.json


The resulting Parquet file is visible in the S3 Management Console.

screen_shot_2020-01-04_at_6_07_23_pm

The final step in the Data Enrichment stage is to catalog the optimized Parquet-format enriched ETL data. To catalog the data, run the following Glue Crawler (step 3i in workflow diagram).

aws glue start-crawler --name smart-hub-etl-output-parquet


Final Data Lake and Data Catalog

We should now have the following ten top-level folders of partitioned data in the S3-based data lake. The ‘tmp’ folder may be ignored.

aws s3 ls s3://${DATA_BUCKET}/


PRE electricity_rates_parquet/
PRE electricity_rates_xml/
PRE etl_output_parquet/
PRE etl_tmp_output_parquet/
PRE sensor_mappings_json/
PRE sensor_mappings_parquet/
PRE smart_hub_data_json/
PRE smart_hub_data_parquet/
PRE smart_hub_locations_csv/
PRE smart_hub_locations_parquet/

Similarly, we should now have the following ten corresponding tables in the Glue Data Catalog. Use the AWS Glue Console to confirm the tables exist.

screen_shot_2020-01-04_at_8_30_50_pm

Alternately, use the following AWS CLI / jq command to list the table names.

aws glue get-tables \
  --database-name smart_hub_data_catalog \
  | jq -r '.TableList[].Name'


electricity_rates_parquet
electricity_rates_xml
etl_output_parquet
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet


‘Unknown’ Bug

You may have noticed that the four tables created with the AWS Lambda functions, using the CTAS SQL statement, erroneously have a ‘Classification’ of ‘Unknown’ as opposed to ‘parquet’. I am not sure why; I believe it may be a bug in the CTAS feature. It seems to have no adverse impact on the tables’ functionality. However, to fix the issue, run the following set of commands. This aws glue update-table hack will switch each table’s ‘Classification’ to ‘parquet’.

database=smart_hub_data_catalog
tables=(smart_hub_locations_parquet sensor_mappings_parquet smart_hub_data_parquet etl_output_parquet)
# "${tables[@]}" iterates over every element; a bare ${tables} yields only the first in bash
for table in "${tables[@]}"; do
  # fetch the table, set the classification, and drop fields update-table does not accept
  fixed_table=$(aws glue get-table \
    --database-name "${database}" \
    --name "${table}" \
    | jq '.Table.Parameters.classification = "parquet" | del(.Table.DatabaseName) | del(.Table.CreateTime) | del(.Table.UpdateTime) | del(.Table.CreatedBy) | del(.Table.IsRegisteredWithLakeFormation)')
  fixed_table=$(echo "${fixed_table}" | jq .Table)
  aws glue update-table \
    --database-name "${database}" \
    --table-input "${fixed_table}"
  echo "table '${table}' classification changed to 'parquet'"
done
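For readers less familiar with jq, the filter used above can be sketched as a plain Python dictionary transformation. The function name to_table_input and the abbreviated sample response are assumptions for illustration; the list of removed keys is taken directly from the jq expression. Responses from newer versions of the Glue API may contain additional read-only fields that would also need to be removed, which is something to verify against your own output.

```python
import copy

# Keys removed by the jq filter because update-table does not accept them
READ_ONLY_KEYS = ("DatabaseName", "CreateTime", "UpdateTime",
                  "CreatedBy", "IsRegisteredWithLakeFormation")


def to_table_input(get_table_response: dict, classification: str = "parquet") -> dict:
    """Build a table-input dict from a get-table response, mirroring the jq filter."""
    table = copy.deepcopy(get_table_response["Table"])  # leave the input untouched
    table.setdefault("Parameters", {})["classification"] = classification
    for key in READ_ONLY_KEYS:
        table.pop(key, None)
    return table


# Abbreviated, hypothetical get-table response
sample = {"Table": {"Name": "etl_output_parquet",
                    "DatabaseName": "smart_hub_data_catalog",
                    "CreateTime": "2020-01-05T00:00:00",
                    "Parameters": {"classification": "Unknown"}}}
print(to_table_input(sample))
```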

The results of the fix may be seen from the AWS Glue Console. All ten tables are now classified correctly.

screen_shot_2020-01-05_at_11_43_50_pm

Explore the Data

Before starting to visualize and analyze the data with Amazon QuickSight, try executing a few Athena queries against the tables in the Glue Data Catalog database, using the Athena Query Editor. Working in the Editor is the best way to understand the data, learn Athena, and debug SQL statements and queries. The Athena Query Editor has convenient developer features like SQL auto-complete and query formatting capabilities.

Be mindful when writing queries and searching the Internet for SQL references: the Athena query engine is based on Presto 0.172. The current version of Presto, 0.229, is more than 50 releases ahead of the current Athena version. The functionality of both Athena and Presto has changed and diverged. There are additional considerations and limitations for SQL queries in Athena to be aware of.

screen_shot_2020-01-05_at_10_32_25_am

Here are a few simple, ad-hoc queries to run in the Athena Query Editor.

-- preview the final etl data
SELECT *
FROM smart_hub_data_catalog.etl_output_parquet
LIMIT 10;

-- total cost in $'s for each device, at location 'b6a8d42425fde548'
-- from high to low, on December 21, 2019
SELECT device,
    concat('$', cast(cast(sum(cost) / 100 AS decimal(10, 2)) AS varchar)) AS total_cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
    AND date (cast(ts AS timestamp)) = date '2019-12-21'
GROUP BY device
-- order by the numeric sum; total_cost is a varchar and would sort lexically
ORDER BY sum(cost) DESC;

-- count of smart hub residential locations in Oregon and California,
-- grouped by zip code, sorted by count
SELECT DISTINCT postcode, upper(state), count(postcode) AS smart_hub_count
FROM smart_hub_data_catalog.smart_hub_locations_parquet
WHERE state IN ('or', 'ca')
    AND length(cast(postcode AS varchar)) >= 5
GROUP BY state, postcode
ORDER BY smart_hub_count DESC, postcode;

-- electrical usage for the clothes washer
-- over a 30-minute period, on December 21, 2019
SELECT ts, device, location, type, cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
    AND device = 'Clothes Washer'
    AND cast(ts AS timestamp)
    BETWEEN timestamp '2019-12-21 08:45:00'
    AND timestamp '2019-12-21 09:15:00'
ORDER BY ts;
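To make the total cost query easier to reason about, here is a minimal Python sketch of the same aggregation: sum the cost (in cents) per device, convert to dollars with two decimal places, and sort from high to low. The clothes washer costs are taken from the enriched data snippet earlier in this post; the ‘Air Conditioner’ row and the function name are hypothetical. Sorting on the numeric total, rather than on a formatted string such as ‘$10.00’, keeps the ordering numeric.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP

# (device, cost in cents) pairs; the 'Air Conditioner' row is hypothetical
rows = [("Clothes Washer", c) for c in
        (0.0, 0.0, 1.8947, 1.8897, 1.0855, 1.6258, 1.625, 1.6258, 1.6294, 1.092)]
rows.append(("Air Conditioner", 250.0))


def total_cost_by_device(rows):
    """Return [(device, dollars)] sorted by total cost, high to low."""
    totals = defaultdict(Decimal)
    for device, cents in rows:
        totals[device] += Decimal(str(cents))
    dollars = {device: (cents / 100).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
               for device, cents in totals.items()}
    # sort on the numeric value, not on a formatted '$' string
    return sorted(dollars.items(), key=lambda item: item[1], reverse=True)


for device, total in total_cost_by_device(rows):
    print(f"{device}: ${total}")
```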


Cleaning Up

You may choose to save the AWS resources created in part one of this demonstration, to be used in part two. Since you are not actively running queries against the data, ongoing AWS costs will be minimal. If you eventually choose to clean up the AWS resources created in part one of this demonstration, execute the following AWS CLI commands. To avoid failures, make sure each command completes before running the subsequent command. You will need to confirm the CloudFormation stacks are deleted using the AWS CloudFormation Console or the AWS CLI. These commands will not remove Amazon QuickSight data sets, analyses, and dashboards created in part two. However, deleting the AWS Glue Data Catalog and the underlying data sources will impact the ability to visualize the data in QuickSight.

# delete s3 contents first
aws s3 rm s3://${DATA_BUCKET} --recursive
aws s3 rm s3://${SCRIPT_BUCKET} --recursive
aws s3 rm s3://${LOG_BUCKET} --recursive
# then, delete lambda cfn stack
aws cloudformation delete-stack --stack-name smart-hub-lambda-stack
# finally, delete athena-glue-s3 stack
aws cloudformation delete-stack --stack-name smart-hub-athena-glue-stack


Part Two

In part one, starting with raw, semi-structured data in multiple formats, we learned how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.


IoT Telemetry Collection using Google Protocol Buffers, Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas

Collect IoT sensor telemetry using Google Protocol Buffers’ serialized binary format over HTTPS, serverless Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas on GCP, as an alternative to integrated Cloud IoT platforms and standard IoT protocols. Aggregate, analyze, and build machine learning models with the data using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

Introduction

Most of the dominant Cloud providers offer IoT (Internet of Things) and IIoT (Industrial IoT) integrated services. Amazon has AWS IoT, Microsoft Azure has multiple offerings including IoT Central, IBM’s offerings include IBM Watson IoT Platform, Alibaba Cloud has multiple IoT/IIoT solutions for different vertical markets, and Google offers Google Cloud IoT platform. All of these solutions are marketed as industrial-grade, highly-performant, scalable technology stacks. They are capable of scaling to tens-of-thousands of IoT devices or more and massive amounts of streaming telemetry.

In reality, not everyone needs a fully integrated IoT solution. Academic institutions, research labs, tech start-ups, and many commercial enterprises want to leverage the Cloud for IoT applications, but may not be ready for a fully-integrated IoT platform or are resistant to Cloud vendor platform lock-in.

Similarly, depending on the performance requirements and the type of application, organizations may not need or want to start out using IoT/IIoT industry standard data and transport protocols, such as MQTT (Message Queue Telemetry Transport), or CoAP (Constrained Application Protocol) over UDP (User Datagram Protocol). They may prefer to transmit telemetry over HTTP using TCP, or securely, using HTTPS (HTTP over TLS).

Demonstration

In this demonstration, we will collect environmental sensor data from a number of IoT device sensors and stream that telemetry over the Internet to Google Cloud. Each IoT device is installed in a different physical location. The devices contain a variety of common sensors, including humidity and temperature, motion, and light intensity.

iot_3.jpg

Prototype IoT Devices used in this Demonstration

We will transmit the sensor telemetry data as JSON over HTTP to serverless Google Cloud Function HTTPS endpoints. We will then switch to using Google’s Protocol Buffers to transmit binary data over HTTP. We should observe a reduction in the message size contained in the request payload as we move from JSON to Protobuf, which should reduce system latency and cost.

Data received by Cloud Functions over HTTP will be published asynchronously to Google Cloud Pub/Sub. A second Cloud Function will respond to all published events and push the messages to MongoDB Atlas on GCP. Once in Atlas, we will aggregate, transform, analyze, and build machine learning models with the data, using tools such as MongoDB Compass, Jupyter Notebooks, and Google’s AI Platform Notebooks.

For this demonstration, the architecture for JSON over HTTP will look as follows. All sensors will transmit data to a single Cloud Function HTTPS endpoint.

JSON IoT Basic Icons.png

For Protobuf over HTTP, the architecture will look as follows in the demonstration. Each type of sensor will transmit data to a different Cloud Function HTTPS endpoint.

Demo IoT Diagram Icons.png

Although the Cloud Functions will automatically scale horizontally to accommodate additional load created by the volume of telemetry being received, there are also other options to scale the system. For example, we could create individual pipelines of functions and topic/subscriptions for each sensor type. We could also split the telemetry data across multiple MongoDB Atlas Collections, based on sensor type, instead of a single collection. In all cases, we will still benefit from the Cloud Function’s horizontal scaling capabilities.

Complex IoT Diagram Icons.png

Source Code

All source code is available on GitHub. Use the following command to clone the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git

You will need to adjust the project’s environment variables to fit your own development and Cloud environments. All source code for this post is written in Python. It is intended for Python 3 interpreters but has been tested using Python 2 interpreters. The project’s Jupyter Notebooks can be viewed from within the project on GitHub or using the free, online Jupyter nbviewer.

screen_shot_2019-05-21_at_12_55_25_pm

Technologies

Protocol Buffers

According to Google, Protocol Buffers (aka Protobuf) are a language- and platform-neutral, efficient, extensible, automated mechanism for serializing structured data for use in communications protocols, data storage, and more. Protocol Buffers are 3 to 10 times smaller and 20 to 100 times faster than XML.

Each protocol buffer message is a small logical record of information, containing a series of strongly-typed name-value pairs. Once you have defined your messages, you run the protocol buffer compiler for your application’s language on your .proto file to generate data access classes.

Google Cloud Functions

Cloud-Functions.png

According to Google, Cloud Functions is Google’s event-driven, serverless compute platform. Key features of Cloud Functions include automatic scaling, high-availability, and fault-tolerance; no servers to provision, manage, patch, or update; paying only while your code runs; and the ability to easily connect and extend other cloud services. Cloud Functions natively support multiple event-types, including HTTP, Cloud Pub/Sub, Cloud Storage, and Firebase. Current language support includes Python, Go, and Node.

Google Cloud Pub/Sub

According to Google, Cloud Pub/Sub is an enterprise message-oriented middleware for the Cloud. It is a scalable, durable event ingestion and delivery system. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independent applications. Cloud Pub/Sub delivers low-latency, durable messaging that integrates with systems hosted on the Google Cloud Platform and externally.

MongoDB Atlas

MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime service-level agreements, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.

MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, free M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.

Cost Effectiveness of Cloud Functions

At true IIoT scale, Google Cloud Functions may not be the most efficient or cost-effective method of ingesting telemetry data. Based on Google’s pricing model, you get two million free function invocations per month, with each additional million invocations costing USD $0.40. The total cost also includes memory usage, total compute time, and outbound data transfer. If your system is comprised of tens or hundreds of IoT devices, Cloud Functions may prove cost-effective.

However, with thousands of devices or more, each transmitting data multiple times per minute, you could quickly outgrow the cost-effectiveness of Google Cloud Functions. In that case, you might look to the Google Cloud IoT platform. Alternatively, you can build your own platform with Google products such as Knative, letting you choose to run your containers either fully managed with the newly-released Cloud Run, or in your Google Kubernetes Engine cluster with Cloud Run on GKE.
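To get a feel for where that threshold lies, the invocation charge can be estimated with a few lines of Python. This is a back-of-the-envelope sketch under stated assumptions: the free tier and per-million price are the figures quoted above, each device is assumed to post once every 15 seconds (the interval used by the sensor scripts in this demonstration), a month is taken as 30 days, and memory, compute time, and outbound data transfer charges are ignored. The function names are illustrative.

```python
FREE_INVOCATIONS_PER_MONTH = 2_000_000   # free tier quoted above
USD_PER_MILLION_INVOCATIONS = 0.40       # price per additional million quoted above


def monthly_invocations(devices: int, interval_seconds: int = 15, days: int = 30) -> int:
    """Invocations per month if every device posts once per interval."""
    per_day = 24 * 60 * 60 // interval_seconds
    return devices * per_day * days


def invocation_cost_usd(invocations: int) -> float:
    """Invocation charge only; excludes memory, compute time, and data transfer."""
    billable = max(0, invocations - FREE_INVOCATIONS_PER_MONTH)
    return billable / 1_000_000 * USD_PER_MILLION_INVOCATIONS


for devices in (10, 100, 1000):
    calls = monthly_invocations(devices)
    print(devices, calls, round(invocation_cost_usd(calls), 2))
```

Under these assumptions, ten devices stay within the free tier, while a thousand devices generate roughly 173 million invocations per month.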

Sensor Scripts

For each sensor type, I have developed separate Python scripts, which run on each IoT device. There are two versions of each script, one for JSON over HTTP and one for Protobuf over HTTP.

JSON over HTTPS

Below we see the script, dht_sensor_http_json.py, used to transmit humidity and temperature data via JSON over HTTP to a Google Cloud Function running on GCP. The JSON request payload contains a timestamp, IoT device ID, device type, and the temperature and humidity sensor readings. The URL for the Google Cloud Function is stored as an environment variable, local to the IoT devices, and set when the script is deployed.

import json
import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests

URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)


def post_data(payload):
    payload = json.dumps(payload)

    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }

    try:
        requests.post(URL, json=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Telemetry Frequency

Although the sensors are capable of producing data many times per minute, for this demonstration, sensor telemetry is intentionally limited to only being transmitted every 15 seconds. To reduce system complexity, potential latency, back-pressure, and cost, in my opinion, you should only produce telemetry data at the frequency your requirements dictate.

JSON Web Tokens

For security, in addition to the HTTPS endpoints exposed by the Google Cloud Functions, I have incorporated the use of a JSON Web Token (JWT). JSON Web Tokens are an open, industry standard RFC 7519 method for representing claims securely between two parties. In this case, the JWT is used to verify the identity of the sensor scripts sending telemetry to the Cloud Functions. The JWT contains an id, password, and expiration, all signed with a secret key, which is known to each Cloud Function, in order to verify the IoT device’s identity. Without the correct JWT being passed in the Authorization header, the request to the Cloud Function will fail with an HTTP status code of 401 Unauthorized. Below is an example of the JWT’s payload data.

{
  "sub": "IoT Protobuf Serverless Demo",
  "id": "iot-demo-key",
  "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ",
  "iat": 1557407124,
  "exp": 1564664724
}

For this demonstration, I created a temporary JWT using jwt.io. The HTTP Functions are using PyJWT, a Python library which allows you to encode and decode the JWT. The PyJWT library allows the Function to decode and validate the JWT (Bearer Token) from the incoming request’s Authorization header. The JWT token is stored as an environment variable. Deployment instructions are included in the GitHub project.

screen_shot_2019-05-09_at_5_13_28_pm

JSON Payload

Below is a typical JSON request payload (pretty-printed), containing DHT sensor data. This particular message is 148 bytes in size. The message format is intentionally reader-friendly. We could certainly shorten the message’s key fields, to reduce the payload size by an additional 15-20 bytes.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}
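As a quick check on payload size, the snippet below measures the same message serialized three ways with Python’s json module. The helper name payload_size is an assumption for illustration. With json.dumps default separators, the message comes to 148 bytes, which matches the figure quoted above; removing the whitespace after separators saves a further 10 bytes.

```python
import json

reading = {
    "device": "rp829c7e0e",
    "type": "DHT22",
    "timestamp": 1557585090.476025,
    "data": {"temperature": 17.100000381469727, "humidity": 68.0999984741211},
}


def payload_size(obj, **dumps_kwargs) -> int:
    """Size in bytes of the UTF-8 encoded JSON document."""
    return len(json.dumps(obj, **dumps_kwargs).encode("utf-8"))


pretty = payload_size(reading, indent=2)                 # pretty-printed, as displayed
default = payload_size(reading)                          # json.dumps defaults
compact = payload_size(reading, separators=(",", ":"))   # no optional whitespace
print(pretty, default, compact)
```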

Protocol Buffers

For the demonstration, I have built a Protocol Buffers file, sensors.proto, to support the data output by three sensor types: digital humidity and temperature (DHT), passive infrared sensor (PIR), and digital light intensity (DLI). I am using the newer proto3 version of the protocol buffers language. I have created a common Protobuf sensor message schema, with the variable sensor telemetry stored in the nested data object, within each message type.

It is important to use the correct Protobuf Scalar Value Type to maintain numeric precision in the language you compile for. For simplicity, I am using a double to represent the timestamp, as well as the numeric humidity and temperature readings. Alternatively, you could choose Google’s Protobuf Well-Known Type, Timestamp, to store the timestamp.

syntax = "proto3";

package sensors;

// DHT22
message SensorDHT {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDHT data = 4;
}

message DataDHT {
    double temperature = 1;
    double humidity = 2;
}

// Onyehn_PIR
message SensorPIR {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataPIR data = 4;
}

message DataPIR {
    bool motion = 1;
}

// Anmbest_MD46N
message SensorDLI {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDLI data = 4;
}

message DataDLI {
    bool light = 1;
}
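The earlier point about choosing the correct scalar value type can be illustrated with Python’s struct module, which packs numbers into the same 4-byte and 8-byte IEEE 754 layouts used by Protobuf’s float and double types. This is a sketch for illustration; the function names are hypothetical. A temperature reading loses little in 4 bytes, but an epoch timestamp in the billions cannot retain sub-second precision.

```python
import struct


def roundtrip_float32(value: float) -> float:
    """Store a value in 4 bytes (like Protobuf 'float') and read it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


def roundtrip_float64(value: float) -> float:
    """Store a value in 8 bytes (like Protobuf 'double') and read it back."""
    return struct.unpack("<d", struct.pack("<d", value))[0]


timestamp = 1557585090.476025
print(roundtrip_float64(timestamp) - timestamp)   # unchanged by the round trip
print(roundtrip_float32(timestamp) - timestamp)   # error of at least a fraction of a second
print(roundtrip_float32(17.1))                    # close to, but not exactly, 17.1
```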

Since the sensor data will be captured with scripts written in Python 3, the Protocol Buffers file is compiled for Python, resulting in the file, sensors_pb2.py.

protoc --python_out=. sensors.proto

Protocol Buffers over HTTPS

Below we see the alternate DHT sensor script, dht_sensor_http_pb.py, which transmits a Protocol Buffers-based binary request payload over HTTPS to a Google Cloud Function running on GCP. Note the request’s Content-Type header has been changed from application/json to application/x-protobuf. In this case, instead of JSON, the same data fields are stored in an instance of the Protobuf’s SensorDHT message type (sensors_pb2.SensorDHT()). Note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device.

import logging
import os
import socket
import sys
import time

import Adafruit_DHT
import requests
import sensors_pb2

URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15


def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())


def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)

            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity

            payload = sensor_dht.SerializeToString()

            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')


def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }

    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')


if __name__ == '__main__':
    sys.exit(main())

Protobuf Binary Payload

To understand the binary Protocol Buffers-based payload, we can write a sample SensorDHT message to a file on disk as a byte array.

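message = sensorDHT.SerializeToString()

# write the serialized message to disk as a byte array;
# the context manager closes the file when the block exits
with open("./data_binary.txt", "wb") as binary_file_output:
    file_byte_array = bytearray(message)
    binary_file_output.write(file_byte_array)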

Then, using the hexdump command, we can view a representation of the binary data file.

> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022

The binary data file size is 48 bytes on disk, as compared to the equivalent JSON file size of 148 bytes on disk (32% the size). As a test, we could then send that binary data file as the payload of a POST to the Cloud Function, as shown below using Postman. Postman will serialize the binary data file’s contents to a binary string before transmitting.

screen_shot_2019-05-14_at_7_00_39_am.png

Similarly, we can serialize the same binary Protocol Buffers-based SensorDHT message to a binary string using the SerializeToString method.

message = sensorDHT.SerializeToString()
print(message)

The resulting binary string resembles the following.

b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'

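The printed representation of the serialized message above is 111 characters long, as compared to the JSON payload size of 148 bytes (75% the size). However, each escape sequence in that representation, such as \x00, stands for a single byte. The underlying byte string, and therefore the request payload sent by Postman and received by the Cloud Function for this particular message, is 48 bytes, matching the size of the binary data file on disk.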

Validate Protobuf Payload

To validate the data contained in the Protobuf payload is identical to the JSON payload, we can parse the payload from the serialized binary string using the Protobuf ParseFromString method. We then convert it to JSON using the Protobuf MessageToJson method.

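import sensors_pb2
from google.protobuf.json_format import MessageToJson

message = sensorDHT.SerializeToString()
# parse the serialized bytes back into a new SensorDHT instance
message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))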

The resulting JSON object is identical to the JSON payload sent using JSON over HTTPS, earlier in the demonstration.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Google Cloud Functions

There are a series of Google Cloud Functions, specifically four HTTP Functions, which accept the sensor data over HTTP from the IoT devices. Each function exposes an HTTPS endpoint. According to Google, you use HTTP functions when you want to invoke your function via an HTTP(S) request. To allow for HTTP semantics, HTTP function signatures accept HTTP-specific arguments.

Below, I have deployed a single function that accepts JSON sensor telemetry from all sensor types, and three functions for Protobuf, one for each sensor type: DHT, PIR, and DLI.

screen-shot-2019-05-13-at-8_34_49-pm

JSON Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming JSON over HTTPS payload from all sensor types. Once the request’s JWT is validated, the JSON message payload is serialized to a byte string and sent to a common Google Cloud Pub/Sub Topic. Note the JWT secret key, id, and password, and the Google Cloud Pub/Sub Topic are all stored as environment variables, local to the Cloud Functions. In my tests, the JSON-based HTTP Functions took an average of 9–18 ms to execute successfully.

import logging
import os

import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    send_message(request_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]

    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, 
                      data=bytes(str(message), 'utf-8'))
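One detail in send_message is worth a closer look. If the value passed in is a Python dict, str() produces the dict’s Python representation, with single quotes and True/False, which is not valid JSON. If it is already a JSON string, str() leaves it unchanged. In this pipeline, the sensor script calls json.dumps before passing the result to the json parameter of requests.post, so the function likely receives a string; treat that reading as an assumption to verify against your own requests. The sketch below, with a hypothetical message and helper name, shows the difference.

```python
import json

# Hypothetical message, used only to contrast the two serializations
message = {"device": "rp829c7e0e", "type": "PIR", "data": {"motion": True}}

as_repr = bytes(str(message), "utf-8")          # Python repr: single quotes, True
as_json = json.dumps(message).encode("utf-8")   # JSON: double quotes, true


def is_valid_json(raw: bytes) -> bool:
    """Return True if the bytes parse as a JSON document."""
    try:
        json.loads(raw)
        return True
    except ValueError:
        return False


print(is_valid_json(as_repr), is_valid_json(as_json))
```

Using json.dumps when the input is a dict keeps the Pub/Sub message parseable by downstream consumers regardless of how the client encoded the request.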

The Cloud Functions are deployed to GCP using the gcloud functions deploy CLI command (I use Jenkins to automate the deployments). I have wrapped the deploy commands into bash scripts. The script also copies over a common environment variables YAML file, consumed by the Cloud Function. Each Function has a deployment script, included in the project.

# get latest env vars file
cp -f ./../env_vars_file/env.yaml .

# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml

Using a .gcloudignore file, the gcloud functions deploy CLI command deploys three files: the cloud function (main.py), the required Python packages file (requirements.txt), and the environment variables file (env.yaml). Google automatically installs dependencies using the requirements.txt file.

Protobuf Message Processing

Below, we see the Cloud Function, main.py, which processes the incoming Protobuf over HTTPS payload from DHT sensor types. Once the sensor data Protobuf message payload is received by the HTTP Function, it is deserialized to JSON and then serialized to a byte string. The byte string is then sent to a Google Cloud Pub/Sub Topic. In my tests, the Protobuf-based HTTP Functions took an average of 7–14 ms to execute successfully.

As before, note the import sensors_pb2 statement. This statement imports the compiled Protocol Buffers file, which is stored locally to the script on the IoT device. It is used to parse a serialized message into its original Protobuf’s SensorDHT message type.

import logging
import os

import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson

TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')


def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})

    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})

    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)

    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})


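def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    # Expect "<scheme> <token>"; reject a header with no token part
    # instead of raising IndexError
    parts = auth_header.split(" ")
    if len(parts) < 2 or not parts[1]:
        return False
    auth_token = parts[1]

    try:
        # Note: PyJWT 2.x requires an explicit algorithms=[...] argument here
        payload = jwt.decode(auth_token, SECRET_KEY)
        # Always return a boolean, including when the claims do not match
        return payload['id'] == ID and payload['password'] == PASSWORD
    except jwt.ExpiredSignatureError:
        return False
    except (jwt.InvalidTokenError, KeyError):
        return False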


def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))
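
The validate_token function above expects a signed token whose payload carries id and password claims. As a rough illustration of what such a token looks like on the wire, here is a minimal standard-library sketch. It assumes the HS256 algorithm (the default when encoding with PyJWT) and a "Bearer" authorization scheme; neither is confirmed by the code above. The helper names make_token and read_claims, and the demo claim values, are hypothetical. Unlike PyJWT, this sketch does not check expiry.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT does."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def b64url_decode(text: str) -> bytes:
    """Inverse of b64url: restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def make_token(claims: dict, secret: str) -> str:
    """Build an HS256-signed token (hypothetical helper, not PyJWT)."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    parts = [b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
             for part in (header, claims)]
    signing_input = '.'.join(parts)
    signature = hmac.new(secret.encode('utf-8'),
                         signing_input.encode('ascii'),
                         hashlib.sha256).digest()
    return signing_input + '.' + b64url(signature)


def read_claims(token: str, secret: str):
    """Return the claims if the signature verifies, otherwise None."""
    try:
        header_b64, claims_b64, signature_b64 = token.split('.')
    except ValueError:
        return None
    expected = hmac.new(secret.encode('utf-8'),
                        (header_b64 + '.' + claims_b64).encode('ascii'),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        return None
    return json.loads(b64url_decode(claims_b64))


# Demo values only; real values would come from the function's environment.
token = make_token({'id': 'demo-id', 'password': 'demo-password'}, 'demo-secret')
authorization_header = 'Bearer ' + token  # header value a client would send
```

In practice, a library such as PyJWT is the better choice on both ends, since it also validates standard claims such as the expiry time.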

Cloud Pub/Sub Functions

In addition to HTTP Functions, the demonstration uses a function triggered by Google Cloud Pub/Sub. According to Google, Cloud Functions can be triggered by messages published to Cloud Pub/Sub Topics in the same GCP project as the function. The function automatically subscribes to the Topic. Below, we see that the function has automatically subscribed to the iot-data-demo Cloud Pub/Sub Topic.

screen_shot_2019-05-09_at_2_41_17_pm

Sending Telemetry to MongoDB Atlas

The common Cloud Function, triggered by messages published to Cloud Pub/Sub, then sends the messages to MongoDB Atlas. There is a minimal amount of cleanup required to re-format the Cloud Pub/Sub messages to BSON (binary JSON). Interestingly, according to bsonspec.org, BSON can be compared to binary interchange formats, like Protocol Buffers. BSON is more schema-less than Protocol Buffers, which can give it an advantage in flexibility but also a slight disadvantage in space efficiency (BSON has overhead for field names within the serialized data).

The function uses PyMongo to connect to MongoDB Atlas. According to their website, PyMongo is a Python distribution containing tools for working with MongoDB and is the recommended way to work with MongoDB from Python.

import base64
import json
import logging
import os
import pymongo

MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')


def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)

    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)
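
The string replacements in read_message suggest that some publishers send a Python-style dictionary string (single quotes, True) rather than strict JSON. A slightly more defensive decoding step could look like the sketch below. It assumes the event's data field is base64-encoded UTF-8 text, as in the function above; the helper name decode_pubsub_data and the sample payloads are hypothetical.

```python
import ast
import base64
import json


def decode_pubsub_data(event: dict) -> dict:
    """Decode a Pub/Sub event's base64 'data' field into a dict."""
    text = base64.b64decode(event['data']).decode('utf-8')
    try:
        # Strict JSON, e.g. the output of MessageToJson
        return json.loads(text)
    except json.JSONDecodeError:
        # Python-repr style text: single quotes, True/False/None
        return ast.literal_eval(text)


# Simulated events (hypothetical payloads)
repr_event = {'data': base64.b64encode(
    str({'device': 'rp59adf374', 'motion': True}).encode('utf-8'))}
json_event = {'data': base64.b64encode(
    b'{"device": "rp59adf374", "motion": false}')}

repr_message = decode_pubsub_data(repr_event)
json_message = decode_pubsub_data(json_event)
```

Unlike a chain of str.replace calls, this approach does not corrupt values that happen to contain an apostrophe or the word True, and it handles False as well.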

The function responds to the published events and sends the messages to the MongoDB Atlas cluster, running in the same Region, us-central1, as the Cloud Functions and Pub/Sub Topic. Below, we see the current options available when provisioning an Atlas cluster.

screen_shot_2019-05-09_at_6_17_18_pm

MongoDB Atlas provides a rich, web-based UI for managing and monitoring MongoDB clusters, databases, collections, security, and performance.

screen_shot_2019-05-10_at_10_08_14_pm

Although Cloud Pub/Sub to Atlas function execution times are longer than those of the HTTP functions, the latency is greatly reduced by locating the Cloud Pub/Sub Topic, Cloud Functions, and MongoDB Atlas cluster in the same GCP Region. Cross-region execution times were as high as 500-600 ms, while same-region execution times averaged 200-225 ms. Selecting a more performant Atlas cluster would likely result in even lower function execution times.

screen_shot_2019-05-10_at_10_16_49_pm

Aggregating Data with MongoDB Compass

MongoDB Compass is a free, convenient, desktop application for interacting with your MongoDB databases. You can view the collected sensor data, review message (document) schema, manage indexes, and build complex MongoDB aggregations.

screen_shot_2019-05-14_at_5_19_17_pm

screen_shot_2019-05-21_at_1_17_40_pm.png

screen_shot_2019-05-21_at_1_15_09_pm

When performing analytics or machine learning, I primarily use MongoDB Compass to preview the captured telemetry data and build aggregation pipelines. Aggregation operations process data records and return computed results. This feature saves a ton of time when filtering and preparing data for further analysis, visualization, and machine learning with Jupyter Notebooks.

screen_shot_2019-05-14_at_5_22_58_pm

Aggregation pipelines can be directly exported to Java, Node, C#, and Python 3. The exported aggregation pipeline code can be placed directly into your Python applications and Jupyter Notebooks.

screen_shot_2019-05-14_at_5_23_39_pm

Below, the exported aggregation pipeline code from MongoDB Compass is used to load a resultset directly into a Pandas DataFrame. This particular aggregation returns time-series DHT sensor data from a specific IoT device over a 72-hour period.

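import pandas as pd

# iot_data is assumed to be a PyMongo collection handle,
# for example client[MONGODB_DB][MONGODB_COL]
DEVICE_1 = 'rp59adf374'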
pipeline = [
    {
        '$match': {
            'type': 'DHT22', 
            'device': DEVICE_1, 
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1, 
            'temperature': '$data.temperature', 
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]
aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))
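
The timestamp bounds in the $match stage are Unix epoch seconds. Rather than hard-coding them, a small helper can derive the bounds from a start time and a duration. This is only a sketch: the helper name epoch_window is hypothetical, and it assumes the timestamp field is stored as epoch seconds in UTC.

```python
from datetime import datetime, timedelta, timezone


def epoch_window(start: datetime, hours: int) -> dict:
    """Return a $gt/$lt range, in epoch seconds, covering `hours` from `start`."""
    end = start + timedelta(hours=hours)
    return {'$gt': int(start.timestamp()), '$lt': int(end.timestamp())}


start = datetime(2019, 5, 12, tzinfo=timezone.utc)

# A 48-hour window from this start reproduces the literal bounds
# used in the pipeline above (1557619200 and 1557792000).
window_48h = epoch_window(start, 48)

# A 72-hour window from the same start ends at 1557878400.
window_72h = epoch_window(start, 72)
```

The resulting dictionary can be assigned directly to the 'timestamp' key of the $match stage.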

MongoDB Atlas Performance

In this demonstration, from Python 3-based Jupyter Notebooks, I was able to consistently query a MongoDB Atlas collection of almost 70k documents for resultsets containing 3 days (72 hours) worth of digital temperature and humidity data, roughly 10.2k documents, in an average of 825 ms. That is the round trip from my local development laptop to MongoDB Atlas running on GCP, in a different geographic region.

Query times on GCP are much faster, such as when running a Notebook in JupyterLab on Google’s AI Platform, or a PySpark job with Cloud Dataproc, against Atlas. Running the same Jupyter Notebook directly on Google’s AI Platform, the same MongoDB Atlas query took an average of 450 ms versus 825 ms (1.83x faster). This was across two different GCP Regions; same Region times should be even faster.

screen-shot-2019-05-13-at-9_09_52-pm

GCP Observability

There are several choices for observing the system’s Google Cloud Functions, Google Cloud Pub/Sub, and MongoDB Atlas. As shown above, the GCP Cloud Functions interface lets you see the individual function executions, execution times, memory usage, and active instances, over varying time intervals.

For a more detailed view of Google Cloud Functions and Google Cloud Pub/Sub, I built two custom dashboards using Stackdriver. According to Google, Stackdriver aggregates metrics, logs, and events from infrastructure, giving developers and operators a rich set of observable signals. I built a custom Stackdriver Cloud Functions dashboard (shown below) and a Cloud Pub/Sub Topics and Subscriptions dashboard.

For functions, I chose to display execution times, memory usage, the number of executions, and network egress, all in a single pane of glass, using four graphs. Below, I am using the 95th percentile for monitoring. The 95th percentile indicates that 95% of the observed values fall at or below this amount, while the remaining 5% fall above it.

screen_shot_2019-05-10_at_10_13_37_pm
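
To make the percentile idea concrete, here is a minimal sketch using the nearest-rank method. Stackdriver may well compute percentiles differently (for example, by interpolating across histogram buckets); the function name and the sample execution times are hypothetical.

```python
import math


def percentile_nearest_rank(values, p):
    """Return the p-th percentile (0 < p <= 100) using the nearest-rank method."""
    if not values:
        raise ValueError('values must not be empty')
    if not 0 < p <= 100:
        raise ValueError('p must be in the range (0, 100]')
    ordered = sorted(values)
    # Smallest rank such that at least p percent of values are at or below it
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[rank - 1]


# Hypothetical execution times in ms: nineteen fast runs and one slow outlier
execution_times_ms = [200] * 19 + [600]
p95 = percentile_nearest_rank(execution_times_ms, 95)
p100 = percentile_nearest_rank(execution_times_ms, 100)
```

Here the single slow outlier shows up only in the maximum, which is why a high percentile is often a steadier monitoring signal than the maximum.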

Data Analysis using Jupyter Notebooks

According to jupyter.org, the Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The widespread use of Jupyter Notebooks has grown significantly, as Big Data, AI, and ML have all experienced explosive growth.

PyCharm

JetBrains PyCharm, my favorite Python IDE, has direct integrations with Jupyter Notebooks. In fact, PyCharm’s most recent updates to the Professional Edition greatly enhanced those integrations. PyCharm offers round-trip editing in the IDE and the Jupyter Notebook web browser interface. PyCharm allows you to run and debug individual cells within the notebook. PyCharm automatically starts the Jupyter Server and appropriate kernel for the Notebook you have opened. And, one of my favorite features, PyCharm’s variable viewer tracks the current value of a variable, automatically.

screen_shot_2019-05-10_at_10_38_42_am

Below, we see the example Analytics Notebook, included in the demonstration’s project, displayed in PyCharm 2019.1.2 (Professional Edition). Working effectively with Notebooks in PyCharm really requires a full-size monitor. Working on a laptop with PyCharm’s crowded Notebook UI is feasible, but certainly not as effective as on a larger monitor.

screen_shot_2019-05-20_at_2_23_46_pm.png

Jupyter Notebook Server

Below, we see the same Analytics Notebook, shown above in PyCharm, opened in Jupyter Notebook Server’s web-based client interface, running locally on the development workstation. The web browser-based interface also offers a rich set of features for Notebook development.

From within the Notebook, we are able to query the data from MongoDB Atlas, again using PyMongo, and load the resultsets into Pandas DataFrames. As an alternative to hard-coded values and environment variables, with Notebooks, I use the python-dotenv Python package. This package allows me to place my environment variables in a common .env file and reference them from any Notebook. The package has many options for managing environment variables.

screen_shot_2019-05-19_at_9_46_32_am.png

We can then analyze the data using a number of common frameworks, including Pandas, Matplotlib, SciPy, PySpark, and NumPy, to name but a few. Below, we see time series data from four different sensors, on the same IoT device. Viewing the data together, we can study the causal effect of one environment variable on another, such as the impact of light on temperature or humidity.

screen_shot_2019-05-23_at_5_25_44_pm

Below, we can use histograms to visualize temperature frequencies for intervals, over time, for a given device location.

screen-shot-2019-05-13-at-9_13_35-pm
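
The counting behind such a histogram is simple enough to sketch with the standard library. In the Notebook itself, Pandas or Matplotlib would typically do this for you; the function name and the sample readings below are hypothetical.

```python
import math
from collections import Counter


def histogram(values, bin_width):
    """Count values per interval [start, start + bin_width), keyed by interval start."""
    counts = Counter(math.floor(v / bin_width) * bin_width for v in values)
    return dict(sorted(counts.items()))


# Hypothetical temperature readings in degrees Celsius
temperatures = [20.1, 20.9, 21.5, 23.0, 23.9]
one_degree_bins = histogram(temperatures, 1)
two_degree_bins = histogram(temperatures, 2)
```

Each key is the lower edge of an interval, and each value is the number of readings that fell into it.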

Machine Learning using Jupyter Notebooks

In addition to data analytics, we can use Jupyter Notebooks with tools such as scikit-learn to build machine learning models based on our sensor telemetry.  Scikit-learn is a set of machine learning tools in Python, built on NumPy, SciPy, and matplotlib. Below, I have used JupyterLab on Google’s AI Platform and scikit-learn to build several models, based on the sensor data.

screen_shot_2019-05-19_at_12_25_45_pm

screen_shot_2019-05-19_at_12_26_45_pm

Using scikit-learn, we can build models to predict such things as which IoT device generated a specific temperature and humidity reading, or the temperature and humidity, given the time of day, device location, and external environment variables, or discover anomalies in the sensor telemetry.

Scikit-learn makes it easy to construct randomized training and test datasets, to build models, using data from multiple IoT devices, as shown below.

screen_shot_2019-05-19_at_12_27_39_pm
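
As a rough picture of what a randomized split does, the sketch below shuffles the rows with a fixed seed and holds out a fraction for testing. scikit-learn's train_test_split function in sklearn.model_selection provides this, with more options; the standard-library version here is illustrative only, and its name, defaults, and sample rows are hypothetical.

```python
import random


def split_train_test(rows, test_fraction=0.25, seed=42):
    """Shuffle rows reproducibly and return (train, test) lists."""
    shuffled = list(rows)                  # copy so the caller's data is untouched
    random.Random(seed).shuffle(shuffled)  # seeded for repeatable splits
    n_test = round(len(shuffled) * test_fraction)
    return shuffled[n_test:], shuffled[:n_test]


# Hypothetical rows: stand-ins for sensor readings from several devices
readings = list(range(100))
train_rows, test_rows = split_train_test(readings)
```

Fixing the seed keeps the split repeatable between Notebook runs, which makes model comparisons fairer.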

The project includes a Jupyter Notebook that demonstrates how to build several ML models using sensor data. Examples of supervised learning algorithms used to build the classification models in this demonstration include Support Vector Machine (SVM), k-nearest neighbors (k-NN), and Random Forest Classifier.

screen_shot_2019-05-19_at_12_30_38_pm
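
To give a feel for how one of these classifiers works, here is a minimal k-nearest neighbors sketch in plain Python that predicts which device produced a temperature and humidity reading. It is not the Notebook's code, which uses scikit-learn; the device labels and readings are hypothetical, and a real model would normally scale the features first. It requires Python 3.8 or later for math.dist.

```python
import math
from collections import Counter


def knn_predict(training_data, query, k=3):
    """Predict a label by majority vote among the k nearest training points.

    training_data: list of ((temperature, humidity), device_label) tuples
    query: (temperature, humidity) tuple to classify
    """
    nearest = sorted(training_data,
                     key=lambda item: math.dist(item[0], query))[:k]
    votes = Counter(label for _, label in nearest)
    return votes.most_common(1)[0][0]


# Hypothetical labeled readings from two devices in different locations
training_data = [
    ((20.1, 40.0), 'device-a'),
    ((20.5, 41.2), 'device-a'),
    ((19.8, 39.5), 'device-a'),
    ((28.0, 65.0), 'device-b'),
    ((27.5, 63.8), 'device-b'),
    ((28.4, 66.1), 'device-b'),
]

prediction_cool = knn_predict(training_data, (21.0, 42.0))
prediction_warm = knn_predict(training_data, (27.0, 64.0))
```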

Having data from multiple sensors, we are able to enrich the ML models by adding categorical (discrete) features to our training data. For example, we could look at the effect of light, motion, and time of day on temperature and humidity.

screen_shot_2019-05-19_at_12_29_25_pm

Conclusion

Hopefully, this post has demonstrated how to efficiently collect telemetry data from IoT devices using Google Protocol Buffers over HTTPS, serverless Google Cloud Functions, Cloud Pub/Sub, and MongoDB Atlas, all on the Google Cloud Platform. Once captured, the telemetry data was easily aggregated and analyzed using common tools, such as MongoDB Compass and Jupyter Notebooks. Further, we used the data and tools to build machine learning models for prediction and anomaly detection.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

Image: everythingpossible © 123RF.com


Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 2

Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.

Cloud-based machine learning services greatly improve a conversational interface’s ability to interpret user intent with greater accuracy. However, returning relevant responses to user inquiries also requires that interfaces have access to rich informational datastores, and the ability to quickly and efficiently query and analyze that data.

In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a  Near Real-time (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.

Action Preview

Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.

Architecture

If you recall from part one of this post, the high-level architecture of our search engine-enhanced Action for Google Assistant resembles the following. Most of the components are running on Google Cloud.

Google Search Assistant Diagram GCP

Source Code

All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.

Development Process

In part two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:

  • Create the new Actions for Google Assistant project using the Actions on Google console;
  • Develop the Action’s Intents and Entities using the Dialogflow console;
  • Develop the Cloud Function, deploy it to GCP, and test it.

Let’s explore each step in more detail.

New ‘Actions on Google’ Project

With Elasticsearch running and the Spring Boot Service deployed to our GKE cluster, we can start building our Actions for Google Assistant. Using the Actions on Google web console, we first create a new Actions project.

wp-search-021

The Directory Information tab is where we define metadata about the project. This information determines how it will look in the Actions directory and is required to publish your project. The Actions directory is where users discover published Actions on the web and mobile devices.

wp-search-019

The Directory Information tab also includes sample invocations, which may be used to invoke our Actions.

wp-search-020

Actions and Intents

Our project will contain a series of related Actions. According to Google, an Action is ‘an interaction you build for the Assistant that supports a specific intent and has a corresponding fulfillment that processes the intent.’ To build our Actions, we first want to create our Intents. To do so, we will want to switch from the Actions on Google console to the Dialogflow console. Actions on Google provides a link for switching to Dialogflow in the Actions tab.

wp-search-022

We will build our Action’s Intents in Dialogflow. The term Intent, used by Dialogflow, is standard terminology across other voice-assistant platforms, such as Amazon’s Alexa and Microsoft’s Azure Bot Service and LUIS. In Dialogflow, we will be building Intents: the Find Multiple Posts Intent, Find Post Intent, Find By ID Intent, and so forth.

wp-search-023

Below, we see the Find Post Intent. The Find Post Intent is responsible for handling our user’s requests for a single post about a topic, for example, ‘Find a post about Docker.’ The Intent shown below contains a fair number of training phrases, though certainly not an exhaustive list. These represent possible ways a user might express intent when invoking the Action.

wp-search-026

Below, we see the Find Multiple Posts Intent. The Find Multiple Posts Intent is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker.’ Similar to the Find Post Intent above, the Find Multiple Posts Intent contains a list of training phrases.

wp-search-025

Dialog Model Training

According to Google, the greater the number of natural language examples in the Training Phrases section of Intents, the better the classification accuracy. Every time a user interacts with our Action, the user’s utterances are logged. Using the Training tab in the Dialogflow console, we can train our model by reviewing and approving or correcting how the Action handled the user’s utterances.

Below, we see the user’s utterances, part of an interaction with the Action. We have the option to review and approve the Intent that was called to handle the utterance, re-assign it, or delete it. This helps improve the accuracy of our dialog model.

wp-search-039.png

Dialogflow Entities

Each of the highlighted words in the training phrases maps to the facts parameter, which maps to a collection of @topic Entities. Entities represent the values, in this case topics, that the Action is trained to recognize in a user’s request. According to Google, there are three types of entities: ‘system’ (defined by Dialogflow), ‘developer’ (defined by a developer), and ‘user’ (built for each individual end-user in every request). We will be creating ‘developer’ type entities for our Action’s Intents.

wp-search-037.png

Automated Expansion

We do not have to define all possible topics a user might search for, as an entity.  By enabling the Allow Automated Expansion option, an Agent will recognize values that have not been explicitly listed in the entity list. Google describes Agents as NLU (Natural Language Understanding) modules.

wp-search-042.png

Entity Synonyms

An entity may contain synonyms. Multiple synonyms are mapped to a single reference value. The reference value is the value passed to the Cloud Function by the Action. For example, take the reference value of ‘GCP.’ The user might ask Google about ‘GCP’. However, the user might also substitute the words ‘Google Cloud’ or ‘Google Cloud Platform.’ Using synonyms, if the user utters any of these three synonymous words or phrases in their intent, the reference value, ‘GCP’, is passed in the request.

But, what if the post contains the phrase, ‘Google Cloud Platform’ more frequently than, or instead of, ‘GCP’? If the acronym, ‘GCP’, is defined as the entity reference value, then it is the value passed to the function, even if you ask for ‘Google Cloud Platform’. In the use case of searching blog posts by topic, entity synonyms are not an effective search strategy.
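
The behavior described above can be modeled as a simple lookup table. This is only a conceptual sketch, not how Dialogflow is implemented; the dictionary layout and the function names are hypothetical.

```python
# One entity entry: a reference value and the synonyms that map to it
TOPIC_ENTITY = {
    'GCP': ['GCP', 'Google Cloud', 'Google Cloud Platform'],
}


def build_lookup(entity):
    """Invert {reference: [synonyms]} into {lowercased synonym: reference}."""
    return {synonym.lower(): reference
            for reference, synonyms in entity.items()
            for synonym in synonyms}


def resolve(value, lookup):
    """Return the reference value for a spoken value, or None if unknown."""
    return lookup.get(value.lower())


lookup = build_lookup(TOPIC_ENTITY)
resolved = resolve('Google Cloud Platform', lookup)
```

Whatever the user says, only the reference value reaches the function. That is why a post that spells out the full phrase, but never uses the acronym, can be missed by a search on the reference value alone.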

Elasticsearch Synonyms

A better way to handle synonyms is to use the synonyms feature of Elasticsearch. Take, for example, the topic of ‘Istio’. Istio is also considered a Service Mesh. If I ask for posts about ‘Service Mesh’, I would like to get back posts that contain the phrase ‘Service Mesh’, but also the word ‘Istio’. To accomplish this, you would define an association between ‘Istio’ and ‘Service Mesh’, as part of the Elasticsearch WordPress posts index.

wp-search-041d

Searches for ‘Istio’ against that index would return results that contain ‘Istio’ and/or contain ‘Service Mesh’; the reverse is also true. Having created and applied a custom synonyms filter to the index, we see how Elasticsearch responds to an analysis of the natural language style phrase, ‘What is a Service Mesh?’. As shown by the tokens output in Kibana’s Dev Tools Console, Elasticsearch understands that ‘service mesh’ is synonymous with ‘istio’.

wp-search-041g
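
For reference, index settings that define such an association generally take the shape sketched below, expressed here as a Python dictionary that serializes to the JSON request body. The filter name topic_synonyms and the analyzer name post_analyzer are hypothetical, and the post's actual index configuration may differ. For multi-word synonyms applied at search time, Elasticsearch also offers a synonym_graph filter type.

```python
import json

index_settings = {
    'settings': {
        'analysis': {
            'filter': {
                'topic_synonyms': {              # hypothetical filter name
                    'type': 'synonym',
                    # Comma-separated terms are treated as equivalent
                    'synonyms': ['istio, service mesh'],
                }
            },
            'analyzer': {
                'post_analyzer': {               # hypothetical analyzer name
                    'tokenizer': 'standard',
                    # Lowercase first so the synonym rule matches any casing
                    'filter': ['lowercase', 'topic_synonyms'],
                }
            },
        }
    }
}

# JSON body that would be sent when creating the index
request_body = json.dumps(index_settings, indent=2)
```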

If we query the same five fields as our Action, for the topic of ‘service mesh’, we get four hits for posts (indexed documents) that contain ‘service mesh’ and/or ‘istio’.

wp-search-041c

Actions on Google Integration

Another configuration item in Dialogflow that needs to be completed is Dialogflow’s Actions on Google integration. This will integrate our Action with Google Assistant. Google currently provides more than fifteen different integrations, including Google Assistant, Slack, Facebook Messenger, Twitter, and Twilio, as shown below.

wp-search-028

To configure the Google Assistant integration, choose the Welcome Intent as our Action’s Explicit Invocation intent. Then we designate our other Intents as Implicit Invocation intents. According to Google, this Google Assistant Integration allows our Action to reach users on every device where the Google Assistant is available.

wp-search-029

Action Fulfillment

When a user’s intent is received, it is fulfilled by the Action. In the Dialogflow Fulfillment console, we see the Action has two fulfillment options: a Webhook or a Cloud Function that is edited inline. A Webhook allows us to pass information from a matched intent into a web service and get a result back from the service. Our Action’s Webhook will call our Cloud Function on GCP, using the Cloud Function’s URL endpoint (we’ll get this URL in the next section).

wp-search-030
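
Conceptually, a webhook receives the matched intent and its parameters as JSON and returns a JSON response. The sketch below shows that exchange using fields from the Dialogflow v2 webhook format (queryResult.intent.displayName, queryResult.parameters, and fulfillmentText). It is written in Python purely for illustration: the post's actual function is written in Node.js and relies on the actions-on-google library to do this parsing. The handler name and the reply text are hypothetical.

```python
def handle_webhook(body: dict) -> dict:
    """Map a Dialogflow v2 webhook request body to a minimal response body."""
    query_result = body.get('queryResult', {})
    intent_name = query_result.get('intent', {}).get('displayName', '')
    topic = query_result.get('parameters', {}).get('topic', '')

    if intent_name == 'Find Post Intent' and topic:
        # A real handler would query the search service here
        text = f'Searching for a post about {topic}.'
    else:
        text = 'What topic are you interested in reading about?'
    return {'fulfillmentText': text}


# Simulated request body for a matched intent
sample_request = {
    'queryResult': {
        'intent': {'displayName': 'Find Post Intent'},
        'parameters': {'topic': 'Docker'},
    }
}
sample_response = handle_webhook(sample_request)
```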

Google Cloud Functions

Our Cloud Function, called by our Action, is written in Node.js. Our function, index.js, is divided into four sections, which are: constants and environment variables, intent handlers, helper functions, and the function’s entry point. The helper functions are part of the Helper module, contained in the helper.js file.

Constants and Environment Variables

This section, in both index.js and helper.js, defines the global constants and environment variables used within the function. Values that reference environment variables, such as SEARCH_API_HOSTNAME, are defined in the .env.yaml file. All environment variables in the .env.yaml file will be set during the Cloud Function’s deployment, described later in this post. Environment variables were recently released, and are still considered beta functionality (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License

'use strict';

/* CONSTANTS AND GLOBAL VARIABLES */
const Helper = require('./helper');
let helper = new Helper();

const {
    dialogflow,
    Button,
    Suggestions,
    BasicCard,
    SimpleResponse,
    List
} = require('actions-on-google');
const functions = require('firebase-functions');

const app = dialogflow({debug: true});

app.middleware(conv => {
    conv.hasScreen =
        conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
    conv.hasAudioPlayback =
        conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});

const SUGGESTION_1 = 'tell me about Docker';
const SUGGESTION_2 = 'help';
const SUGGESTION_3 = 'cancel';

The npm module dependencies declared in this section are defined in the dependencies section of the package.json file. Function dependencies include Actions on Google, Firebase Functions, Winston, and Request (gist).

{
    "name": "functionBlogSearchAction",
    "description": "Programmatic Ponderings Search Action for Google Assistant",
    "version": "1.0.0",
    "private": true,
    "license": "MIT License",
    "author": "Gary A. Stafford",
    "engines": {
        "node": ">=8"
    },
    "scripts": {
        "deploy": "sh ./deploy-cloud-function.sh"
    },
    "dependencies": {
        "@google-cloud/logging-winston": "^0.9.0",
        "actions-on-google": "^2.2.0",
        "dialogflow": "^0.6.0",
        "dialogflow-fulfillment": "^0.5.0",
        "firebase-admin": "^6.0.0",
        "firebase-functions": "^2.0.2",
        "request": "^2.88.0",
        "request-promise-native": "^1.0.5",
        "winston": "2.4.4"
    }
}

Intent Handlers

The intent handlers in this section correspond to the intents in the Dialogflow console. Each handler responds with either SimpleResponse, BasicCard, and Suggestion Chip response types, or SimpleResponse, List, and Suggestion Chip response types. These response types were covered in part one of this post (gist).

/* INTENT HANDLERS */
app.intent('Welcome Intent', conv => {
    const WELCOME_TEXT_SHORT = 'What topic are you interested in reading about?';
    const WELCOME_TEXT_LONG = `You can say things like: \n` +
        ` _'Find a post about GCP'_ \n` +
        ` _'I'd like to read about Kubernetes'_ \n` +
        ` _'I'm interested in Docker'_`;
    conv.ask(new SimpleResponse({
        speech: WELCOME_TEXT_SHORT,
        text: WELCOME_TEXT_SHORT,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            text: WELCOME_TEXT_LONG,
            title: 'Programmatic Ponderings Search',
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

app.intent('Fallback Intent', conv => {
    const FACTS_LIST = "Kubernetes, Docker, Cloud, DevOps, AWS, Spring, Azure, Messaging, and GCP";
    const HELP_TEXT_SHORT = 'Need a little help?';
    const HELP_TEXT_LONG = `Some popular topics include: ${FACTS_LIST}.`;
    conv.ask(new SimpleResponse({
        speech: HELP_TEXT_LONG,
        text: HELP_TEXT_SHORT,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            text: HELP_TEXT_LONG,
            title: 'Programmatic Ponderings Search Help',
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

app.intent('Find Post Intent', async (conv, {topic}) => {
    let postTopic = topic.toString();
    let posts = await helper.getPostsByTopic(postTopic, 1);
    // Treat a missing result the same as an empty one
    if (posts === undefined || posts.length < 1) {
        helper.topicNotFound(conv, postTopic);
        return;
    }
    let post = posts[0];
    let formattedDate = helper.convertDate(post.post_date);
    const POST_SPOKEN = `The top result for '${postTopic}' is the post, '${post.post_title}', published ${formattedDate}, with a relevance score of ${post._score.toFixed(2)}`;
    const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate} \nScore: ${post._score.toFixed(2)}`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
        text: post.post_title,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            title: post.post_title,
            text: POST_TEXT,
            buttons: new Button({
                title: `Read Post`,
                url: post.guid,
            }),
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

app.intent('Find Multiple Posts Intent', async (conv, {topic}) => {
    let postTopic = topic.toString();
    let postCount = 6;
    let posts = await helper.getPostsByTopic(postTopic, postCount);
    // Treat a missing result the same as an empty one
    if (posts === undefined || posts.length < 1) {
        helper.topicNotFound(conv, postTopic);
        return;
    }
    const POST_SPOKEN = `Here's a list of the top ${posts.length} posts about '${postTopic}'`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
    }));
    // List items are keyed by post ID
    let itemsArray = {};
    posts.forEach(function (post) {
        itemsArray[post.ID] = {
            title: `Post ID ${post.ID}`,
            description: `${post.post_title.substring(0,80)}... \nScore: ${post._score.toFixed(2)}`,
        };
    });
    if (conv.hasScreen) {
        conv.ask(new List({
            title: 'Top Results',
            items: itemsArray
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

app.intent('Find By ID Intent', async (conv, {topic}) => {
    let postId = topic.toString();
    let post = await helper.getPostById(postId);
    if (post === undefined) {
        helper.postIdNotFound(conv, postId);
        return;
    }
    let formattedDate = helper.convertDate(post.post_date);
    const POST_SPOKEN = `Okay, I found that post`;
    const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
        text: post.post_title,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            title: post.post_title,
            text: POST_TEXT,
            buttons: new Button({
                title: `Read Post`,
                url: post.guid,
            }),
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

app.intent('Option Intent', async (conv, params, option) => {
    let postId = option.toString();
    let post = await helper.getPostById(postId);
    if (post === undefined) {
        helper.postIdNotFound(conv, postId);
        return;
    }
    let formattedDate = helper.convertDate(post.post_date);
    const POST_SPOKEN = `Sure, here's that post`;
    const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
    conv.ask(new SimpleResponse({
        speech: POST_SPOKEN,
        text: post.post_title,
    }));
    if (conv.hasScreen) {
        conv.ask(new BasicCard({
            title: post.post_title,
            text: POST_TEXT,
            buttons: new Button({
                title: `Read Post`,
                url: post.guid,
            }),
        }));
        conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
    }
});

The Welcome Intent handler handles explicit invocations of our Action. The Fallback Intent handler handles both help requests and cases when Dialogflow is unable to handle the user’s request.

As described above in the Dialogflow section, the Find Post Intent handler is responsible for handling our user’s requests for a single post about a topic. For example, ‘Find a post about Docker’. To fulfill the user request, the Find Post Intent handler calls the Helper module’s getPostsByTopic function, passing the topic requested and specifying a result set size of one. This returns the single post with the highest relevance score, provided that score is higher than an arbitrary value of 1.0.

Similarly, the Find Multiple Posts Intent handler is responsible for handling our user’s requests for a list of posts about a topic; for example, ‘I’m interested in Docker’. To fulfill the user request, the Find Multiple Posts Intent handler calls the Helper module’s getPostsByTopic function, passing the topic requested and specifying a result set size of a maximum of six posts with the highest relevance scores greater than 1.0.
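
The selection rule described above (keep only hits scoring above 1.0, ordered by relevance, up to a maximum count) can be modeled in a few lines. This is only an illustration of the rule, written in Python. In the actual system, the filtering presumably happens in the search service or in Elasticsearch itself, whose search API accepts a min_score parameter. The function name and the sample hits are hypothetical.

```python
def top_posts(hits, size, min_score=1.0):
    """Return up to `size` hits with a score above min_score, best first."""
    relevant = [hit for hit in hits if hit['_score'] > min_score]
    relevant.sort(key=lambda hit: hit['_score'], reverse=True)
    return relevant[:size]


# Hypothetical search hits
hits = [
    {'ID': 1, '_score': 0.4},
    {'ID': 2, '_score': 3.2},
    {'ID': 3, '_score': 1.7},
    {'ID': 4, '_score': 1.0},
]

single_post = top_posts(hits, 1)    # what the Find Post Intent would use
post_list = top_posts(hits, 6)      # what the Find Multiple Posts Intent would use
```

When fewer posts clear the threshold than were requested, the list is simply shorter, which matches the handler's use of posts.length in its spoken response.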

The Find By ID Intent handler is responsible for handling our user’s requests for a specific, unique post ID; for example, ‘Post ID 22141’. To fulfill the user request, the Find By ID Intent handler calls the Helper module’s getPostById function, passing the unique Post ID (gist).

/* INTENT HANDLERS */
app.intent('Welcome Intent', conv => {
  const WELCOME_TEXT_SHORT = 'What topic are you interested in reading about?';
  const WELCOME_TEXT_LONG = `You can say things like: \n` +
    ` _'Find a post about GCP'_ \n` +
    ` _'I'd like to read about Kubernetes'_ \n` +
    ` _'I'm interested in Docker'_`;
  conv.ask(new SimpleResponse({
    speech: WELCOME_TEXT_SHORT,
    text: WELCOME_TEXT_SHORT,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      text: WELCOME_TEXT_LONG,
      title: 'Programmatic Ponderings Search',
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Fallback Intent', conv => {
  const FACTS_LIST = "Kubernetes, Docker, Cloud, DevOps, AWS, Spring, Azure, Messaging, and GCP";
  const HELP_TEXT_SHORT = 'Need a little help?';
  const HELP_TEXT_LONG = `Some popular topics include: ${FACTS_LIST}.`;
  conv.ask(new SimpleResponse({
    speech: HELP_TEXT_LONG,
    text: HELP_TEXT_SHORT,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      text: HELP_TEXT_LONG,
      title: 'Programmatic Ponderings Search Help',
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Find Post Intent', async (conv, {topic}) => {
  let postTopic = topic.toString();
  let posts = await helper.getPostsByTopic(postTopic, 1);
  // Treat both a missing result set and an empty one as "not found"
  if (posts === undefined || posts.length < 1) {
    helper.topicNotFound(conv, postTopic);
    return;
  }
  let post = posts[0];
  let formattedDate = helper.convertDate(post.post_date);
  const POST_SPOKEN = `The top result for '${postTopic}' is the post, '${post.post_title}', published ${formattedDate}, with a relevance score of ${post._score.toFixed(2)}`;
  const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate} \nScore: ${post._score.toFixed(2)}`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
    text: post.post_title,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      title: post.post_title,
      text: POST_TEXT,
      buttons: new Button({
        title: `Read Post`,
        url: post.guid,
      }),
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Find Multiple Posts Intent', async (conv, {topic}) => {
  let postTopic = topic.toString();
  let postCount = 6;
  let posts = await helper.getPostsByTopic(postTopic, postCount);
  // Treat both a missing result set and an empty one as "not found"
  if (posts === undefined || posts.length < 1) {
    helper.topicNotFound(conv, postTopic);
    return;
  }
  const POST_SPOKEN = `Here's a list of the top ${posts.length} posts about '${postTopic}'`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
  }));
  // Build the List items, keyed by Post ID
  let itemsArray = {};
  posts.forEach(function (post) {
    itemsArray[post.ID] = {
      title: `Post ID ${post.ID}`,
      description: `${post.post_title.substring(0,80)}... \nScore: ${post._score.toFixed(2)}`,
    };
  });
  if (conv.hasScreen) {
    conv.ask(new List({
      title: 'Top Results',
      items: itemsArray
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Find By ID Intent', async (conv, {topic}) => {
  let postId = topic.toString();
  let post = await helper.getPostById(postId);
  if (post === undefined) {
    helper.postIdNotFound(conv, postId);
    return;
  }
  let formattedDate = helper.convertDate(post.post_date);
  const POST_SPOKEN = `Okay, I found that post`;
  const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
    text: post.post_title,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      title: post.post_title,
      text: POST_TEXT,
      buttons: new Button({
        title: `Read Post`,
        url: post.guid,
      }),
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
app.intent('Option Intent', async (conv, params, option) => {
  // The selected List item's key is the Post ID
  let postId = option.toString();
  let post = await helper.getPostById(postId);
  if (post === undefined) {
    helper.postIdNotFound(conv, postId);
    return;
  }
  let formattedDate = helper.convertDate(post.post_date);
  const POST_SPOKEN = `Sure, here's that post`;
  const POST_TEXT = `Description: ${post.post_excerpt} \nPublished: ${formattedDate}`;
  conv.ask(new SimpleResponse({
    speech: POST_SPOKEN,
    text: post.post_title,
  }));
  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      title: post.post_title,
      text: POST_TEXT,
      buttons: new Button({
        title: `Read Post`,
        url: post.guid,
      }),
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});
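
The List items built in the Find Multiple Posts Intent handler above are a plain object keyed by Post ID, which is what lets the Option Intent handler receive the selected ID. As a rough illustration only, that mapping could be pulled out into a small function. The name buildListItems, the maxTitleLength parameter, and the sample data are assumptions made for this sketch and are not part of the original gist; unlike the handler, the sketch appends an ellipsis only when the title is actually truncated.

```javascript
// Hypothetical helper (not in the original gist): builds the `items` map
// for a List response, keyed by Post ID.
function buildListItems(posts, maxTitleLength = 80) {
  const items = {};
  (posts || []).forEach((post) => {
    // Only append an ellipsis when the title was actually shortened
    const title = post.post_title.length > maxTitleLength
      ? `${post.post_title.substring(0, maxTitleLength)}...`
      : post.post_title;
    items[post.ID] = {
      title: `Post ID ${post.ID}`,
      description: `${title} \nScore: ${post._score.toFixed(2)}`,
    };
  });
  return items;
}

// Made-up sample data, shaped like the fields the handlers read
const samplePosts = [
  {ID: 22141, post_title: 'A post about Docker', _score: 3.14159},
  {ID: 22142, post_title: 'Another post about Docker', _score: 2.5},
];
console.log(buildListItems(samplePosts));
```

Keeping this logic in a pure function would make it testable without a Dialogflow conversation object.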

Entry Point

The entry point exports the app as an HTTP-triggered Cloud Function, which handles the communication with Dialogflow’s fulfillment API (gist).

/* ENTRY POINT */
exports.functionBlogSearchAction = functions.https.onRequest(app);

Helper Functions

The helper functions are part of the Helper module, contained in the helper.js file. In addition to typical utility functions, like formatting dates, two functions, getPostsByTopic and getPostById, interface with Elasticsearch via our Spring Boot API. As described above, the intent handlers call one of these functions to obtain search results from Elasticsearch.

The getPostsByTopic function handles both the Find Post Intent handler and Find Multiple Posts Intent handler, described above. The only difference in the two calls is the size of the response set, either one result or six results maximum (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
'use strict';

/* CONSTANTS AND GLOBAL VARIABLES */
const {
  dialogflow,
  BasicCard,
  SimpleResponse,
} = require('actions-on-google');

const app = dialogflow({debug: true});

app.middleware(conv => {
  conv.hasScreen =
    conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
  conv.hasAudioPlayback =
    conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});

// Location of the Spring Boot search API, supplied as environment variables
const SEARCH_API_HOSTNAME = process.env.SEARCH_API_HOSTNAME;
const SEARCH_API_PORT = process.env.SEARCH_API_PORT;
const SEARCH_API_ENDPOINT = process.env.SEARCH_API_ENDPOINT;

const rpn = require('request-promise-native');

const winston = require('winston');
const Logger = winston.Logger;
const Console = winston.transports.Console;
const {LoggingWinston} = require('@google-cloud/logging-winston');
const loggingWinston = new LoggingWinston();
const logger = new Logger({
  level: 'info', // log at 'info' and above
  transports: [
    new Console(),
    loggingWinston,
  ],
});

/* HELPER FUNCTIONS */
module.exports = class Helper {
  /**
   * Returns a collection of ElasticsearchPosts objects based on a topic
   * @param postTopic topic to search for
   * @param responseSize maximum number of posts to return
   * @returns {Promise<any>}
   */
  getPostsByTopic(postTopic, responseSize = 1) {
    return new Promise((resolve, reject) => {
      const SEARCH_API_RESOURCE = `dismax-search?value=${postTopic}&start=0&size=${responseSize}&minScore=1`;
      const SEARCH_API_URL = `http://${SEARCH_API_HOSTNAME}:${SEARCH_API_PORT}/${SEARCH_API_ENDPOINT}/${SEARCH_API_RESOURCE}`;
      logger.info(`getPostsByTopic API URL: ${SEARCH_API_URL}`);
      let options = {
        uri: SEARCH_API_URL,
        json: true
      };
      rpn(options)
        .then(function (posts) {
          posts = posts.ElasticsearchPosts;
          logger.info(`getPostsByTopic Posts: ${JSON.stringify(posts)}`);
          resolve(posts);
        })
        .catch(function (err) {
          logger.error(`Error: ${err}`);
          reject(err);
        });
    });
  }
  // truncated for brevity
};
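
A topic that contains characters such as & or # would need URL-encoding before being placed in the dismax-search query string; otherwise, part of the topic could be read as a separate parameter or a fragment. The following is a minimal sketch of one way to build that URL, assuming a standalone function named buildSearchUrl and placeholder host, port, and endpoint values, none of which appear in the original gist.

```javascript
// Hypothetical helper (not in the original gist): builds the dismax-search
// URL, encoding the topic so it is sent as a single query parameter value.
function buildSearchUrl(hostname, port, endpoint, postTopic, responseSize = 1) {
  const resource = `dismax-search?value=${encodeURIComponent(postTopic)}` +
    `&start=0&size=${responseSize}&minScore=1`;
  return `http://${hostname}:${port}/${endpoint}/${resource}`;
}

// Placeholder values for illustration only
console.log(buildSearchUrl('api.example.com', 8080, 'elastic', 'C# & .NET', 6));
// prints: http://api.example.com:8080/elastic/dismax-search?value=C%23%20%26%20.NET&start=0&size=6&minScore=1
```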

Both functions use the request and request-promise-native npm modules to call the Spring Boot service’s RESTful API over HTTP. Rather than taking a callback, the request-promise-native module returns a native ES6 Promise. Because the helper functions return promises, we can use async/await in our Intent handlers. Using async/await with Promises is a newer way of handling asynchronous operations in Node.js. The asynchronous programming model, using promises, is described in greater detail in my previous post, Building Serverless Actions for Google Assistant with Google Cloud Functions, Cloud Datastore, and Cloud Storage.
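
To illustrate the promise and async/await pattern in isolation, here is a minimal sketch that runs without any network access. The fakeSearchRequest function is a stand-in, invented for this example, for a call made with request-promise-native; only the ElasticsearchPosts field name is taken from the gists. Because the underlying call already returns a promise, the sketch returns the promise chain directly instead of wrapping it in a new Promise, and the calling function handles a rejection with try/catch. This is one possible arrangement, not the code used in the project.

```javascript
// Stand-in for an HTTP call made with request-promise-native (assumption):
// resolves with a body shaped like the search API's response.
function fakeSearchRequest(topic) {
  if (!topic) {
    return Promise.reject(new Error('topic is required'));
  }
  return Promise.resolve({
    ElasticsearchPosts: [{ID: 1, post_title: `A post about ${topic}`}],
  });
}

// Return the promise chain directly; no `new Promise` wrapper is needed
// when the underlying call already returns a promise.
function getPostsByTopic(topic) {
  return fakeSearchRequest(topic).then((body) => body.ElasticsearchPosts);
}

// An async function awaits the result and catches rejections in one place.
async function findPostTitle(topic) {
  try {
    const posts = await getPostsByTopic(topic);
    return posts[0].post_title;
  } catch (err) {
    return `Search failed: ${err.message}`;
  }
}

findPostTitle('Docker').then((title) => console.log(title));
```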

The getPostById function handles both the Find By ID Intent handler and Option Intent handler, described above. This function is similar to the getPostsByTopic function, calling a Spring Boot service’s RESTful API endpoint and passing the Post ID (gist).

// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License
// truncated for brevity
module.exports = class Helper {
  /**
   * Returns a single result based on the Post ID
   * @param postId ID of the Post to search for
   * @returns {Promise<any>}
   */
  getPostById(postId) {
    return new Promise((resolve, reject) => {
      const SEARCH_API_RESOURCE = `${postId}`;
      const SEARCH_API_URL = `http://${SEARCH_API_HOSTNAME}:${SEARCH_API_PORT}/${SEARCH_API_ENDPOINT}/${SEARCH_API_RESOURCE}`;
      logger.info(`getPostById API URL: ${SEARCH_API_URL}`);
      let options = {
        uri: SEARCH_API_URL,
        json: true
      };
      rpn(options)
        .then(function (post) {
          post = post.ElasticsearchPosts;
          logger.info(`getPostById Post: ${JSON.stringify(post)}`);
          resolve(post);
        })
        .catch(function (err) {
          logger.error(`Error: ${err}`);
          reject(err);
        });
    });
  }
  // truncated for brevity
};

Cloud Function Deployment

To deploy the Cloud Function to GCP, use the gcloud CLI with the beta version of the functions deploy command. According to Google, gcloud is a part of the Google Cloud SDK. You must download and install the SDK on your system and initialize it before you can use gcloud. Currently, Cloud Functions are only available in four regions. I have included a shell