
Employing Amazon Macie to Discover and Protect Sensitive Data in your Amazon S3-based Data Lake

Introduction

Working with Analytics customers, it’s not uncommon to see data lakes with a dozen or more discrete data sources. Data typically originates from sources both internal and external to the customer. Internal data may come from multiple teams, departments, divisions, and enterprise systems. External data comes from vendors, partners, public sources, and subscriptions to licensed data sources. The volume, velocity, variety, veracity, and method of delivery vary across the data sources. All this data is being fed into data lakes for purposes such as analytics, business intelligence, and machine learning.

Given the growing volumes of incoming data and variations amongst data sources, it is increasingly complex, expensive, and time-consuming for organizations to ensure compliance with relevant laws, policies, and regulations. Regulations that impact how data is handled in a data lake include the Health Insurance Portability and Accountability Act (HIPAA), General Data Protection Regulation (GDPR), Payment Card Industry Data Security Standard (PCI DSS), California Consumer Privacy Act (CCPA), and the Federal Information Security Management Act (FISMA).

Data Lake

AWS defines a data lake as a centralized repository that allows you to store all your structured and unstructured data at any scale. Once in the data lake, you run different types of analytics — from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data in a data lake is typically organized or separated by its stage in the analytics process. Incoming data is often referred to as raw data. Data is then processed: cleansed, filtered, enriched, and tokenized if necessary. Lastly, the data is analyzed and aggregated, and the results are written back to the data lake. The analyzed and aggregated data is used to build business intelligence dashboards and reports and machine learning models, and is delivered to downstream or external systems. The different categories of data (raw, processed, and aggregated) are frequently referred to as bronze, silver, and gold, a reference to their overall data quality or value.

Protecting the Data Lake

Imagine you’ve received a large volume of data from an external data source. The incoming data is cleansed, filtered, and enriched. The data is re-formatted, partitioned, compressed for analytical efficiency, and written back to the data lake. Your analytics pipelines run complex and time-consuming queries against the data. Unfortunately, while building reports for a set of stakeholders, you realize that the original data accidentally included credit card information and other sensitive information about your customers. In addition to being out of compliance, you have the wasted time and expense of the initial data processing, as well as the extra time and expense to replace and re-process the data. The solution — Amazon Macie.

Amazon Macie

According to AWS, Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). Macie’s alerts, or findings, can be searched, filtered, and sent to Amazon EventBridge, formerly called Amazon CloudWatch Events, for easy integration with existing workflow or event management systems, or to be used in combination with AWS services, such as AWS Step Functions or Amazon Managed Workflows for Apache Airflow (MWAA) to take automated remediation actions.

Amazon Macie’s Summary view

Data Discovery and Protection

In this post, we will deploy an automated data inspection workflow to examine sample data in an S3-based data lake. Amazon Macie will examine data files uploaded to an encrypted S3 bucket. If sensitive data is discovered within the files, the files will be moved to an encrypted isolation bucket for further investigation. Email and SMS text alerts will be sent. This workflow will leverage Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), AWS Lambda, and AWS Systems Manager Parameter Store.

Macie data inspection workflow architecture

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/macie-demo.git

AWS resources for this post can be deployed using AWS CloudFormation. To follow along, you will need recent versions of Python 3, Boto3, and the AWS CLI version 2 installed.

Sample Data

We will use synthetic patient data, freely available from the MITRE Corporation. The data was generated by Synthea, MITRE’s open-source, synthetic patient generator that models the medical history of synthetic patients. Synthea data is exported in a variety of data standards, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Download and unzip the CSV files from the Synthea website.

REMOTE_FILE="synthea_sample_data_csv_apr2020.zip"
wget "https://storage.googleapis.com/synthea-public/${REMOTE_FILE}"
unzip -j "${REMOTE_FILE}" -d synthea_data/

The sixteen CSV data files contain a total of 471,852 rows of data, including column headers.

> wc -l *.csv

      598 allergies.csv
    3,484 careplans.csv
    8,377 conditions.csv
       79 devices.csv
   53,347 encounters.csv
      856 imaging_studies.csv
   15,479 immunizations.csv
   42,990 medications.csv
  299,698 observations.csv
    1,120 organizations.csv
    1,172 patients.csv
    3,802 payer_transitions.csv
       11 payers.csv
   34,982 procedures.csv
    5,856 providers.csv
        1 supplies.csv
  ------------------------------
  471,852 total

Amazon Macie Custom Data Identifier

To demonstrate some of the advanced features of Amazon Macie, we will use three Custom Data Identifiers. According to Macie’s documentation, a custom data identifier is a set of criteria that you define that reflects your organization’s particular proprietary data — for example, employee IDs, customer account numbers, or internal data classifications. We will create three custom data identifiers to detect the specific Synthea-format Patient ID, US driver’s license number, and US passport number columns.

Post’s three custom data identifiers

The custom data identifiers in this post use a combination of regular expressions (regex) and keywords. The identifiers are designed to work with structured data, such as CSV files. Macie reports text that matches the regex pattern if any of these keywords are in the name of the column or field that stores the text, or if the text is within the maximum match distance of one of these words in a field value. Macie supports a subset of the regex pattern syntax provided by the Perl Compatible Regular Expressions (PCRE) library.
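To make the interplay of regex, keywords, and maximum match distance more concrete, here is a minimal local sketch in Python. It is not Macie's implementation: the UUID-style regex, the keyword, and the distance of 50 characters are illustrative assumptions rather than the post's actual identifier settings, and the function only looks for keywords that appear before the match.

```python
import re

# Assumption: Synthea patient IDs look like UUIDs. The regex, keyword, and
# distance below are illustrative, not the post's real identifier settings.
PATIENT_ID_REGEX = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b"
)
KEYWORDS = ("patient",)
MAX_MATCH_DISTANCE = 50


def find_matches(text, regex=PATIENT_ID_REGEX, keywords=KEYWORDS,
                 max_distance=MAX_MATCH_DISTANCE):
    """Return regex matches preceded by a keyword within max_distance characters."""
    matches = []
    lowered = text.lower()
    for match in regex.finditer(text):
        # Only the text immediately before the match is searched for keywords.
        window = lowered[max(0, match.start() - max_distance):match.start()]
        if any(keyword in window for keyword in keywords):
            matches.append(match.group())
    return matches


print(find_matches("patient: 1d604da9-9a81-4ba9-80c2-de3375d59b40"))
print(find_matches("order ref 1d604da9-9a81-4ba9-80c2-de3375d59b40"))
```

The first call reports the ID because the keyword sits just ahead of it. The second call reports nothing, even though the text matches the regex, because no keyword is nearby.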

Patient ID custom data identifier console

Enable Macie

Before creating a CloudFormation stack with this demonstration’s resources, you will need to enable Amazon Macie from the AWS Management Console, or use the macie2 API and the AWS CLI with the enable-macie command.

aws macie2 enable-macie

Macie can also be enabled for your multi-account AWS Organization. The enable-organization-admin-account command designates an account as the delegated Amazon Macie administrator account for an AWS organization. For more information, see Managing multiple accounts in Amazon Macie.

AWS_ACCOUNT=111222333444
aws macie2 enable-organization-admin-account \
--admin-account-id ${AWS_ACCOUNT}

CloudFormation Stack

To create the CloudFormation stack with the supplied template, cloudformation/macie_demo.yml, run the following AWS CLI command. You will need to include an email address and phone number as input parameters. These parameter values will be used to send email and text alerts when Macie produces a sensitive data finding.

Please make sure you understand all the potential cost and security implications of creating the CloudFormation stack before continuing.

SNS_PHONE="+12223334444"
SNS_EMAIL="your-email-address@email.com"

aws cloudformation create-stack \
--stack-name macie-demo \
--template-body file://cloudformation/macie_demo.yml \
--parameters ParameterKey=SNSTopicEndpointSms,ParameterValue=${SNS_PHONE} \
ParameterKey=SNSTopicEndpointEmail,ParameterValue=${SNS_EMAIL} \
--capabilities CAPABILITY_NAMED_IAM

As shown in the AWS CloudFormation console, the new macie-demo stack will contain twenty-one AWS resources.

CloudFormation stack successfully created

Upload Data

Next, with the stack deployed, upload the CSV format data files to the encrypted S3 bucket, representing your data lake. The target S3 bucket has the following naming convention, synthea-data-<aws_account_id>-<region>. You can retrieve the two new bucket names from AWS Systems Manager Parameter Store, which were written there by CloudFormation, using the ssm API.

aws ssm get-parameters-by-path \
--path /macie_demo/ \
--query 'Parameters[*].Value'
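The same lookup can be sketched in Python. The helper name get_demo_parameters is hypothetical; it takes the SSM client as an argument so it can be tried against a stub, and it follows NextToken in case the path ever holds more than one page of parameters.

```python
def get_demo_parameters(ssm_client, path="/macie_demo/"):
    """Return {parameter name: value} for every parameter under path."""
    params = {}
    kwargs = {"Path": path}
    while True:
        response = ssm_client.get_parameters_by_path(**kwargs)
        for parameter in response["Parameters"]:
            params[parameter["Name"]] = parameter["Value"]
        next_token = response.get("NextToken")
        if not next_token:
            return params
        # More results are available; request the next page.
        kwargs["NextToken"] = next_token
```

With Boto3 installed and credentials configured, it would be called as get_demo_parameters(boto3.client('ssm')).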

Use the following ssm and s3 API commands to upload the data files.

DATA_BUCKET=$(aws ssm get-parameter \
    --name /macie_demo/patient_data_bucket \
    --query 'Parameter.Value' --output text)
aws s3 cp synthea_data/ \
    "s3://${DATA_BUCKET}/patient_data/" --recursive

You should end up with sixteen CSV files in the S3 bucket, totaling approximately 82.3 MB.

Synthea patient data files uploaded to S3

Sensitive Data Discovery Jobs

With the CloudFormation stack created and the patient data files uploaded, we will create two sensitive data discovery jobs. These jobs will scan the contents of the encrypted S3 bucket for sensitive data and report the findings. According to the documentation, you can configure a sensitive data discovery job to run only once for on-demand analysis and assessment, or on a recurring basis for periodic analysis, assessment, and monitoring. For this demonstration, we will create a one-time sensitive data discovery job using the AWS CLI. We will also create a recurring sensitive data discovery job using the AWS SDK for Python (Boto3). Both jobs can also be created from within Macie’s Jobs console.

Creating a new job in Macie’s Jobs console

For both sensitive data discovery jobs, we will include the three custom data identifiers. Each of the custom data identifiers has a unique ID. We will need all three IDs to create the two sensitive data discovery jobs. You can use the AWS CLI and the macie2 API to retrieve the values.

aws macie2 list-custom-data-identifiers --query 'items[*].id'

Next, modify the job_specs/macie_job_specs_1x.json file, adding the three custom data identifier IDs. Also, update your AWS account ID and S3 bucket name (lines 3–5, 12, and 14). Note that since all the patient data files are in CSV format, we will limit our inspection to only files with a csv file extension (lines 18–33).

{
  "customDataIdentifierIds": [
    "custom-data-identifier-id-1",
    "custom-data-identifier-id-2",
    "custom-data-identifier-id-3"
  ],
  "description": "Review Synthea patient data (1x)",
  "jobType": "ONE_TIME",
  "s3JobDefinition": {
    "bucketDefinitions": [
      {
        "accountId": "111222333444",
        "buckets": [
          "synthea-data-111222333444-us-east-1"
        ]
      }
    ],
    "scoping": {
      "includes": {
        "and": [
          {
            "simpleScopeTerm": {
              "comparator": "EQ",
              "key": "OBJECT_EXTENSION",
              "values": [
                "csv"
              ]
            }
          }
        ]
      }
    }
  },
  "tags": {
    "Project": "Amazon Macie Demo"
  }
}

The above JSON template was generated using the standard AWS CLI generate-cli-skeleton command.

aws macie2 create-classification-job --generate-cli-skeleton

To create a one-time sensitive data discovery job using the above JSON template, run the following AWS CLI command. The unique job name will be dynamically generated based on the current time.

aws macie2 create-classification-job \
    --name "SyntheaPatientData_$(date +%s)" \
    --cli-input-json file://job_specs/macie_job_specs_1x.json

In the Amazon Macie Jobs console, we can see a one-time sensitive data discovery job running. With a sampling depth of 100 percent, the job will take several minutes to run. The samplingPercentage job property can be adjusted to scan any percentage of the data. If this value is less than 100, Macie selects the objects to analyze at random, up to the specified percentage, and analyzes all the data in those objects.
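For readers who prefer Python to a JSON spec file, the one-time job request could be assembled roughly as follows. This is a sketch under assumptions: build_one_time_job is a hypothetical helper, not part of the post's repository, and it only builds the keyword arguments without calling AWS.

```python
import time


def build_one_time_job(account_id, bucket_name, identifier_ids,
                       sampling_percentage=100):
    """Build keyword arguments for the macie2 create_classification_job call."""
    return {
        # A timestamp suffix keeps the job name unique between runs.
        "name": f"SyntheaPatientData_{int(time.time())}",
        "description": "Review Synthea patient data (1x)",
        "jobType": "ONE_TIME",
        "customDataIdentifierIds": list(identifier_ids),
        "samplingPercentage": sampling_percentage,
        "s3JobDefinition": {
            "bucketDefinitions": [
                {"accountId": account_id, "buckets": [bucket_name]}
            ],
            # Restrict the job to objects with a csv file extension.
            "scoping": {
                "includes": {
                    "and": [
                        {
                            "simpleScopeTerm": {
                                "comparator": "EQ",
                                "key": "OBJECT_EXTENSION",
                                "values": ["csv"],
                            }
                        }
                    ]
                }
            },
        },
        "tags": {"Project": "Amazon Macie Demo"},
    }


job = build_one_time_job(
    "111222333444",
    "synthea-data-111222333444-us-east-1",
    ["custom-data-identifier-id-1"],
    sampling_percentage=50,
)
print(job["name"], job["samplingPercentage"])
```

The resulting dictionary could then be passed to Boto3 as boto3.client('macie2').create_classification_job(**job). Lowering sampling_percentage trades coverage for a shorter, cheaper run.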

One-time sensitive data discovery job running

Once the job is completed, the findings will be available in Macie’s Findings console. Using the three custom data identifiers in addition to Macie’s managed data identifiers, there should be a total of fifteen findings from the Synthea patient data files in S3. There should be six High severity findings and nine Medium severity findings. Of those, three are of a Personal finding type, seven of a Custom Identifier finding type, and five of a Multiple finding type, having both Personal and Custom Identifier finding types.

Macie’s Findings console displaying the results of the one-time job

Isolating High Severity Findings

The data inspection workflow we have deployed uses an AWS Lambda function, macie-object-mover, to isolate all data files with High severity findings to a second S3 bucket. The offending files are copied to the isolation bucket and deleted from the source bucket.

#!/usr/bin/env python3

# Purpose: Lambda function that moves S3 objects flagged by Macie
# Author: Gary A. Stafford (March 2021)

import json
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')


def lambda_handler(event, context):
    logger.info(f'event: {json.dumps(event)}')

    destination_bucket_name = 'macie-isolation-111222333444-us-east-1'
    source_bucket_name = event['detail']['resourcesAffected']['s3Bucket']['name']
    file_key_name = event['detail']['resourcesAffected']['s3Object']['key']
    copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}

    logger.debug(f'destination_bucket_name: {destination_bucket_name}')
    logger.debug(f'source_bucket_name: {source_bucket_name}')
    logger.debug(f'file_key_name: {file_key_name}')

    # Copy the flagged object to the isolation bucket
    try:
        response = s3_client.copy_object(
            CopySource=copy_source_object,
            Bucket=destination_bucket_name,
            Key=file_key_name
        )
        logger.info(response)
    except ClientError as ex:
        logger.error(ex)
        raise  # fail the invocation; the source object is left in place

    # Delete the object from the source bucket only after a successful copy
    try:
        response = s3_client.delete_object(
            Bucket=source_bucket_name,
            Key=file_key_name
        )
        logger.info(response)
    except ClientError as ex:
        logger.error(ex)
        raise

    return {
        'statusCode': 200,
        'body': json.dumps(copy_source_object)
    }
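The function relies on two fields of the finding event. A small local sketch shows that lookup in isolation, which can be handy when testing without deploying. The helper name extract_s3_location and the sample event are assumptions for illustration; a real Macie finding event carries many more fields than shown here.

```python
def extract_s3_location(event):
    """Return (bucket, key) of the S3 object a Macie finding event refers to."""
    resources = event["detail"]["resourcesAffected"]
    return resources["s3Bucket"]["name"], resources["s3Object"]["key"]


# Trimmed, made-up event containing only the fields the function reads.
SAMPLE_EVENT = {
    "source": "aws.macie",
    "detail-type": "Macie Finding",
    "detail": {
        "resourcesAffected": {
            "s3Bucket": {"name": "synthea-data-111222333444-us-east-1"},
            "s3Object": {"key": "patient_data/patients.csv"},
        }
    },
}

bucket, key = extract_s3_location(SAMPLE_EVENT)
print(bucket, key)
```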

Amazon EventBridge

According to Macie’s documentation, to support integration with other applications, services, and systems, such as monitoring or event management systems, Amazon Macie automatically publishes findings to Amazon EventBridge as finding events. Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale using events generated from your applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

Each EventBridge rule contains an event pattern, which is used to filter the incoming stream of events for particular patterns. The post’s rules include macie-rule-custom, triggered when a Macie finding is based on any of the three custom data identifiers, and macie-rule-high, which, as its name suggests, handles High severity findings.

Post’s event rules, shown in the Amazon EventBridge console

The event pattern for macie-rule-custom is shown below. This pattern examines the finding event for the name of any of the three custom data identifiers that triggered it.

{
  "source": [
    "aws.macie"
  ],
  "detail-type": [
    "Macie Finding"
  ],
  "detail": {
    "classificationDetails": {
      "result": {
        "customDataIdentifiers": {
          "detections": {
            "name": [
              "Patient ID",
              "US Passport",
              "US Driver License"
            ]
          }
        }
      }
    }
  }
}
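To get a feel for how such a pattern filters events, the sketch below applies a deliberately simplified version of the matching rules in plain Python. It is not EventBridge's actual engine and ignores features such as prefix or numeric matching. The matches function and the sample event, including its count value, are made up for illustration.

```python
def matches(pattern, event):
    """Simplified EventBridge-style matching.

    A dict in the pattern must match the same keys in the event; a list in the
    pattern is a set of accepted values. If the event holds a list, any one
    element may satisfy the pattern.
    """
    if isinstance(event, list):
        return any(matches(pattern, item) for item in event)
    if isinstance(pattern, dict):
        return isinstance(event, dict) and all(
            key in event and matches(sub_pattern, event[key])
            for key, sub_pattern in pattern.items()
        )
    return event in pattern  # pattern is a list of accepted values


PATTERN = {
    "source": ["aws.macie"],
    "detail-type": ["Macie Finding"],
    "detail": {"classificationDetails": {"result": {"customDataIdentifiers": {
        "detections": {"name": ["Patient ID", "US Passport", "US Driver License"]}
    }}}},
}

# Made-up finding event, trimmed to the fields the pattern inspects.
SAMPLE_EVENT = {
    "source": "aws.macie",
    "detail-type": "Macie Finding",
    "detail": {"classificationDetails": {"result": {"customDataIdentifiers": {
        "detections": [{"name": "Patient ID", "count": 3}]
    }}}},
}

print(matches(PATTERN, SAMPLE_EVENT))
```

An event whose detections list names none of the three identifiers, or that comes from a different source, would not match.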

Six data files, containing High severity findings, will be moved to the isolation bucket by the Lambda, triggered by EventBridge.

Isolation bucket containing data files with High severity findings

Scheduled Sensitive Data Discovery Jobs

Data sources commonly deliver data on a repeated basis, such as nightly data feeds. For these types of data sources, we can schedule sensitive data discovery jobs to run on a scheduled basis. For this demonstration, we will create a scheduled job using the AWS SDK for Python (Boto3). Unlike the AWS CLI-based one-time job, you don’t need to modify the project’s script, scripts/create_macie_job_daily.py. The Python script will retrieve your AWS account ID and three custom data identifier IDs. The Python script then runs the create_classification_job command.

#!/usr/bin/env python3

# Purpose: Create Daily Macie classification job - Synthea patient data
# Author: Gary A. Stafford (March 2021)

import logging
import sys

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')
sts_client = boto3.client('sts')
macie_client = boto3.client('macie2')


def main():
    params = get_parameters()
    account_id = sts_client.get_caller_identity()['Account']
    custom_data_identifiers = list_custom_data_identifiers()
    create_classification_job(params['patient_data_bucket'], account_id, custom_data_identifiers)


def list_custom_data_identifiers():
    """Returns a list of all custom data identifier ids"""

    custom_data_identifiers = []

    try:
        response = macie_client.list_custom_data_identifiers()
        for item in response['items']:
            custom_data_identifiers.append(item['id'])
        return custom_data_identifiers
    except ClientError as e:
        logging.error(e)
        sys.exit(e)


def create_classification_job(patient_data_bucket, account_id, custom_data_identifiers):
    """Create Daily Macie classification job"""

    try:
        response = macie_client.create_classification_job(
            customDataIdentifierIds=custom_data_identifiers,
            description='Review Synthea patient data (Daily)',
            jobType='SCHEDULED',
            initialRun=True,
            name='SyntheaPatientData_Daily',
            s3JobDefinition={
                'bucketDefinitions': [
                    {
                        'accountId': account_id,
                        'buckets': [
                            patient_data_bucket
                        ]
                    }
                ],
                'scoping': {
                    'includes': {
                        'and': [
                            {
                                'simpleScopeTerm': {
                                    'comparator': 'EQ',
                                    'key': 'OBJECT_EXTENSION',
                                    'values': [
                                        'csv',
                                    ]
                                }
                            },
                        ]
                    }
                }
            },
            samplingPercentage=100,
            scheduleFrequency={
                'dailySchedule': {}
            },
            tags={
                'Project': 'Amazon Macie Demo'
            }
        )
        logging.debug(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        sys.exit(e)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""

    params = {
        'patient_data_bucket': ssm_client.get_parameter(Name='/macie_demo/patient_data_bucket')['Parameter']['Value']
    }

    return params


if __name__ == '__main__':
    main()

To create the scheduled sensitive data discovery job, run the following command.

python3 ./scripts/create_macie_job_daily.py

The scheduleFrequency parameter is set to { 'dailySchedule': {} }. This value specifies a daily recurrence pattern for running the job. The initialRun parameter of the create_classification_job command is set to True. This will cause the new job to analyze all eligible objects immediately after the job is created, in addition to on a daily basis.
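Besides a daily schedule, the Macie API also accepts weekly and monthly recurrence patterns. The helper below, schedule_frequency, is a hypothetical convenience function and not part of the post's script. It sketches how the three shapes of the scheduleFrequency value differ.

```python
def schedule_frequency(recurrence, day=None):
    """Build a scheduleFrequency value for create_classification_job.

    recurrence: 'daily', 'weekly', or 'monthly'
    day: day name such as 'MONDAY' for weekly, day of month (int) for monthly
    """
    if recurrence == "daily":
        return {"dailySchedule": {}}
    if recurrence == "weekly":
        return {"weeklySchedule": {"dayOfWeek": day}}
    if recurrence == "monthly":
        return {"monthlySchedule": {"dayOfMonth": day}}
    raise ValueError(f"Unsupported recurrence: {recurrence}")


print(schedule_frequency("daily"))
print(schedule_frequency("weekly", "MONDAY"))
print(schedule_frequency("monthly", 1))
```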

Scheduled sensitive data discovery job in an active/idle state

Conclusion

In this post, we learned how we can use Amazon Macie to discover and protect sensitive data in Amazon S3. We learned how to use automation to trigger alerts based on Macie’s findings and to isolate data files based on the types of findings. The post’s data inspection workflow can easily be incorporated into existing data lake ingestion pipelines to ensure the integrity of incoming data.


This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS

Introduction

In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.

Amazon EMR

According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.

Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. Amazon recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA). With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation, while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache Airflow’s UI

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.

Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon Redshift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the Hooks, Secrets, Sensors, and Operators of the Airflow codebase’s contrib section.

Getting Started

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-airflow-demo.git

Preliminary Steps

This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post’s AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.

Configuring Amazon MWAA

The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Amazon MWAA Environment Creation Process

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current ‘Airflow version’, 1.10.12.

Amazon MWAA Environment Creation Process

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow’s Directed Acyclic Graphs (DAG). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.
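If you would rather script the bucket preparation than click through the console, the steps might look like the sketch below. The function name prepare_airflow_bucket is hypothetical, the bucket is assumed to exist already, and the S3 client is passed in so the logic can be exercised with a stub.

```python
def prepare_airflow_bucket(s3_client, bucket_name):
    """Enable versioning and add a dags/ folder to an existing Airflow bucket."""
    if not bucket_name.startswith("airflow-"):
        raise ValueError("Bucket name must start with the prefix 'airflow-'")

    # MWAA requires Bucket Versioning to be enabled.
    s3_client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={"Status": "Enabled"},
    )

    # S3 has no real folders; an empty object whose key ends in "/"
    # shows up as a folder in the console.
    s3_client.put_object(Bucket=bucket_name, Key="dags/")
```

With Boto3, the call would be prepare_airflow_bucket(boto3.client('s3'), 'airflow-<your_suffix>'), where the suffix is a placeholder for your own bucket name.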

Amazon MWAA Environment Creation Process

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

Amazon MWAA Environment Creation Process

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

AWS CloudFormation Create Stack Console

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.

AWS CloudFormation Create Stack Console
Amazon MWAA Environment Creation Process

As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.

You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.

Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Amazon MWAA Environment Creation Process

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Amazon MWAA Environment Creation Process

Airflow Execution Role

As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role’s default permissions. Additional permissions include allowing the new Airflow role to pass the EMR roles to Amazon EMR using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole. We will also include the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. For this demonstration, the Airflow service role also requires three specific EMR permissions as shown below. Later in the post, Airflow will also read files from S3, which requires s3:GetObject permission.

Create a new policy by importing the project’s JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "elasticmapreduce:DescribeStep",
        "elasticmapreduce:AddJobFlowSteps",
        "elasticmapreduce:RunJobFlow"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": [
        "arn:aws:iam::123412341234:role/EMR_DemoRole",
        "arn:aws:iam::123412341234:role/EMR_EC2_DemoRole",
        "arn:aws:iam::123412341234:role/EMR_EC2_DefaultRole",
        "arn:aws:iam::123412341234:role/EMR_DefaultRole"
      ]
    }
  ]
}
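Rather than editing the Account ID by hand, the same policy document could be generated for any account. The sketch below assumes a hypothetical helper, build_airflow_emr_policy, and simply reproduces the structure of the policy above.

```python
import json

EMR_ROLES = (
    "EMR_DemoRole",
    "EMR_EC2_DemoRole",
    "EMR_EC2_DefaultRole",
    "EMR_DefaultRole",
)


def build_airflow_emr_policy(account_id):
    """Return the Airflow-to-EMR IAM policy document for the given account."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "elasticmapreduce:DescribeStep",
                    "elasticmapreduce:AddJobFlowSteps",
                    "elasticmapreduce:RunJobFlow",
                ],
                "Resource": "*",
            },
            {
                # Lets Airflow hand these roles to EMR when creating a cluster.
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": [
                    f"arn:aws:iam::{account_id}:role/{role}" for role in EMR_ROLES
                ],
            },
        ],
    }


print(json.dumps(build_airflow_emr_policy("123412341234"), indent=2))
```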

The Airflow service role, created by MWAA, is shown below with the new policy attached.

Airflow Execution Service Role with the new Policy Attached

Final Architecture

Below is the final high-level architecture for the post’s demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case, the endpoint provides a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.

Demonstration’s Amazon MWAA and Amazon EMR Architecture

Amazon MWAA Environment

The new MWAA Environment will include a link to the Airflow UI.

Amazon MWAA Environment Console

Airflow UI

Using the supplied link, you should be able to access the Airflow UI using your web browser.

Apache Airflow UI

Our First DAG

The Amazon MWAA documentation includes an example DAG, which runs SparkPi, one of several sample programs that come with Spark. I have created a similar DAG that is included in the GitHub project, dags/spark_pi_example.py. The DAG will create a minimally-sized single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the EMR cluster will terminate automatically, since KeepJobFlowAliveWhenNoSteps is set to False.

import os
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago

# the DAG ID is derived from the file name
DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

# a single EMR Step that runs the SparkPi example shipped with Spark
SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

# single-node cluster: one Master node, no Core or Task nodes
JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        },
        {
            'Key': 'Name',
            'Value': 'Airflow EMR Demo Project'
        },
        {
            'Key': 'Owner',
            'Value': 'Data Analytics Team'
        }
    ]
}

with DAG(
        dag_id=DAG_ID,
        description='Run built-in Spark app on Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['emr'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker

Upload the DAG to the Airflow S3 bucket’s dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project’s root.

aws s3 cp dags/spark_pi_example.py \
s3://<your_airflow_bucket_name>/dags/

The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

Apache Airflow UI’s Trigger DAG Page

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Apache Airflow UI’s DAG Graph View

Switching to the EMR Console, you should see the single-node EMR cluster being created.

Amazon EMR Console’s Summary tab

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready before it runs.

Amazon EMR Console’s Steps tab

Triggering DAGs Programmatically

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow’s CLI documentation explain how.

Below is an example of triggering the spark_pi_example DAG programmatically using Airflow’s trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server’s hostname. The ENVIRONMENT_NAME variable assumes only one MWAA environment is returned by jq.

export WEB_SERVER_HOSTNAME="<your_airflow_web_server.us-east-1.airflow.amazonaws.com>"
export ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
export DAG_NAME=spark_pi_example
# request a short-lived CLI token; the export must run in the current shell, not inside a pipeline
export CLI_TOKEN=$(aws mwaa create-cli-token --name "${ENVIRONMENT_NAME}" | jq -r .CliToken)
curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
    --header "Authorization: Bearer ${CLI_TOKEN}" \
    --header "Content-Type: text/plain" \
    --data-raw "trigger_dag ${DAG_NAME}"

Analytics Job with Airflow

Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.

The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow’s airflow.contrib.operators and airflow.contrib.sensors packages for EMR.

Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains ten Jinja template variables. Seven variables will be configured in the Airflow UI by importing a JSON file into the ‘Admin’ ⇨ ‘Variables’ tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed in the DAG Run configuration as a JSON blob when the DAG is triggered. These template variables are prefixed with dag_run.conf.

import os
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from airflow.utils.dates import days_ago

# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************

DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ["{{ dag_run.conf['airflow_email'] }}"],
    'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
    'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}

# spark-submit arguments for the Bakery Sales PySpark application
SPARK_STEPS = [
    {
        'Name': 'Bakery Sales',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                '--conf',
                'spark.yarn.submit.waitAppCompletion=true',
                's3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py'
            ]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': '{{ var.value.release_label }}',
    'LogUri': 's3n://{{ var.value.logs_bucket }}',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceFleets': [
            {
                'Name': 'MASTER',
                'InstanceFleetType': 'MASTER',
                'TargetSpotCapacity': 1,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'm5.xlarge',
                    },
                ]
            },
            {
                'Name': 'CORE',
                'InstanceFleetType': 'CORE',
                'TargetSpotCapacity': 2,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'r5.xlarge',
                    },
                ],
            },
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        'Ec2KeyName': '{{ var.value.emr_ec2_key_pair }}',
    },
    'BootstrapActions': [
        {
            'Name': 'string',
            'ScriptBootstrapAction': {
                'Path': 's3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh',
            }
        },
    ],
    'Configurations': [
        {
            'Classification': 'spark-hive-site',
            'Properties': {
                'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
            }
        }
    ],
    'VisibleToAllUsers': True,
    'JobFlowRole': '{{ var.value.job_flow_role }}',
    'ServiceRole': '{{ var.value.service_role }}',
    'EbsRootVolumeSize': 32,
    'StepConcurrencyLevel': 1,
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        },
        {
            'Key': 'Name',
            'Value': 'Airflow EMR Demo Project'
        },
        {
            'Key': 'Owner',
            'Value': 'Data Analytics Team'
        }
    ]
}

with DAG(
        dag_id=DAG_ID,
        description='Analyze Bakery Sales with Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval='@once',
        tags=['emr'],
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker
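
To make the var.value and dag_run.conf prefixes concrete, the sketch below mimics how a template string resolves against Airflow Variables and the DAG Run configuration. It is a simplified stand-in written with a regular expression, not Airflow’s actual Jinja rendering, and the render function and the sample values are assumptions for illustration.

```python
import re

# Matches only the two expression forms used in the DAG:
#   {{ var.value.NAME }}  and  {{ dag_run.conf['NAME'] }}
_PATTERN = re.compile(
    r"\{\{\s*(?:var\.value\.(\w+)|dag_run\.conf\['(\w+)'\])\s*\}\}"
)


def render(template, variables, conf):
    """Substitute template expressions with Variable or DAG Run conf values."""
    def _replace(match):
        var_name, conf_name = match.groups()
        if var_name is not None:
            return str(variables[var_name])
        return str(conf[conf_name])

    return _PATTERN.sub(_replace, template)


# Sample values standing in for the Airflow UI Variables and the trigger-time JSON
variables = {'work_bucket': 'emr-demo-work-123412341234-us-east-1'}
conf = {'airflow_email': 'analytics_team@example.com'}

print(render('s3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py', variables, conf))
print(render("{{ dag_run.conf['airflow_email'] }}", variables, conf))
```

The first call resolves against the imported Variables, the second against the JSON supplied when the DAG is triggered.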

Import Variables into Airflow UI

First, to import the required variables, change the values in the project’s airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.

{
    "bootstrap_bucket": "emr-demo-bootstrap-123412341234-us-east-1",
    "emr_ec2_key_pair": "emr-demo-123412341234-us-east-1",
    "job_flow_role": "EMR_EC2_DemoRole",
    "logs_bucket": "emr-demo-logs-123412341234-us-east-1",
    "release_label": "emr-6.2.0",
    "service_role": "EMR_DemoRole",
    "work_bucket": "emr-demo-work-123412341234-us-east-1",
    "ec2_subnet_id": "subnet-012abc456efg78900"
}

Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Apache Airflow UI’s Admin > Variables tab

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
s3://<your_airflow_bucket_name>/dags/

The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark job.

Apache Airflow UI’s DAGs tab

Input the three required parameters in the ‘Trigger DAG’ interface, used to pass the DAG Run configuration, and select ‘Trigger’. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.

{
    "airflow_email": "analytics_team@example.com",
    "email_on_failure": false,
    "email_on_retry": false
}

This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Apache Airflow UI’s Trigger DAG Screen

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Amazon EMR Console’s Steps tab

Multi-Step DAG

In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments, separately. A third team might manage the DAG itself.

We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.

Below we see a snippet of two of the four Spark submit-job job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.

[
    {
        "Name": "Movie Ratings",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--master",
                "yarn",
                "--conf",
                "spark.yarn.submit.waitAppCompletion=true",
                "s3a://{{ var.value.work_bucket }}/analyze/movies_avg_ratings_ssm.py",
                "--start-date",
                "2016-01-01 00:00:00",
                "--end-date",
                "2016-12-31 23:59:59"
            ]
        }
    },
    {
        "Name": "Stock Volatility",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "--master",
                "yarn",
                "--conf",
                "spark.yarn.submit.waitAppCompletion=true",
                "s3a://{{ var.value.work_bucket }}/analyze/stock_volatility_ssm.py",
                "--start-date",
                "2017-01-01",
                "--end-date",
                "2018-12-31"
            ]
        }
    }
]
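
Because the step definitions now live outside the DAG, it may be worth checking the file before uploading it to S3. The following is a minimal sketch of such a check, not part of the project. The validate_steps function and the set of keys it expects are my own assumptions, based only on the keys used in the steps shown in this post.

```python
import json

# Keys used by every step in this post (an assumption, not an EMR API rule)
EXPECTED_STEP_KEYS = ('Name', 'ActionOnFailure', 'HadoopJarStep')


def validate_steps(raw_json):
    """Return a list of human-readable problems found in a steps document."""
    steps = json.loads(raw_json)
    if not isinstance(steps, list):
        return ['top-level JSON value must be a list of steps']

    problems = []
    for index, step in enumerate(steps):
        for key in EXPECTED_STEP_KEYS:
            if key not in step:
                problems.append(f'step {index}: missing {key}')
        jar_step = step.get('HadoopJarStep', {})
        if 'Jar' not in jar_step:
            problems.append(f'step {index}: HadoopJarStep is missing Jar')
        for arg in jar_step.get('Args', []):
            # An en dash or em dash pasted in place of "--" breaks spark-submit options
            if arg.startswith(('\u2013', '\u2014')):
                problems.append(f'step {index}: argument {arg!r} does not start with "-"')
    return problems

# Example call:
#   with open('emr_steps/emr_steps.json') as steps_file:
#       print(validate_steps(steps_file.read()))
```

An empty list means the file passed these basic checks; it does not guarantee that EMR will accept the steps.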

Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.

{
    "Name": "demo-cluster-airflow",
    "ReleaseLabel": "{{ var.value.release_label }}",
    "LogUri": "s3n://{{ var.value.logs_bucket }}",
    "Applications": [
        {
            "Name": "Spark"
        }
    ],
    "Instances": {
        "InstanceFleets": [
            {
                "Name": "MASTER",
                "InstanceFleetType": "MASTER",
                "TargetSpotCapacity": 1,
                "InstanceTypeConfigs": [
                    {
                        "InstanceType": "m5.xlarge"
                    }
                ]
            },
            {
                "Name": "CORE",
                "InstanceFleetType": "CORE",
                "TargetSpotCapacity": 2,
                "InstanceTypeConfigs": [
                    {
                        "InstanceType": "r5.2xlarge"
                    }
                ]
            }
        ],
        "Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
        "KeepJobFlowAliveWhenNoSteps": false,
        "TerminationProtected": false,
        "Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}"
    },
    "BootstrapActions": [
        {
            "Name": "string",
            "ScriptBootstrapAction": {
                "Path": "s3://{{ var.value.bootstrap_bucket }}/bootstrap_actions.sh"
            }
        }
    ],
    "Configurations": [
        {
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
        }
    ],
    "VisibleToAllUsers": true,
    "JobFlowRole": "{{ var.value.job_flow_role }}",
    "ServiceRole": "{{ var.value.service_role }}",
    "EbsRootVolumeSize": 32,
    "StepConcurrencyLevel": 5,
    "Tags": [
        {
            "Key": "Environment",
            "Value": "Development"
        },
        {
            "Key": "Name",
            "Value": "Airflow EMR Demo Project"
        },
        {
            "Key": "Owner",
            "Value": "Data Analytics Team"
        }
    ]
}

Decoupling the configurations reduces the DAG from well over 200 lines of code to fewer than 75 lines. Note the job_flow_overrides and steps parameters in the DAG below. Instead of referencing a local object variable, the parameters now call the function, get_object(key, bucket_name), which loads the JSON.

import json
import os
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.utils.dates import days_ago

# ************** AIRFLOW VARIABLES **************
bootstrap_bucket = Variable.get('bootstrap_bucket')
emr_ec2_key_pair = Variable.get('emr_ec2_key_pair')
job_flow_role = Variable.get('job_flow_role')
logs_bucket = Variable.get('logs_bucket')
release_label = Variable.get('release_label')
service_role = Variable.get('service_role')
work_bucket = Variable.get('work_bucket')
# ***********************************************

DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ["{{ dag_run.conf['airflow_email'] }}"],
    'email_on_failure': ["{{ dag_run.conf['email_on_failure'] }}"],
    'email_on_retry': ["{{ dag_run.conf['email_on_retry'] }}"],
}


def get_object(key, bucket_name):
    """
    Load S3 object as JSON
    """
    hook = S3Hook()
    content_object = hook.get_key(key=key, bucket_name=bucket_name)
    file_content = content_object.get()['Body'].read().decode('utf-8')
    return json.loads(file_content)


with DAG(
        dag_id=DAG_ID,
        description='Run multiple Spark jobs with Amazon EMR',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval=None,
        tags=['emr', 'spark', 'pyspark']
) as dag:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=get_object('job_flow_overrides/job_flow_overrides.json', work_bucket)
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=get_object('emr_steps/emr_steps.json', work_bucket)
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default'
    )

    cluster_creator >> step_adder >> step_checker

This time, we need to upload three files to S3: the DAG to the Airflow S3 bucket and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.

aws s3 cp emr_steps/emr_steps.json \
    s3://emr-demo-work-123412341234-us-east-1/emr_steps/
aws s3 cp job_flow_overrides/job_flow_overrides.json \
    s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/
aws s3 cp dags/multiple_steps.py \
s3://airflow-123412341234-us-east-1/dags/

The third DAG, multiple_steps, should automatically appear in the Airflow UI. Click on ‘Trigger DAG’ to create a new EMR cluster and start the Spark jobs. The three required input parameters in the ‘Trigger DAG’ interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Apache Airflow UI’s DAGs tab

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Amazon EMR Console’s Steps tab showing four Steps running in parallel

Triggering DAGs Programmatically

AWS CLI

Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow’s trigger_dag CLI command. Note the addition of the --conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.

ENVIRONMENT_NAME=$(aws mwaa list-environments | jq -r '.Environments | .[]')
DAG_NAME="multiple_steps"
CONFIG="""'{
\"airflow_email\": \"analytics_team@example.com\",
\"email_on_failure\": true,
\"email_on_retry\": false
}'"""
# the token response also contains the web server hostname
CLI_JSON=$(aws mwaa create-cli-token --name ${ENVIRONMENT_NAME}) \
    && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
    && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
    && curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
        --header "Authorization: Bearer ${CLI_TOKEN}" \
        --header "Content-Type: text/plain" \
        --data-raw "trigger_dag ${DAG_NAME} --conf ${CONFIG}"
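
Escaping the JSON by hand in the shell is easy to get wrong. As an alternative sketch, the same request body could be assembled in Python with json.dumps and shlex.quote. The build_trigger_payload name is my own, and I am assuming the endpoint accepts shell-style single quoting, as the command above relies on.

```python
import json
import shlex


def build_trigger_payload(dag_name, conf):
    """Build the plain-text body for the trigger_dag command (hypothetical helper)."""
    conf_json = json.dumps(conf)
    # shlex.quote wraps the JSON in single quotes so it stays one argument
    return f'trigger_dag {dag_name} --conf {shlex.quote(conf_json)}'


payload = build_trigger_payload('multiple_steps', {
    'airflow_email': 'analytics_team@example.com',
    'email_on_failure': True,
    'email_on_retry': False,
})
print(payload)
# trigger_dag multiple_steps --conf '{"airflow_email": "analytics_team@example.com", "email_on_failure": true, "email_on_retry": false}'
```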

AWS SDK

Airflow DAGs can also be triggered using the AWS SDK. For example, with boto3 for Python, we could use a script similar to the following to remotely trigger a DAG.

#!/usr/bin/env python3

# MWAA: Trigger an Apache Airflow DAG using SDK
# Author: Gary A. Stafford (February 2021)

import logging

import boto3
import requests

logging.basicConfig(
    format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

mwaa_client = boto3.client('mwaa')

ENVIRONMENT_NAME = 'Your_Airflow_Environment_Name'
DAG_NAME = 'your_dag_name'
CONFIG = '{"foo": "bar"}'


def main():
    # request a short-lived CLI token and the web server hostname
    response = mwaa_client.create_cli_token(
        Name=ENVIRONMENT_NAME
    )
    logging.info('response: ' + str(response))

    token = response['CliToken']
    url = 'https://{0}/aws_mwaa/cli'.format(response['WebServerHostname'])
    headers = {'Authorization': 'Bearer ' + token, 'Content-Type': 'text/plain'}
    payload = 'trigger_dag {0} --conf {1}'.format(DAG_NAME, CONFIG)

    response = requests.post(url, headers=headers, data=payload)
    logging.info('response: ' + str(response))  # should be <Response [200]>


if __name__ == '__main__':
    main()
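
The script above only logs the HTTP status. To my knowledge, the MWAA CLI endpoint returns a JSON body whose stdout and stderr fields are base64-encoded, so a small helper can make the Airflow CLI output readable. This is a sketch under that assumption, and decode_cli_response is a name I made up for illustration.

```python
import base64
import json


def decode_cli_response(body):
    """Decode the base64-encoded stdout and stderr fields of the response body."""
    parsed = json.loads(body)
    stdout = base64.b64decode(parsed.get('stdout', '')).decode('utf-8')
    stderr = base64.b64decode(parsed.get('stderr', '')).decode('utf-8')
    return stdout, stderr

# Example call, using the response object from the script above:
#   stdout, stderr = decode_cli_response(response.text)
```

Logging both values makes it easier to tell whether Airflow actually created the DAG Run or rejected the command.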

Cleaning Up

Once you are done with the MWAA Environment, be sure to delete it as soon as possible to avoid additional costs. Also, delete the MWAA-VPC CloudFormation stack. Its resources, such as the two NAT Gateways, will otherwise continue to generate costs.

aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC
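
The same clean-up could be scripted with boto3. The sketch below mirrors the two CLI commands. The clean_up function is hypothetical, and the clients are passed in so the logic can be tested without AWS access. Deleting the environment takes a while, so the stack deletion may need to be retried once the environment is gone.

```python
def clean_up(mwaa_client, cfn_client, environment_name, stack_name='MWAA-VPC'):
    """Request deletion of the MWAA environment, then of the VPC stack."""
    mwaa_client.delete_environment(Name=environment_name)
    cfn_client.delete_stack(StackName=stack_name)

# Example call (environment name is a placeholder):
#   import boto3
#   clean_up(boto3.client('mwaa'), boto3.client('cloudformation'), '<your_mwaa_environment_name>')
```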

Conclusion

In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.

If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.


Installing Apache Superset on Amazon EMR: Add data exploration and visualization to your analytics cluster

Introduction

AWS provides nearly twenty-five different open-source data analytics applications that can be automatically installed and configured on Amazon Elastic MapReduce (Amazon EMR). Of all those options, EMR doesn’t offer a general-purpose data exploration and visualization tool. However, with EMR, you can automate the installation of additional software as part of the cluster creation process or post cluster creation. This brief post will explore how to install, configure, and access Apache Superset, the modern data exploration and visualization platform on Amazon EMR’s Master Node, as a post-cluster creation step. You can use these same techniques to install other software packages on EMR as well, manually or as part of an automated Data Pipeline.

Amazon EMR

According to AWS, Amazon EMR is a cloud-based big data platform for processing vast amounts of data using open source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Amazon EMR Console’s Cluster Summary tab

AWS currently offers 5.x and 6.x versions of Amazon EMR. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0. Each version of Amazon EMR offers incremental major and minor releases of nearly 25 different, popular open-source big-data software packages to choose from, which Amazon EMR will install and configure when the cluster is created.

Apache Superset

According to its website, Apache Superset is a modern data exploration and visualization platform. Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

Superset natively supports over twenty-five data sources, including Amazon Athena and Redshift, Apache Drill, Druid, Hive, Impala, Kylin, Pinot, Spark SQL, Elasticsearch, Google BigQuery, Hana, MySQL, Oracle, Postgres, Presto, Snowflake, Microsoft SQL Server, and Teradata.

As shown in their Gallery, Superset includes dozens of visualization types, including Pivot Table, Line Chart, Markup, Pie Chart, Filter Box, Bubble Chart, Box Plot, Histogram, Heatmap, Sunburst, Calendar Heatmap, and several geospatial types.

Apache Superset Visualization Gallery

Setup

Using this git clone command, download a copy of this post’s open-source GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-superset-demo.git

To demonstrate how to install Apache Superset on EMR, I have prepared an AWS CloudFormation template. Deploying the template, cloudformation/superset-emr-demo.yml, to AWS will result in the AWS CloudFormation stack, superset-emr-demo-dev. The stack creates a minimally-sized, two-node EMR cluster, two Amazon S3 buckets, and several AWS Systems Manager (SSM) Parameter Store parameters.

There is also a JSON-format CloudFormation parameters file, cloudformation/superset-emr-demo-params-dev.json. The parameters file contains values for eight of the ten required parameters in the CloudFormation template, all of which you can adjust. For the remaining two required parameters, you will need to supply the name of an existing EC2 key pair to access the EMR Master node. The key pair will need to be deployed to the same AWS Account into which you are deploying EMR. You will also need to supply a Subnet ID for the EMR cluster to be installed into. The subnet must have access to the Internet to install Superset’s required system and Python packages and to access Superset’s web-based user interface. If you need help creating a VPC and subnet to deploy EMR into, refer to my previous blog post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce.

The CloudFormation stack is created using a Python script, create_cfn_stack.py. The Python script uses the AWS boto3 Python SDK.

#!/usr/bin/env python3

# Purpose: Create EMR bootstrap script bucket and deploy the cfn stack
# Author: Gary A. Stafford (December 2020)
# Reference: https://gist.github.com/svrist/73e2d6175104f7ab4d201280acba049c
# Usage Example: python3 ./create_cfn_stack.py \
#                    --ec2-key-name emr-demo-123456789012-us-east-1 \
#                    --ec2-subnet-id subnet-06aa61f790a932b32 \
#                    --environment dev

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

sts_client = boto3.client('sts')
cfn_client = boto3.client('cloudformation')
region = boto3.DEFAULT_SESSION.region_name
s3_client = boto3.client('s3', region_name=region)

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)


def main():
    args = parse_args()

    # create bootstrap bucket
    account_id = sts_client.get_caller_identity()['Account']
    bootstrap_bucket = f'superset-emr-demo-bootstrap-{account_id}{region}'
    create_bucket(bootstrap_bucket)

    # upload bootstrap script
    dir_path = os.path.dirname(os.path.realpath(__file__))
    upload_file(f'{dir_path}/bootstrap_emr/bootstrap_actions.sh', bootstrap_bucket, 'bootstrap_actions.sh')

    # set variables
    stack_name = f'emr-superset-demo-{args.environment}'
    cfn_template_path = f'{dir_path}/cloudformation/superset-emr-demo.yml'
    cfn_params_path = f'{dir_path}/cloudformation/superset-emr-demo-params-{args.environment}.json'
    ec2_key_name = args.ec2_key_name

    # append new parameters
    cfn_params = _parse_parameters(cfn_params_path)
    cfn_params.append({'ParameterKey': 'Ec2KeyName', 'ParameterValue': ec2_key_name})
    cfn_params.append({'ParameterKey': 'Ec2SubnetId', 'ParameterValue': args.ec2_subnet_id})
    cfn_params.append({'ParameterKey': 'BootstrapBucket', 'ParameterValue': bootstrap_bucket})
    logging.info(json.dumps(cfn_params, indent=4))

    # create the cfn stack
    create_stack(stack_name, cfn_template_path, cfn_params)


def create_bucket(bootstrap_bucket):
    """Create an S3 bucket in a specified region

    :param bootstrap_bucket: Bucket to create
    :return: True if bucket created, else False
    """
    try:
        s3_client.create_bucket(Bucket=bootstrap_bucket)
        logging.info(f'New bucket name: {bootstrap_bucket}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def upload_file(file_name, bootstrap_bucket, object_name):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bootstrap_bucket: Bucket to upload to
    :param object_name: S3 object name
    :return: True if file was uploaded, else False
    """
    # Upload the file
    try:
        response = s3_client.upload_file(file_name, bootstrap_bucket, object_name)
        logging.info(f'File {file_name} uploaded to bucket {bootstrap_bucket} as object {object_name}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def create_stack(stack_name, cfn_template, cfn_params):
    """Create EMR Cluster CloudFormation stack"""
    template_data = _parse_template(cfn_template)

    create_stack_params = {
        'StackName': stack_name,
        'TemplateBody': template_data,
        'Parameters': cfn_params,
        'TimeoutInMinutes': 60,
        'Capabilities': [
            'CAPABILITY_NAMED_IAM',
        ],
        'Tags': [
            {
                'Key': 'Project',
                'Value': 'Superset EMR Demo'
            },
        ]
    }

    try:
        response = cfn_client.create_stack(**create_stack_params)
        logging.info(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        return False
    return True


def _parse_template(template):
    # read the template and validate it before the stack is created
    with open(template) as template_file_obj:
        template_data = template_file_obj.read()
    cfn_client.validate_template(TemplateBody=template_data)
    return template_data


def _parse_parameters(parameters):
    with open(parameters) as parameter_file_obj:
        parameter_data = json.load(parameter_file_obj)
    return parameter_data


def parse_args():
    """Parse argument values from command-line"""
    parser = argparse.ArgumentParser(description='Arguments required for script.')
    parser.add_argument('-e', '--environment', required=True, choices=['dev', 'test', 'prod'], help='Environment')
    parser.add_argument('-k', '--ec2-key-name', required=True, help='Ec2KeyName: Name of EC2 Keypair')
    parser.add_argument('-s', '--ec2-subnet-id', required=True, help='Ec2SubnetId: ID of the EC2 subnet')
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    main()

To execute the Python script and create the CloudFormation stack, which will create the EMR cluster, run the following command. Remember to update the parameters to the name of your EC2 key pair and the Subnet ID for the EMR cluster.

python3 ./create_cfn_stack.py \
--ec2-key-name <your_key_pair_name> \
--ec2-subnet-id <your_subnet_id> \
--environment dev

Here is what the complete CloudFormation workflow looks like.

AWS CloudFormation stack creation

Security Group Ingress Rules

To install Superset on the EMR cluster’s Master node via SSH, you need to open port 22 on the Security Group associated with the EMR cluster’s Master Node, allowing access from your IP address. You can use the AWS Management Console or AWS CLI to open port 22. We will use jq and the AWS CLI’s ec2 commands to get the Security Group ID associated with the EMR cluster’s Master Node and create the ingress rule.

export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r ".SecurityGroups[] | \
select(.GroupName==\"ElasticMapReduce-master\").GroupId" | \
head -n 1)
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
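
For readers who prefer the SDK, here is a rough boto3 equivalent of the two commands above. The open_ssh_port function is hypothetical, the EC2 client is injected so the logic can be tested without AWS access, and your public IP address is passed in rather than looked up.

```python
def open_ssh_port(ec2_client, my_ip, group_name='ElasticMapReduce-master'):
    """Allow SSH (port 22) from a single IP address to the EMR Master security group."""
    groups = ec2_client.describe_security_groups(
        Filters=[{'Name': 'group-name', 'Values': [group_name]}]
    )['SecurityGroups']
    # like "head -n 1" above, use the first matching group
    group_id = groups[0]['GroupId']

    ec2_client.authorize_security_group_ingress(
        GroupId=group_id,
        IpProtocol='tcp',
        FromPort=22,
        ToPort=22,
        CidrIp=f'{my_ip}/32',
    )
    return group_id

# Example call (IP address is a placeholder):
#   import boto3
#   open_ssh_port(boto3.client('ec2'), '203.0.113.10')
```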

Superset Script

Once the CloudFormation stack is created and the ports are open, we can install Apache Superset on the EMR Master Node. The bootstrap script, bootstrap_emr/bootstrap_superset.sh, will be used to install Apache Superset onto the EMR cluster’s Master Node as the hadoop user. The script is roughly based on Superset’s Installing from Scratch instructions.

As part of installing Superset, the script will also deploy several common database drivers, including Amazon Athena, Amazon Redshift, Apache Spark SQL, Presto, PostgreSQL, and MySQL. The script will also create a Superset Admin user and two sample Superset users, one with the Alpha role and one with the Gamma role.

#!/bin/bash
# Purpose: Installs Apache Superset on EMR
# Author: Gary A. Stafford (December 2020)
# Usage: sh ./bootstrap_superset.sh 8280
# Reference: https://superset.apache.org/docs/installation/installing-superset-from-scratch
# port for superset (default: 8280)
export SUPERSET_PORT="${1:-8280}"
# install required packages
sudo yum -y install gcc gcc-c++ libffi-devel python-devel python-pip python-wheel \
openssl-devel cyrus-sasl-devel openldap-devel python3-devel.x86_64
# optionally, update Master Node packages
sudo yum -y update
# install required Python package
python3 -m pip install --user --upgrade setuptools virtualenv
python3 -m venv venv
. venv/bin/activate
python3 -m pip install --upgrade apache-superset \
    PyAthenaJDBC PyAthena sqlalchemy-redshift pyhive mysqlclient psycopg2-binary
command -v superset
superset db upgrade
export FLASK_APP=superset
echo "export FLASK_APP=superset" >>~/.bashrc
touch superset_config.py
echo "ENABLE_TIME_ROTATE = True" >>superset_config.py
echo "export SUPERSET_CONFIG_PATH=superset_config.py" >>~/.bashrc
export ADMIN_USERNAME="SupersetAdmin"
export ADMIN_PASSWORD="Admin1234"
# create superset admin
superset fab create-admin \
–username "${ADMIN_USERNAME}" \
–firstname Superset \
–lastname Admin \
–email superset_admin@example.com \
–password "${ADMIN_PASSWORD}"
superset init
# create two sample superset users
superset fab create-user \
–role Alpha \
–username SupersetUserAlpha \
–firstname Superset \
–lastname UserAlpha \
–email superset_user_alpha@example.com \
–password UserAlpha1234
superset fab create-user \
–role Gamma \
–username SupersetUserGamma \
–firstname Superset \
–lastname UserGamma \
–email superset_user_gamma@example.com \
–password UserGamma1234
# get instance id
INSTANCE_ID="$(curl –silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .instanceId)"
export INSTANCE_ID
echo "INSTANCE_ID: ${INSTANCE_ID}"
# use instance id to get public dns of master node
PUBLIC_MASTER_DNS="$(aws ec2 describe-instances –instance-id ${INSTANCE_ID} |
jq -r '.Reservations[0].Instances[0].PublicDnsName')"
export PUBLIC_MASTER_DNS
echo "PUBLIC_MASTER_DNS: ${PUBLIC_MASTER_DNS}"
# start superset in background
nohup superset run \
–host "${PUBLIC_MASTER_DNS}" \
–port "${SUPERSET_PORT}" \
–with-threads –reload –debugger \
>superset_output.log 2>&1 </dev/null &
# output connection info
printf %s """
**********************************************************************
Superset URL: http://${PUBLIC_MASTER_DNS}:${SUPERSET_PORT}
Admin Username: ${ADMIN_USERNAME}
Admin Password: ${ADMIN_PASSWORD}
**********************************************************************
"""

To install Superset using the bootstrap script, we will use another Python script, install_superset.py. The script uses paramiko, a Python implementation of SSHv2. The script also uses scp, a module that uses a paramiko transport to send and receive files via the scp1 protocol.


The script requires a single input parameter, ec2-key-path, which is the full path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem). Optionally, you can change the default Superset port of 8280, using the superset-port parameter.

python3 ./install_superset.py \
--ec2-key-path </path/to/my-key-pair.pem> \
--superset-port 8280

The script uses SSH and SCP to deploy and execute the bootstrap script, bootstrap_superset.sh. The output from the script includes the URL of Apache Superset running on the EMR cluster. The output also contains the username and password of the Superset Admin.

********************************************************************
Superset URL: http://ec2-111-22-333-44.compute-1.amazonaws.com:8280
Admin Username: SupersetAdmin
Admin Password: Admin1234
********************************************************************
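
To make the deploy-and-execute flow more concrete, here is a minimal sketch of how a script could copy bootstrap_superset.sh to the Master Node and run it with paramiko and scp. It is not the author's install_superset.py. In particular, the --master-dns parameter is a hypothetical simplification (the sketch does not look up the cluster's address itself), and the function names are illustrative.

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line options used by this sketch."""
    parser = argparse.ArgumentParser(
        description="Copy bootstrap_superset.sh to the EMR Master Node and run it")
    parser.add_argument("--ec2-key-path", required=True,
                        help="full path to the EC2 key pair (.pem)")
    parser.add_argument("--superset-port", type=int, default=8280,
                        help="port for Superset (default: 8280)")
    # hypothetical parameter: this sketch expects the address to be supplied
    parser.add_argument("--master-dns", required=True,
                        help="public DNS name of the EMR Master Node")
    return parser.parse_args(argv)


def deploy(master_dns, key_path, port,
           local_script="bootstrap_emr/bootstrap_superset.sh"):
    """Upload the bootstrap script over SCP, run it over SSH, return its output."""
    # third-party packages, imported here so parse_args() can be used without them
    import paramiko
    from scp import SCPClient

    ssh = paramiko.SSHClient()
    # accepts unknown host keys automatically; acceptable for a demo only
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=master_dns, username="hadoop", key_filename=key_path)
    try:
        with SCPClient(ssh.get_transport()) as scp_client:
            scp_client.put(local_script, remote_path="bootstrap_superset.sh")
        _stdin, stdout, _stderr = ssh.exec_command(
            f"bash ./bootstrap_superset.sh {port}")
        # reading stdout blocks until the remote script finishes
        return stdout.read().decode("utf-8")
    finally:
        ssh.close()
```

Calling deploy(args.master_dns, args.ec2_key_path, args.superset_port) with the parsed arguments would return the bootstrap script's output, including the connection details shown above.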

SSH Tunnel

According to AWS, EMR applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local web server. To reach any of the web interfaces, you must establish an SSH tunnel with the master node using either dynamic or local port forwarding. If you are using dynamic port forwarding, you must also configure a proxy server to view the web interfaces.

Instructions for creating an SSH tunnel to access UI’s on EMR

Running the command in your terminal will start the SSH tunnel on port 8157. Once the tunnel is enabled, you can access Apache Superset in a web browser, using the URL shown in the script output above. Use the Admin credentials or either of the two User credentials to sign in to Superset.
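
For reference, the sketch below assembles an SSH tunnel command of the kind described here, assuming dynamic port forwarding (one of the two options AWS mentions) on local port 8157 and the hadoop user. The helper name is hypothetical, and the key path and host name in the example are placeholders.

```python
import os
from typing import List


def build_tunnel_command(key_path: str, master_dns: str,
                         local_port: int = 8157,
                         user: str = "hadoop") -> List[str]:
    """Build an ssh argument list that opens a dynamic (SOCKS) tunnel."""
    return [
        "ssh",
        "-i", os.path.expanduser(key_path),  # EC2 key pair
        "-N",                                # no remote command, tunnel only
        "-D", str(local_port),               # dynamic port forwarding
        f"{user}@{master_dns}",
    ]


command = build_tunnel_command(
    "~/.ssh/my-key-pair.pem",
    "ec2-111-22-333-44.compute-1.amazonaws.com")
print(" ".join(command))
```

The resulting list can be passed to subprocess.run() or pasted into a terminal. With dynamic port forwarding, the browser also needs a proxy configured to use the tunnel.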

Apache Superset Sign In screen

Once signed in, you will have the ability to connect to your data sources and explore and visualize data. Below, we see an example of a SQL query executed against an Amazon RDS for PostgreSQL database, running in a separate VPC from EMR.

Sample query of an Amazon RDS PostgreSQL database using Superset’s SQL Editor

Conclusion

In this post, we learned how to install Apache Superset onto the Master Node of an Amazon EMR Cluster. If you want to install an application on all the nodes of an EMR cluster, you can add the commands to the bootstrap script, which runs when CloudFormation creates the cluster.


This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.


Collecting and Analyzing IoT Data in Near Real-Time with AWS IoT, LoRa, and LoRaWAN

Introduction

In a recent post published on ITNEXT, LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, we explored the use of the LoRa (Long Range) and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing several embedded sensors, and an IoT gateway. In this post, we will extend that architecture to the Cloud, using AWS IoT, a broad and deep set of IoT services, from the edge to the Cloud. We will securely collect, transmit, and analyze IoT data using the AWS cloud platform.

LoRa and LoRaWAN

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.

AWS IoT

AWS describes AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics.

In this post, we will focus on three AWS IoT services, one from each category: AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics. According to AWS, the AWS IoT Device SDKs include open-source libraries and developer and porting guides with samples to help you build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Core is a managed cloud service that lets connected devices easily and securely interact with cloud applications and other devices. AWS IoT Core can process and route messages to AWS endpoints and other devices reliably and securely. Finally, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

To learn more about AWS IoT, specifically the AWS IoT services we will be exploring within this post, I recommend reading my recent post published on Towards Data Science, Getting Started with IoT Analytics on AWS.

Hardware Selection

In this post, we will use the following hardware.

IoT Device with Embedded Sensors

An Arduino single-board microcontroller will serve as our IoT device. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductor, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.

The Sense contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa, goes through the gateway, which can be either a dedicated hardware appliance or software program.

LoRa Gateways, to paraphrase The Things Network, form the bridge between devices and the Cloud. Devices use low power networks like LoRaWAN to connect to the Gateway, while the Gateway uses high bandwidth networks like WiFi, Ethernet, or Cellular to connect to the Cloud.

A third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) will serve as our LoRa IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). The transceiver modules communicate with their host boards over a universal asynchronous receiver-transmitter (UART) interface. A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.

According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. Each RYLR896 module contains a small, PCB integrated, helical antenna. This transceiver operates at both the 868 and 915 MHz frequency ranges. In this demonstration, we will be transmitting at 915 MHz for North America.

The Arduino Sense (IoT device) transmits data, using one of the RYLR896 modules (shown below front). The Raspberry Pi (IoT Gateway), connected to the other RYLR896 module (shown below rear), receives the data.

LoRaWAN Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits / 4 bits/hex digit).
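
As a small illustration of the arithmetic above, the following Python sketch generates a random 128-bit value and renders it as 32 hexadecimal digits, the format the password takes in this post. The function names are hypothetical. The same password must then be set on both transceiver modules.

```python
import secrets
import string


def generate_aes128_password() -> str:
    """Return 16 random bytes (128 bits) as 32 uppercase hexadecimal digits."""
    return secrets.token_hex(16).upper()


def is_valid_password(password: str) -> bool:
    """Check that a password is exactly 32 hexadecimal digits."""
    return len(password) == 32 and all(c in string.hexdigits for c in password)


password = generate_aes128_password()
print(password, is_valid_password(password))
```

The secrets module draws from the operating system's cryptographically secure random source, which is preferable to the random module for key material.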

Provisioning AWS Resources

To start, we will create the necessary AWS IoT and associated resources on the AWS cloud platform. Once these resources are in place, we can then proceed to configure the IoT device and IoT gateway to securely transmit the sensor data to the Cloud.

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone --branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/iot-aws-lora-demo.git

AWS CloudFormation

The CloudFormation template, iot-analytics.yaml, will create an AWS IoT CloudFormation stack containing the following resources.

  • AWS IoT Thing
  • AWS IoT Thing Policy
  • AWS IoT Core Topic Rule
  • AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • AWS Lambda and Lambda Permission
  • Amazon S3 Bucket
  • Amazon SageMaker Notebook Instance
  • AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To create the AWS CloudFormation stack from the included CloudFormation template, execute the following AWS CLI command.

aws cloudformation create-stack \
  --stack-name lora-iot-demo \
  --template-body file://cloudformation/iot-analytics.yaml \
  --parameters ParameterKey=ProjectName,ParameterValue=lora-iot-demo \
  ParameterKey=IoTTopicName,ParameterValue=lora-iot-demo \
  --capabilities CAPABILITY_NAMED_IAM

The resulting CloudFormation stack should contain 16 AWS resources.

Additional Resources

Unfortunately, AWS CloudFormation cannot create all the AWS IoT resources we require for this demonstration. To complete the AWS provisioning process, execute the following series of AWS CLI commands, aws_cli_commands.md. These commands will create the remaining resources, including an AWS IoT Thing Type, Thing Group, Thing Billing Group, and an X.509 Certificate.

# LoRaWAN / AWS IoT Demo
# Author: Gary Stafford
# Run AWS CLI commands after CloudFormation stack completes successfully
# variables
thingName=lora-iot-gateway-01
thingPolicy=LoRaDevicePolicy
thingType=LoRaIoTGateway
thingGroup=LoRaIoTGateways
thingBillingGroup=LoRaIoTGateways
mkdir "${thingName}"
aws iot create-keys-and-certificate \
  --certificate-pem-outfile "${thingName}/${thingName}.cert.pem" \
  --public-key-outfile "${thingName}/${thingName}.public.key" \
  --private-key-outfile "${thingName}/${thingName}.private.key" \
  --set-as-active
# assuming you only have one certificate registered
# (jq -r emits the raw ARN, without the surrounding JSON quotes)
certificate=$(aws iot list-certificates | jq -r '.certificates[].certificateArn')
## alternately, for a specific certificate if you have more than one
# aws iot list-certificates
## then change the value below
# certificate=arn:aws:iot:us-east-1:123456789012:cert/<certificate>
aws iot attach-policy \
  --policy-name "$thingPolicy" \
  --target "$certificate"
aws iot attach-thing-principal \
  --thing-name "$thingName" \
  --principal "$certificate"
aws iot create-thing-type \
  --thing-type-name "$thingType" \
  --thing-type-properties "thingTypeDescription=LoRaWAN IoT Gateway"
aws iot create-thing-group \
  --thing-group-name "$thingGroup" \
  --thing-group-properties "thingGroupDescription=\"LoRaWAN IoT Gateway Thing Group\", attributePayload={attributes={Manufacturer=RaspberryPiFoundation}}"
aws iot add-thing-to-thing-group \
  --thing-name "$thingName" \
  --thing-group-name "$thingGroup"
aws iot create-billing-group \
  --billing-group-name "$thingBillingGroup" \
  --billing-group-properties "billingGroupDescription=\"Gateway Billing Group\""
aws iot add-thing-to-billing-group \
  --thing-name "$thingName" \
  --billing-group-name "$thingBillingGroup"
aws iot update-thing \
  --thing-name "$thingName" \
  --thing-type-name "$thingType" \
  --attribute-payload "{\"attributes\": {\"GatewayMfr\":\"RaspberryPiFoundation\", \"LoRaMfr\":\"REYAX\", \"LoRaModel\":\"RYLR896\"}}"
aws iot describe-thing \
  --thing-name "$thingName"

IoT Device Configuration

With the AWS resources deployed, we can configure the IoT device and IoT Gateway.

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo_aws.ino, contains the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, Red, Green, and Blue (RGB) color, and ambient light intensity, using the LoRaWAN protocol.

/*
  Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
               including temperature, humidity, barometric pressure, and color,
               using REYAX RYLR896 transceiver modules
               http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
  Author: Gary Stafford
*/
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 5000;     // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";
String uid = "";
void setup()
{
  Serial.begin(9600);
  Serial1.begin(115200); // default baud rate of module is 115200
  delay(1000);           // wait for LoRa module to be ready
  // get unique transceiver id to identify iot device on network
  Serial1.print((String)"AT+UID?\r\n");
  uid = Serial1.readString();
  uid.replace("+UID=", ""); // trim off '+UID=' at start of line
  uid.replace("\r\n", "");  // trim off CR/LF at end of line
  // address, network id, and password must be the same for receiver and transmitter
  Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
  delay(200);
  Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
  delay(200);
  Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
  delay(200);
  Serial1.print("AT+CPIN?\r\n"); // confirm password is set
  if (!HTS.begin())
  { // initialize HTS221 sensor
    Serial.println("Failed to initialize humidity temperature sensor!");
    while (1);
  }
  if (!BARO.begin())
  { // initialize LPS22HB sensor
    Serial.println("Failed to initialize pressure sensor!");
    while (1);
  }
  // discard the first pressure reading to work around a bad-initial-reading bug
  // https://forum.arduino.cc/index.php?topic=660360.0
  BARO.readPressure();
  delay(1000);
  if (!APDS.begin())
  { // initialize APDS9960 sensor
    Serial.println("Failed to initialize color sensor!");
    while (1);
  }
}
void loop()
{
  updateReadings();
  delay(UPDATE_FREQUENCY);
}
void updateReadings()
{
  float temperature = getTemperature(CALIBRATION_FACTOR);
  float humidity = getHumidity();
  float pressure = getPressure();
  int colors[4];
  getColor(colors);
  String payload = buildPayload(temperature, humidity, pressure, colors);
  Serial.println("Payload: " + payload); // display the payload for debugging
  Serial1.print(payload);                // send the payload to the LoRa transceiver module
  displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}
float getTemperature(float calibration)
{
  return HTS.readTemperature() + calibration;
}
float getHumidity()
{
  return HTS.readHumidity();
}
float getPressure()
{
  return BARO.readPressure();
}
void getColor(int c[])
{
  // wait until a color reading is available
  while (!APDS.colorAvailable())
  {
    delay(5);
  }
  int r, g, b, a;
  APDS.readColor(r, g, b, a);
  c[0] = r;
  c[1] = g;
  c[2] = b;
  c[3] = a;
}
// display for debugging purposes
void displayResults(float t, float h, float p, int c[])
{
  Serial.println((String)"UID: " + uid);
  Serial.print("Temperature: ");
  Serial.println(t);
  Serial.print("Humidity: ");
  Serial.println(h);
  Serial.print("Pressure: ");
  Serial.println(p);
  Serial.print("Color (r, g, b, a): ");
  Serial.print(c[0]);
  Serial.print(", ");
  Serial.print(c[1]);
  Serial.print(", ");
  Serial.print(c[2]);
  Serial.print(", ");
  Serial.println(c[3]);
  Serial.println("----------");
}
String buildPayload(float t, float h, float p, int c[])
{
  String readings = "";
  readings += uid;
  readings += DELIMITER;
  readings += t;
  readings += DELIMITER;
  readings += h;
  readings += DELIMITER;
  readings += p;
  readings += DELIMITER;
  readings += c[0];
  readings += DELIMITER;
  readings += c[1];
  readings += DELIMITER;
  readings += c[2];
  readings += DELIMITER;
  readings += c[3];
  String payload = "";
  payload += "AT+SEND=";
  payload += ADDRESS;
  payload += ",";
  payload += readings.length();
  payload += ",";
  payload += readings;
  payload += "\r\n";
  return payload;
}

AT Commands

Communication with the RYLR896’s long-range modem is done using AT commands. AT commands are instructions used to control a modem. AT is the abbreviation of ATtention. Every command line starts with AT. That is why modem commands are called AT commands, according to Developer’s Home. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor ID and the sensor values together in a single string. The string will be incorporated into an AT command, sent to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, we will delimit the sensor values with a pipe (|), as opposed to a comma. According to REYAX, the maximum length of the LoRa payload is approximately 330 bytes.

Below, we see an example of an AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both contain the LoRa transmitter Address ID, payload length (62 bytes in the example), and the payload. The data received by the IoT gateway also has the Received signal strength indicator (RSSI), and Signal-to-noise ratio (SNR).
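
The two message shapes can be sketched in a few lines of Python. The example assumes the AT+SEND=address,length,data format produced by the sketch's buildPayload function and the +RCV=address,length,data,RSSI,SNR format shown in the gateway script's comments. The function and type names are hypothetical, and the sample values are for illustration only.

```python
from typing import List, NamedTuple


class Received(NamedTuple):
    address: int
    length: int
    readings: List[str]
    rssi: int
    snr: int


def build_send_command(address: int, readings: List[str],
                       delimiter: str = "|") -> str:
    """Join the readings and wrap them in an AT+SEND command."""
    data = delimiter.join(readings)
    return f"AT+SEND={address},{len(data)},{data}\r\n"


def parse_received(line: str, delimiter: str = "|") -> Received:
    """Split a +RCV line into address, length, readings, RSSI, and SNR."""
    prefix = "+RCV="
    line = line.strip()
    if not line.startswith(prefix):
        raise ValueError(f"not a +RCV line: {line!r}")
    # the data cannot contain commas, because it is pipe-delimited
    address, length, rest = line[len(prefix):].split(",", 2)
    data, rssi, snr = rest.rsplit(",", 2)
    return Received(int(address), int(length), data.split(delimiter),
                    int(rssi), int(snr))


readings = ["0447383033363932003C0034", "23.94", "37.71", "99.89",
            "16", "38", "53", "80"]
command = build_send_command(116, readings)
print(repr(command))
print(parse_received("+RCV=" + command[len("AT+SEND="):].strip() + ",-61,56"))
```

Using a pipe rather than a comma inside the data is what lets the receiver separate the payload from the comma-separated address, length, RSSI, and SNR fields without ambiguity.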

Receiving Data on IoT Gateway

The Raspberry Pi will act as a LoRa IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino, and sending the data to AWS. The Raspberry Pi runs a Python script, rasppi_lora_receiver_aws.py, which will receive the data from the Arduino Sense, decrypt the data, parse the sensor values, serialize the data to a JSON payload, and finally transmit the payload in an MQTT-protocol message to AWS. The script uses pyserial, the Python Serial Port Extension, which encapsulates access to the serial port for communication with the RYLR896 module. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS.

import json
import logging
import sys
import threading
import time
from argparse import ArgumentParser
import serial
from awscrt import io, mqtt, auth, http, exceptions
from awsiot import mqtt_connection_builder
# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# b1d0fncxn1hf3m-ats.iot.us-east-1.amazonaws.com
# constants
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"
# global variables
count = 0 # from args
received_count = 0
received_all_event = threading.Event()
def main():
    # configure logging
    logging.basicConfig(filename='output.log',
                        filemode='w', level=logging.DEBUG)
    args = get_args()  # get args
    payload = ""
    lora_payload = {}
    # set log level
    io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')
    # spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    # set MQTT connection
    mqtt_connection = set_mqtt_connection(args, client_bootstrap)
    logging.debug("Connecting to {} with client ID '{}'…".format(
        args.endpoint, args.client_id))
    connect_future = mqtt_connection.connect()
    # future.result() waits until a result is available
    connect_future.result()
    logging.debug("Connecting to REYAX RYLR896 transceiver module…")
    serial_conn = serial.Serial(
        port=args.tty,
        baudrate=int(args.baud_rate),
        timeout=5,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS
    )
    if serial_conn.isOpen():
        logging.debug("Connected!")
        set_lora_config(serial_conn)
        check_lora_config(serial_conn)
        while True:
            # read data from serial port
            serial_payload = serial_conn.readline()
            logging.debug(serial_payload)
            if len(serial_payload) >= 1:
                payload = serial_payload.decode(encoding="utf-8")
                payload = payload[:-2]  # strip the trailing CR/LF
                try:
                    data = parse_payload(payload)
                    lora_payload = {
                        "ts": time.time(),
                        "data": {
                            "device_id": str(data[0]),
                            "gateway_id": str(args.gateway_id),
                            "temperature": float(data[1]),
                            "humidity": float(data[2]),
                            "pressure": float(data[3]),
                            "color": {
                                "red": float(data[4]),
                                "green": float(data[5]),
                                "blue": float(data[6]),
                                "ambient": float(data[7])
                            }
                        }
                    }
                    logging.debug(lora_payload)
                except IndexError:
                    logging.error("IndexError: {}".format(payload))
                    continue  # skip publishing a malformed reading
                except ValueError:
                    logging.error("ValueError: {}".format(payload))
                    continue  # skip publishing a malformed reading
                # publish mqtt message
                message_json = json.dumps(
                    lora_payload,
                    sort_keys=True,
                    indent=None,
                    separators=(',', ':'))
                try:
                    mqtt_connection.publish(
                        topic=args.topic,
                        payload=message_json,
                        qos=mqtt.QoS.AT_LEAST_ONCE)
                except mqtt.SubscribeError as err:
                    logging.error(".SubscribeError: {}".format(err))
                except exceptions.AwsCrtError as err:
                    logging.error("AwsCrtError: {}".format(err))
def set_mqtt_connection(args, client_bootstrap):
    if args.use_websocket:
        proxy_options = None
        if args.proxy_host:
            proxy_options = http.HttpProxyOptions(
                host_name=args.proxy_host, port=args.proxy_port)
        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
            client_bootstrap)
        mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
            endpoint=args.endpoint,
            client_bootstrap=client_bootstrap,
            region=args.signing_region,
            credentials_provider=credentials_provider,
            websocket_proxy_options=proxy_options,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    else:
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=args.endpoint,
            cert_filepath=args.cert,
            pri_key_filepath=args.key,
            client_bootstrap=client_bootstrap,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    return mqtt_connection
def get_args():
    parser = ArgumentParser(
        description="Send and receive messages through an MQTT connection.")
    parser.add_argument("--tty", required=True,
                        help="serial tty", default="/dev/ttyAMA0")
    parser.add_argument("--baud-rate", required=True,
                        help="serial baud rate", default=115200)
    parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
                        "Ex: \"abcd123456wxyz-ats.iot.us-east-1.amazonaws.com\"")
    parser.add_argument('--cert', help="File path to your client certificate, in PEM format.")
    parser.add_argument('--key', help="File path to your private key, in PEM format.")
    parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
                        "Necessary if MQTT server uses a certificate that's not already in " +
                        "your trust store.")
    parser.add_argument('--client-id', default='samples-client-id',
                        help="Client ID for MQTT connection.")
    parser.add_argument('--topic', default="samples/test",
                        help="Topic to subscribe to, and publish messages to.")
    parser.add_argument('--message', default="Hello World!", help="Message to publish. " +
                        "Specify empty string to publish nothing.")
    parser.add_argument('--count', default=0, type=int, help="Number of messages to publish/receive before exiting. " +
                        "Specify 0 to run forever.")
    parser.add_argument('--use-websocket', default=False, action='store_true',
                        help="To use a websocket instead of raw mqtt. If you specify this option you must "
                        "specify a region for signing, you can also enable proxy mode.")
    parser.add_argument('--signing-region', default='us-east-1',
                        help="If you specify --use-websocket, this is the region that will be used for computing "
                        "the Sigv4 signature")
    parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
                        "you will likely need to set --root-ca to the ca for your proxy.")
    parser.add_argument('--proxy-port', type=int, default=8080,
                        help="Port for proxy to connect to.")
    parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
                        help='Logging level')
    parser.add_argument("--gateway-id", help="IoT Gateway serial number")
    args = parser.parse_args()
    return args
def parse_payload(payload):
    # input: +RCV=116,29,0447383033363932003C0034|23.94|37.71|99.89|16|38|53|80,-61,56
    # output: [0447383033363932003C0034, 23.94, 37.71, 99.89, 16.0, 38.0, 53.0, 80.0]
    payload = payload.split(",")
    payload = payload[2].split("|")
    payload = [i for i in payload]
    return payload
def set_lora_config(serial_conn):
    # configures the REYAX RYLR896 transceiver module
    # each response is trimmed with [:-2] to strip the trailing CR/LF
    serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network Id set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
    time.sleep(1)
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES-128 password set? {}".format(serial_payload.decode(encoding="utf-8")))
def check_lora_config(serial_conn):
    # queries the module's current settings and logs each response
    serial_conn.write(str.encode("AT?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Module responding? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+ADDRESS?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network id: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+IPR?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("UART baud rate: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+BAND?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF frequency: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CRFOP?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF output power: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+MODE?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Work mode: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+PARAMETER?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF parameters: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES128 password of the network: {}".format(serial_payload.decode(encoding="utf-8")))


# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    logging.error("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    logging.warning("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        logging.warning("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    logging.warning("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    logging.debug("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1
    if received_count == count:
        received_all_event.set()


if __name__ == "__main__":
    sys.exit(main())

Running the IoT Gateway Python Script

To run the Python script on the Raspberry Pi, we will use a helper shell script, rasppi_lora_receiver_aws.sh. The shell script helps construct the arguments required to execute the Python script.

#!/bin/bash

# LoRaWAN / AWS IoT Demo
# Author: Gary A. Stafford
# Revised: 2021-03-24
# Start IoT data collector script and tails output

# Usage:
# sh ./rasppi_lora_receiver_aws.sh a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com

if [[ $# -ne 1 ]]; then
    echo "Script requires 1 parameter!"
    exit 1
fi

# input parameters
ENDPOINT=$1 # e.g. a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
DEVICE="lora-iot-gateway-01" # matches CloudFormation thing name
CERTIFICATE="${DEVICE}-cert.pem" # e.g. lora-iot-gateway-01-cert.pem
KEY="${DEVICE}-private.key" # e.g. lora-iot-gateway-01-private.key
GATEWAY_ID=$(< /proc/cpuinfo grep Serial | grep -oh "[a-z0-9]*$") # e.g. 00000000f62051ce

# output for debugging
echo "DEVICE: ${DEVICE}"
echo "ENDPOINT: ${ENDPOINT}"
echo "CERTIFICATE: ${CERTIFICATE}"
echo "KEY: ${KEY}"
echo "GATEWAY_ID: ${GATEWAY_ID}"

# call the python script
nohup python3 rasppi_lora_receiver_aws.py \
    --endpoint "${ENDPOINT}" \
    --cert "${DEVICE}-creds/${CERTIFICATE}" \
    --key "${DEVICE}-creds/${KEY}" \
    --root-ca "${DEVICE}-creds/AmazonRootCA1.pem" \
    --client-id "${DEVICE}" \
    --topic "lora-iot-demo" \
    --gateway-id "${GATEWAY_ID}" \
    --verbosity "Info" \
    --tty "/dev/ttyAMA0" \
    --baud-rate 115200 \
    >collector.log 2>&1 </dev/null &

sleep 2

# tail the log (Control-C to exit)
tail -f collector.log
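
The GATEWAY_ID lookup in the helper script reads the Raspberry Pi's serial number from /proc/cpuinfo. As a rough Python equivalent, the sketch below parses the same Serial line. The function name read_gateway_id and the sample text are illustrative assumptions, not part of the project.

```python
import re
from typing import Optional


def read_gateway_id(cpuinfo_text: str) -> Optional[str]:
    """Return the trailing value of the 'Serial' line in /proc/cpuinfo-style text.

    Hypothetical helper that mirrors: grep Serial | grep -oh "[a-z0-9]*$"
    """
    for line in cpuinfo_text.splitlines():
        if line.startswith("Serial"):
            # Take the run of lowercase letters and digits at the end of the line.
            match = re.search(r"[a-z0-9]*$", line.strip())
            return match.group(0) if match else None
    return None


# Sample text shaped like part of /proc/cpuinfo (assumed layout for illustration).
sample = "processor\t: 0\nSerial\t\t: 00000000f62051ce\n"
gateway_id = read_gateway_id(sample)
print(gateway_id)
```

On the gateway itself, the text could come from `open("/proc/cpuinfo").read()` instead of the sample string.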

To run the helper script, we execute the following command, substituting the input parameter, the AWS IoT endpoint, with your endpoint.

sh ./rasppi_lora_receiver_aws.sh \
  a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com

You should see the console output, similar to the following.

The script starts by configuring the RYLR896 module and outputting that configuration to a log file, output.log. If successful, we should see the following debug information logged.

DEBUG:root:Connecting to a1b2c3d4e5f6-ats.iot.us-east-1.amazonaws.com with client ID 'lora-iot-gateway-01'
DEBUG:root:Connecting to REYAX RYLR896 transceiver module
DEBUG:root:Connected!
DEBUG:root:Address set? +OK
DEBUG:root:Network Id set? +OK
DEBUG:root:AES-128 password set? +OK
DEBUG:root:Module responding? +OK
DEBUG:root:Address: +ADDRESS=116
DEBUG:root:Network id: +NETWORKID=6
DEBUG:root:UART baud rate: +IPR=115200
DEBUG:root:RF frequency: +BAND=915000000
DEBUG:root:RF output power: +CRFOP=15
DEBUG:root:Work mode: +MODE=0
DEBUG:root:RF parameters: +PARAMETER=12,7,1,4
DEBUG:root:AES128 password of the network: +CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF

The sensor data is also written to the log file for debugging purposes. In each pair of log entries shown below, the first line is the raw, decrypted data received from the IoT device via LoRaWAN. The second line is the JSON-serialized payload, sent securely to AWS using the MQTT protocol.

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n'

DEBUG:root:{'ts': 1598305503.7041512, 'data': {'humidity': 41.89, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 230.0, 'blue': 833.0, 'ambient': 1116.0, 'green': 692.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.63|99.38|236|696|837|1127,-49,35\r\n'

DEBUG:root:{'ts': 1598305513.7918658, 'data': {'humidity': 41.63, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 236.0, 'blue': 837.0, 'ambient': 1127.0, 'green': 696.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.44|41.57|99.38|232|686|830|1113,-48,32\r\n'

DEBUG:root:{'ts': 1598305523.8556132, 'data': {'humidity': 41.57, 'temperature': 23.44, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 232.0, 'blue': 830.0, 'ambient': 1113.0, 'green': 686.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.51|41.44|99.38|205|658|802|1040,-48,36\r\n'

DEBUG:root:{'ts': 1598305528.8890748, 'data': {'humidity': 41.44, 'temperature': 23.51, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 205.0, 'blue': 802.0, 'ambient': 1040.0, 'green': 658.0}}}
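
The relationship between each pair of log lines can be sketched in Python. The example below is not the gateway script itself. It assumes the pipe-delimited field order (device ID, temperature, humidity, pressure, red, green, blue, ambient) implied by the log output above, and the function name build_message is hypothetical.

```python
import json
import time


def build_message(raw_line, gateway_id, ts=None):
    """Turn a raw '+RCV=...' line from the transceiver into the nested message dict."""
    # '+RCV=<address>,<length>,<data>,<RSSI>,<SNR>' -> keep only the <data> field
    fields = raw_line.strip().split(",")[2].split("|")
    device_id = fields[0]
    temperature, humidity, pressure, red, green, blue, ambient = (
        float(value) for value in fields[1:8]
    )
    return {
        "ts": time.time() if ts is None else ts,
        "data": {
            "device_id": device_id,
            "gateway_id": gateway_id,
            "temperature": temperature,
            "humidity": humidity,
            "pressure": pressure,
            "color": {"red": red, "green": green, "blue": blue, "ambient": ambient},
        },
    }


raw = "+RCV=116,59,0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n"
message = build_message(raw, "00000000f62051ce", ts=1598305503.7041512)
print(json.dumps(message))
```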

AWS IoT Core

The Raspberry Pi-based IoT gateway will be registered with AWS IoT Core. IoT Core allows users to connect devices quickly and securely to AWS.

Things

According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

Below, we see an example of the Thing created by CloudFormation. The Thing, lora-iot-gateway-01, represents the physical IoT gateway. We have assigned the IoT gateway a Thing Type, LoRaIoTGateway, a Thing Group, LoRaIoTGateways, and a Thing Billing Group, IoTGateways.

In a real IoT environment, containing hundreds, thousands, even millions of IoT devices, gateways, and sensors, these classification mechanisms, Thing Type, Thing Group, and Thing Billing Group, will help to organize IoT assets.

Device Gateway and Message Broker

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture containing multiple Topics, Rules, and Actions.

AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring all data exchanged between AWS and the devices is secure by default. In the demonstration, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Below, we see an example of an X.509 certificate assigned to the Thing, lora-iot-gateway-01, which represents the physical IoT gateway. The X.509 certificate and the private key, generated previously using the AWS CLI, are installed on the IoT gateway.

Authorization of the device to access any resource on AWS is controlled by AWS IoT Core Policies. These policies are similar to AWS IAM Policies. Below, we see an example of an AWS IoT Core Policy, LoRaDevicePolicy, which is assigned to the IoT gateway.

AWS IoT Core Rules

Once an MQTT message is received from the IoT gateway (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to an IoT Analytics Channel.

Rule query statements are written in standard Structured Query Language (SQL). The datasource for the Rule query is an IoT Topic.

SELECT
    data.device_id,
    data.gateway_id,
    data.temperature,
    data.humidity,
    data.pressure,
    data.color.red,
    data.color.green,
    data.color.blue,
    data.color.ambient,
    ts,
    clientid() AS device,
    parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp(), "UTC") AS msg_received
FROM
    "${IoTTopicName}"

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

Below, we see a typical AWS IoT Analytics architecture. IoT messages are received from AWS IoT Core through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 21 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an ordered list of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.

In our demonstration’s Pipeline, iot_analytics_pipeline, we have specified three additional activities, including DeviceRegistryEnrich, Filter, and Lambda. Other activity types include Math, SelectAttributes, RemoveAttributes, and AddAttributes.

The Filter activity checks that the sensor values are not Null or otherwise erroneous; messages that fail this check are dropped. The Lambda Pipeline activity executes an AWS Lambda function to transform the messages in the pipeline. Messages are sent in an event object to the Lambda. The message is modified, and the event object is returned to the activity.
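
The post does not show the Filter activity's expression, so the following is only a Python sketch of the kind of check it performs: keep a message when every sensor field is present and numeric, and drop it otherwise. The field list and the function name is_valid_reading are assumptions for illustration.

```python
import math

# Sensor fields expected in each flattened message (assumed for this sketch).
SENSOR_FIELDS = ("temperature", "humidity", "pressure", "red", "green", "blue", "ambient")


def is_valid_reading(message):
    """Return True when every sensor field is present, numeric, and not NaN."""
    for field in SENSOR_FIELDS:
        value = message.get(field)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        if math.isnan(value):
            return False
    return True


messages = [
    {"temperature": 23.6, "humidity": 45.73, "pressure": 98.65,
     "red": 281, "green": 667, "blue": 650, "ambient": 1057},
    {"temperature": None, "humidity": 45.73, "pressure": 98.65,
     "red": 281, "green": 667, "blue": 650, "ambient": 1057},
]
kept = [m for m in messages if is_valid_reading(m)]
print(len(kept))
```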

The Python-based Lambda function easily handles typical IoT data transformation tasks, including converting the temperature from Celsius to Fahrenheit, pressure from kilopascals (kPa) to inches of mercury (inHg), and 12-bit RGBA values to 8-bit color values (0–255). The Lambda function also rounds all the values to between 0 and 2 decimal places of precision.

def lambda_handler(event, context):
    for e in event:
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        e['red'] = int(round(e['red'] / (4097 / 255), 0))
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))

    return event
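
To sanity-check the conversions, the handler can be exercised locally. The sketch below repeats the function from the post and calls it with the sensor values from the flattened sample message shown later in this post. Running it outside Lambda with context set to None is an assumption made for illustration.

```python
def lambda_handler(event, context):
    for e in event:
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)   # Celsius -> Fahrenheit
        e['humidity'] = round(e['humidity'], 2)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)            # kPa -> inHg
        e['red'] = int(round(e['red'] / (4097 / 255), 0))             # scale to 0-255
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))

    return event


# IoT Analytics passes a batch (list) of messages to the Lambda activity.
sample_event = [{
    "temperature": 23.6, "humidity": 45.73, "pressure": 98.65,
    "red": 281, "green": 667, "blue": 650, "ambient": 1057,
}]
result = lambda_handler(sample_event, None)[0]
print(result)
```

The converted values (74.48 °F, 29.13 inHg, and colors 17, 42, 40, 66) match the final message structure shown near the end of the post.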

The demonstration’s Pipeline also enriches the IoT data with metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the IoT data, including the custom attributes such as LoRa transceiver manufacturer and model, and the IoT gateway manufacturer.

A notable feature of Pipelines is the ability to reprocess messages. If you make changes to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all the IoT data in the associated Channel, and overwrite the IoT data in the Data set.

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline in a fully-managed database. Both Channels and Data stores support storing IoT data in your own Amazon S3 bucket or an IoT Analytics managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store the IoT data in our Data store, iot_analytics_data_store.

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Periodic updates are implemented using a cron expression. For the demonstration, we are updating our Data set, iot_analytics_data_set, at a 15-minute interval. The time interval can be increased or reduced, depending on the desired ‘near real-time’ nature of the IoT data being analyzed.

Below, we see messages in the Result preview pane of the Data set. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

The CloudFormation stack created an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the cron expression runs the scheduled update.

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebook Instance is cloned from our project’s GitHub repository.

The repository contains a sample Jupyter Notebook, LoRa_IoT_Analytics_Demo.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3.

The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT data stored in the IoT Analytics Data set.

The Notebook can be modified, and the changes pushed back to GitHub. You could easily fork the demonstration’s GitHub repository and modify the CloudFormation template to point to your source code repository.

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization.

Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Although there is an Amazon QuickSight API, Amazon QuickSight is not automatically enabled or configured with CloudFormation or using the AWS CLI in this demonstration.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets; they are two different service features.

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set, iot_analytics_data_set.

Amazon QuickSight gives you the ability to view and modify QuickSight Data sets before visualizing. QuickSight even provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. For this demonstration, we will leave the data unchanged since all transformations were already completed in the IoT Analytics Pipeline.

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis console is shown below. An Analysis is primarily a collection of Visuals (aka Visual types). QuickSight provides several Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or each visual within the Analysis can be filtered. For the demo, I have created a simple QuickSight Analysis, including a few typical QuickSight visuals.

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. Although viewers of the Dashboard cannot edit the visuals, they can apply filtering and interactively drill down into data in the Visuals.

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS’s machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight’s ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight’s ML-powered Forecasting can be used to predict your business metrics accurately, and perform interactive what-if analysis with point-and-click simplicity. QuickSight’s built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab (left) in the demonstration’s QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, like Visuals, and configured to tune the detection parameters. Observe the temperature, humidity, and barometric pressure anomalies, identified by ML Insights, based on their Anomaly Score, which is higher or lower, given a minimum delta of five percent. These anomalies accurately reflected an actual failure of the IoT device, caused by overheating during testing, which resulted in abnormal sensor readings.

Receiving the Messages on AWS

To confirm the IoT gateway is sending messages, we can use a packet analyzer, like tcpdump, on the IoT gateway. Running tcpdump on the IoT gateway, below, we see outbound encrypted MQTT messages being sent to AWS on port 443.

To confirm those messages are being received from the IoT gateway on AWS, we can use the AWS IoT Core Test feature and subscribe to the lora-iot-demo topic. We should see messages flowing in from the IoT gateway at approximately 5-second intervals.

The JSON payload structure of the incoming MQTT messages will look similar to the below example. The device_id is the unique id of the IoT device that transmitted the message using LoRaWAN. The gateway_id is the unique id of the IoT gateway that received the message using LoRaWAN and sent it to AWS. A single IoT gateway would usually manage messages from multiple IoT devices, each with a unique id.

{
    "data": {
        "color": {
            "ambient": 1057,
            "blue": 650,
            "green": 667,
            "red": 281
        },
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6
    },
    "ts": 1598543131.9861386
}

The SQL query used by the AWS IoT Rule described earlier, transforms and flattens the nested JSON payload structure, before passing it to the AWS IoT Analytics Channel, as shown below.

{
    "ambient": 1057,
    "blue": 650,
    "green": 667,
    "red": 281,
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "humidity": 45.73,
    "pressure": 98.65,
    "temperature": 23.6,
    "ts": 1598543131.9861386,
    "msg_received": "2020-08-27T11:45:32.074+0000",
    "device": "lora-iot-gateway-01"
}
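
The effect of the rule's SELECT clause can be approximated in plain Python. This is only a sketch of the flattening, not how AWS IoT evaluates the query. The function name flatten_message is hypothetical, and the client ID and receive time are passed in as arguments because clientid() and timestamp() are only available inside the rules engine.

```python
def flatten_message(message, client_id, msg_received):
    """Flatten the nested MQTT payload into the single-level shape the rule emits."""
    data = message["data"]
    color = data["color"]
    return {
        "device_id": data["device_id"],
        "gateway_id": data["gateway_id"],
        "temperature": data["temperature"],
        "humidity": data["humidity"],
        "pressure": data["pressure"],
        "red": color["red"],
        "green": color["green"],
        "blue": color["blue"],
        "ambient": color["ambient"],
        "ts": message["ts"],
        "device": client_id,            # stands in for clientid()
        "msg_received": msg_received,   # stands in for parse_time(..., timestamp(), "UTC")
    }


nested = {
    "data": {
        "color": {"ambient": 1057, "blue": 650, "green": 667, "red": 281},
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6,
    },
    "ts": 1598543131.9861386,
}
flat = flatten_message(nested, "lora-iot-gateway-01", "2020-08-27T11:45:32.074+0000")
print(flat)
```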

We can measure the near real-time nature of the IoT data using the ts and msg_received data fields. The ts data field is the date and time when the sensor reading occurred on the IoT device, while the msg_received data field is the date and time when the message was received on AWS. The delta between the two values is a measure of how near real-time the sensor readings are being streamed to the AWS IoT Analytics Channel. In the below example, the difference between ts (2020-08-27T11:45:31.986) and msg_received (2020-08-27T11:45:32.074) is 88 ms.
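
The same kind of delta can be computed programmatically. The sketch below assumes ts is a Unix epoch in seconds and that msg_received carries an explicit UTC offset. It uses the ts and msg_received values from the final message structure shown in the next section, for which the result is a little under 38 ms. The function name latency_ms is hypothetical.

```python
from datetime import datetime


def latency_ms(ts, msg_received):
    """Return the delay in milliseconds between the device timestamp and AWS receipt."""
    # Parse a string such as 2020-08-27T15:45:32.024+0000 into an aware datetime.
    received = datetime.strptime(msg_received, "%Y-%m-%dT%H:%M:%S.%f%z")
    return (received.timestamp() - ts) * 1000.0


delta = latency_ms(1598543131.9861386, "2020-08-27T15:45:32.024+0000")
print(round(delta, 1))
```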

Final IoT Data Message Structure

Once the message payload passes through the AWS IoT Analytics Pipeline and lands in the AWS IoT Analytics Data set, its final data structure looks as follows. Note that the device’s attribute metadata has been added from the AWS IoT Core device registry. Regrettably, the metadata is not well-formatted JSON and will require additional transformation to be usable.

{
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "temperature": 74.48,
    "humidity": 45.73,
    "pressure": 29.13,
    "red": 17,
    "green": 42,
    "blue": 40,
    "ambient": 66,
    "ts": 1598543131.9861386,
    "device": "lora-iot-gateway-01",
    "msg_received": "2020-08-27T15:45:32.024+0000",
    "metadata": {
        "defaultclientid": "lora-iot-gateway-01",
        "thingname": "lora-iot-gateway-01",
        "thingid": "017db4b8-7fca-4617-aa58-7125dd94ab36",
        "thingarn": "arn:aws:iot:us-east-1:123456789012:thing/lora-iot-gateway-01",
        "thingtypename": "LoRaIoTGateway",
        "attributes": {
            "loramfr": "REYAX",
            "gatewaymfr": "RaspberryPiFoundation",
            "loramodel": "RYLR896"
        },
        "version": "2",
        "billinggroupname": "LoRaIoTGateways"
    },
    "__dt": "2020-08-27 00:00:00.000"
}

A set of sample messages is included in the GitHub project’s sample_messages directory.

Conclusion

In this post, we explored the use of the LoRa and LoRaWAN protocols to transmit environmental sensor data from an IoT device to an IoT gateway. Given its low energy consumption, long-distance transmission capabilities, and well-developed protocols, LoRaWAN is an ideal long-range wireless protocol for IoT devices. We then demonstrated how to use AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics to securely collect, analyze, and visualize streaming messages from the IoT device, in near real-time.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Getting Started with IoT Analytics on AWS

Introduction

AWS defines AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics. In this post, we will focus on AWS IoT Analytics, one of four services in the Analytics category. According to AWS, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

Certainly, AWS IoT Analytics is not the only way to analyze Internet of Things (IoT) or Industrial Internet of Things (IIoT) data on AWS. It is common to see Data Analyst teams using a more general AWS data analytics stack, composed of Amazon S3, Amazon Kinesis, AWS Glue, and Amazon Athena or Amazon Redshift and Redshift Spectrum, for analyzing IoT data. So then why choose AWS IoT Analytics over a more traditional AWS data analytics stack? According to AWS, IoT Analytics was purpose-built to manage the complexities of IoT and IIoT data at petabyte scale. AWS also notes that IoT data frequently has significant gaps, corrupted messages, and false readings that must be cleaned up before analysis can occur. Additionally, IoT data must often be enriched and transformed to be meaningful. IoT Analytics can filter, transform, and enrich IoT data before storing it in a time-series data store for analysis.

In the following post, we will explore the use of AWS IoT Analytics to analyze environmental sensor data, in near real-time, from a series of IoT devices. To follow along with the post’s demonstration, there is an option to use sample data to simulate the IoT devices (see the ‘Simulating IoT Device Messages’ section of this post).

IoT Devices

In this post, we will explore IoT Analytics using IoT data generated from a series of custom-built environmental sensor arrays. Each breadboard-based sensor array is connected to a Raspberry Pi single-board computer (SBC), the popular, low-cost, credit-card-sized Linux computer. The IoT devices were purposely placed in physical locations that vary in temperature, humidity, and other environmental conditions.


Each device includes the following sensors:

  1. MQ135 Air Quality Sensor Hazardous Gas Detection Sensor: CO, LPG, Smoke
    (requires an MCP3008 – 8-Channel 10-Bit ADC w/ SPI Interface)
  2. DHT22/AM2302 Digital Temperature and Humidity Sensor
  3. Onyehn IR Pyroelectric Infrared PIR Motion Sensor
  4. Anmbest Light Intensity Detection Photosensitive Sensor


AWS IoT Device SDK

Each Raspberry Pi device runs a custom Python script, sensor_collector_v2.py. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS. The script collects a total of seven different readings from the four sensors at a regular interval. Sensor readings include temperature, humidity, carbon monoxide (CO), liquefied petroleum gas (LPG), smoke, light, and motion.

The script securely publishes the sensor readings, along with a device ID and timestamp, as a single message, to AWS using the ISO standard Message Queuing Telemetry Transport (MQTT) network protocol. Below is an example of an MQTT message payload, published by the collector script.

{
    "data": {
        "co": 0.006104480269226063,
        "humidity": 55.099998474121094,
        "light": true,
        "lpg": 0.008895956948783413,
        "motion": false,
        "smoke": 0.023978358312270912,
        "temp": 31.799999237060547
    },
    "device_id": "6e:81:c9:d4:9e:58",
    "ts": 1594419195.292461
}


As shown below, using tcpdump on the IoT device, the MQTT message payloads generated by the script average approximately 275 bytes. The complete MQTT messages average around 300 bytes.
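
As a rough cross-check of the payload size without a packet capture, the example message above can be serialized in Python and measured. This sketch assumes the collector serializes with the json.dumps defaults, which the post does not state. Actual sizes vary with the number of digits in each reading.

```python
import json

# The example MQTT message payload shown above.
message = {
    "data": {
        "co": 0.006104480269226063,
        "humidity": 55.099998474121094,
        "light": True,
        "lpg": 0.008895956948783413,
        "motion": False,
        "smoke": 0.023978358312270912,
        "temp": 31.799999237060547,
    },
    "device_id": "6e:81:c9:d4:9e:58",
    "ts": 1594419195.292461,
}

payload = json.dumps(message)
payload_bytes = len(payload.encode("utf-8"))
print(payload_bytes)
```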


AWS IoT Core

Each Raspberry Pi is registered with AWS IoT Core. IoT Core allows users to quickly and securely connect devices to AWS. According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture.


At a message frequency of five seconds, the three Raspberry Pi devices publish a total of roughly 50,000 IoT messages per day to AWS IoT Core.
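
The daily volume follows from simple arithmetic, sketched here in Python (the variable names are illustrative):

```python
SECONDS_PER_DAY = 24 * 60 * 60
devices = 3
interval_seconds = 5

# One message per device every five seconds.
messages_per_device = SECONDS_PER_DAY // interval_seconds
messages_per_day = devices * messages_per_device
print(messages_per_device, messages_per_day)
```

This gives 17,280 messages per device and 51,840 messages in total per day, in line with the rough figure of 50,000.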


AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring all data exchanged between AWS and the devices is secure by default. In the demo, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Authorization of the device to access any resource on AWS is controlled by individual AWS IoT Core Policies, similar to AWS IAM Policies. Below, we see an example of an X.509 certificate, assigned to a registered device.


AWS IoT Core Rules

Once an MQTT message is received from an IoT device (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are written in standard Structured Query Language (SQL). Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to IoT Analytics, in addition to AWS IoT Events and Amazon Kinesis Data Firehose.


Simulating IoT Device Messages

Building and configuring multiple Raspberry Pi-based sensor arrays, and registering the devices with AWS IoT Core would require a lot of work just for this post. Therefore, I have provided everything you need to simulate the three IoT devices, on GitHub. Use the following command to git clone a local copy of the project.

git clone \
    --branch master --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-iot-analytics-demo.git


AWS CloudFormation

Use the CloudFormation template, iot-analytics.yaml, to create an IoT Analytics stack containing (17) resources, including the following.

  • (3) AWS IoT Things
  • (1) AWS IoT Core Topic Rule
  • (1) AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • (1) AWS Lambda and Lambda Permission
  • (1) Amazon S3 Bucket
  • (1) Amazon SageMaker Notebook Instance
  • (5) AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To build the AWS CloudFormation stack, run the following AWS CLI command.

aws cloudformation create-stack \
    --stack-name iot-analytics-demo \
    --template-body file://cloudformation/iot-analytics.yaml \
    --parameters ParameterKey=ProjectName,ParameterValue=iot-analytics-demo \
        ParameterKey=IoTTopicName,ParameterValue=iot-device-data \
    --capabilities CAPABILITY_NAMED_IAM


Below, we see a successful deployment of the IoT Analytics Demo CloudFormation Stack.


Publishing Sample Messages

Once the CloudFormation stack is created successfully, use an included Python script, send_sample_messages.py, to send sample IoT data to an AWS IoT Topic, from your local machine. The script will use your AWS identity and credentials, instead of an actual IoT device registered with IoT Core. The IoT data will be intercepted by an IoT Topic Rule and redirected, using a Topic Rule Action, to the IoT Analytics Channel.
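
The repository's script is not reproduced here, but the general approach can be sketched as follows. The sketch takes the publish function as a parameter so it can be run without AWS access. The function name send_messages and the shape of the messages are assumptions, not details confirmed by the post.

```python
import json


def send_messages(messages, topic, publish):
    """Publish each message to the topic and return the number sent.

    `publish` is any callable accepting topic, qos, and payload keyword
    arguments, for example the publish method of a boto3 'iot-data' client.
    """
    sent = 0
    for message in messages:
        publish(topic=topic, qos=0, payload=json.dumps(message).encode("utf-8"))
        sent += 1
    return sent


# Dry run with a stand-in publisher, so no AWS call is made.
published = []
count = send_messages(
    [{"device_id": "6e:81:c9:d4:9e:58", "ts": 1594419195.292461}],
    "iot-device-data",
    lambda **kwargs: published.append(kwargs),
)
print(count, published[0]["topic"])
```

With boto3 installed and AWS credentials configured, passing `boto3.client("iot-data").publish` as the third argument would send the messages to AWS IoT Core instead of the in-memory list.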

First, we will ensure the IoT stack is running correctly on AWS by sending a few test messages. Go to the AWS IoT Core Test tab. Subscribe to the iot-device-data topic.

screen_shot_2020-07-13_at_2_06_32_pm

Then, run the following command using the smaller data file, raw_data_small.json.

cd sample_data/
time python3 ./send_sample_messages.py \
-f raw_data_small.json -t iot-device-data

If successful, you should see the five messages appear in the Test tab, shown above. Example output from the script is shown below.

screen_shot_2020-07-15_at_10.30.58_pm

Then, run the second command using the larger data file, raw_data_large.json, containing 9,995 messages (a few hours’ worth of data). The command will take approximately 12 minutes to complete.

time python3 ./send_sample_messages.py \
-f raw_data_large.json -t iot-device-data

Once the second command completes successfully, your IoT Analytics Channel should contain 10,000 unique messages. There is an optional extra-large data file containing approximately 50,000 IoT messages (24 hours of IoT messages).

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

iot_analytics

Below, we see a typical AWS IoT Analytics architecture. IoT messages are pulled from AWS IoT Core, through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

AWS_IoT_Diagram_02_IoT_Analytics_blog

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or in an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 14 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the time period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.

screen_shot_2020-07-13_at_3_03_09_pm

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an array of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.

In our demonstration’s Pipeline, iot_analytics_pipeline, we have specified five additional activities, including DeviceRegistryEnrich, Filter, Math, Lambda, and SelectAttributes. There are two additional Activity types we did not choose, RemoveAttributes and AddAttributes.

screen_shot_2020-07-14_at_3_11_01_pm

The demonstration’s Pipeline created by CloudFormation starts with messages from the demonstration’s Channel, iot_analytics_channel, similar to the following.

{
  "co": 0.004782974313835918,
  "device_id": "ae:c4:1d:34:1c:7b",
  "device": "iot-device-01",
  "humidity": 68.81000305175781,
  "light": true,
  "lpg": 0.007456714657976871,
  "msg_received": "2020-07-13T19:44:58.690+0000",
  "motion": false,
  "smoke": 0.019858593777432054,
  "temp": 19.200000762939453,
  "ts": 1594496359.235107
}


The demonstration’s Pipeline transforms the messages through a series of Pipeline Activities and then stores the resulting message in the demonstration’s Data store, iot_analytics_data_store. The resulting messages appear similar to the following.

{
  "co": 0.0048,
  "device": "iot-device-01",
  "humidity": 68.81,
  "light": true,
  "lpg": 0.0075,
  "metadata": "{defaultclientid=iot-device-01, thingname=iot-device-01, thingid=5de1c2af-14b4-49b5-b20b-b25cf251b01a, thingarn=arn:aws:iot:us-east-1:864887685992:thing/iot-device-01, thingtypename=null, attributes={installed=1594665292, latitude=37.4133144, longitude=-122.1513069}, version=2, billinggroupname=null}",
  "msg_received": "2020-07-13T19:44:58.690+0000",
  "motion": false,
  "smoke": 0.0199,
  "temp": 66.56,
  "ts": 1594496359.235107
}


In our demonstration, transformations to the messages include dropping the device_id attribute and converting the temp attribute value from Celsius to Fahrenheit. In addition, the Lambda Activity rounds the temp, humidity, co, lpg, and smoke attribute values to between two and four decimal places of precision.
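
The effect of these transformations can be reproduced locally with a short sketch. This is not the demonstration's actual Lambda code, and it folds several Pipeline Activities into one hypothetical function, transform, for illustration. The per-attribute precision is an assumption inferred from the sample output shown above.

```python
# Assumption: decimal places per attribute, inferred from the sample output.
PRECISION = {"temp": 2, "humidity": 2, "co": 4, "lpg": 4, "smoke": 4}


def transform(message):
    """Return a copy of the message with device_id dropped, temp converted
    from Celsius to Fahrenheit, and the sensor readings rounded."""
    result = {key: value for key, value in message.items() if key != "device_id"}
    result["temp"] = result["temp"] * 9 / 5 + 32  # Celsius to Fahrenheit
    for name, digits in PRECISION.items():
        result[name] = round(result[name], digits)
    return result


# The raw sample message from the Channel, as shown earlier.
raw = {
    "co": 0.004782974313835918,
    "device_id": "ae:c4:1d:34:1c:7b",
    "device": "iot-device-01",
    "humidity": 68.81000305175781,
    "light": True,
    "lpg": 0.007456714657976871,
    "msg_received": "2020-07-13T19:44:58.690+0000",
    "motion": False,
    "smoke": 0.019858593777432054,
    "temp": 19.200000762939453,
    "ts": 1594496359.235107,
}

print(transform(raw))
```

For the sample message, 19.2 degrees Celsius becomes 66.56 degrees Fahrenheit, matching the temp value in the transformed message.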

screen_shot_2020-07-14_at_4_38_33_pm

The demonstration’s Pipeline also enriches the message with the metadata attribute, containing metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the message, including custom attributes we input, such as location (longitude and latitude) and the device’s installation date.
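
Note that, in the sample message, the metadata attribute arrives as a single string rather than as nested JSON. If you want to work with the custom attributes in a Notebook, one option is to pull them out with a regular expression. The sketch below is only one possible approach; the helper name extract_attribute is hypothetical, and the metadata string is abbreviated from the sample message above.

```python
import re


def extract_attribute(metadata, name):
    """Find 'name=<number>' in the metadata string and return the number."""
    match = re.search(rf"\b{re.escape(name)}=(-?\d+(?:\.\d+)?)", metadata)
    if match is None:
        raise KeyError(name)
    return float(match.group(1))


# Abbreviated copy of the metadata string from the sample message.
metadata = (
    "{thingname=iot-device-01, attributes={installed=1594665292, "
    "latitude=37.4133144, longitude=-122.1513069}, version=2}"
)

latitude = extract_attribute(metadata, "latitude")
longitude = extract_attribute(metadata, "longitude")
print(latitude, longitude)
```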

screen_shot_2020-07-14_at_3_15_27_pm

A significant feature of Pipelines is the ability to reprocess messages. If you make a change to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all messages in the associated Channel, and overwrite the messages in the Data store.

screen_shot_2020-07-14_at_4_55_24_pm

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline, in a fully-managed database. Both Channels and Data stores support storing data in your own Amazon S3 bucket or in an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store messages in our Data store.

screen_shot_2020-07-13_at_3_03_22_pm

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Regular updates are provided through the use of a cron expression. For the demonstration, we are using a 15-minute interval.

Below, we see the sample messages in the Result preview pane of the Data set. These are the five test messages we sent to check the stack. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

screen_shot_2020-07-14_at_4_46_03_pm

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

screen_shot_2020-07-14_at_3_32_18_pm

The CloudFormation stack contains an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the scheduled update is run by the cron expression.

screen_shot_2020-07-14_at_3_38_40_pm

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebook Instance is associated with the project’s GitHub repository.

screen_shot_2020-07-14_at_10_18_12_pm

The repository contains a sample Jupyter Notebook, IoT_Analytics_Demo_Notebook.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3. The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT messages we published earlier and stored in the Data set.

screen_shot_2020-07-14_at_10_00_39_pm

screen_shot_2020-07-14_at_10_44_54_pm

screen_shot_2020-07-14_at_9_34_27_pm

screen_shot_2020-07-14_at_9_53_27_pm

Notebooks can be modified, and the changes pushed back to GitHub. You could easily fork a copy of my GitHub repository and modify the CloudFormation template, to include your own GitHub repository URL.

screen_shot_2020-07-14_at_10_14_51_pm

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization. Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Amazon QuickSight is not enabled or configured by the demonstration’s CloudFormation template.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets. These are two different, yet similar, constructs.

screen_shot_2020-07-15_at_8.50.26_am

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set as a data source.

screen_shot_2020-07-15_at_9_06_50_am

Amazon QuickSight gives you the ability to modify QuickSight Data sets. For the demonstration, I have added two additional fields, converting the boolean light and motion values of true and false to binary values of 0 or 1. I have also deselected two fields that I do not need for QuickSight Analysis.

screen_shot_2020-07-15_at_8.53.02_am

QuickSight provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. Below, we see a new calculated field, light_dec, containing the original light field’s Boolean values converted to binary values. I am using an if...else formula to change the field’s value depending on the value in another field.
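
For readers who prefer code to formulas, the logic of the calculated field can be sketched in Python as follows. This is not QuickSight syntax, only an equivalent of the if...else logic. The helper to_binary and the field name motion_dec are hypothetical; only light_dec is named in the post.

```python
def to_binary(value):
    """Map a Boolean (or the strings 'true'/'false') to 1 or 0."""
    if isinstance(value, str):
        value = value.strip().lower() == "true"
    return 1 if value else 0


message = {"light": True, "motion": False}
message["light_dec"] = to_binary(message["light"])
message["motion_dec"] = to_binary(message["motion"])  # hypothetical field name
print(message)
```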

screen_shot_2020-07-15_at_8.53.16_am

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis user interface is shown below. An Analysis is primarily a collection of Visuals (Visual types). QuickSight provides a number of Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or for each individual visual can be filtered. For the demo, I have created a QuickSight Analysis, including several typical QuickSight Visuals.

screen_shot_2020-07-10_at_12_21_32_pm

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. A viewer of the Dashboard cannot edit the visuals, though they can apply filtering and interactively drill down into data in the Visuals.

screen_shot_2020-07-15_at_11_38_17_am

screen_shot_2020-07-15_at_11_40_17_am

screen_shot_2020-07-15_at_11_43_23_am

Geospatial Data

Amazon QuickSight understands geospatial data. If you recall, in the IoT Analytics Pipeline, we enriched the messages with metadata from the device registry. The metadata attributes contained the device’s longitude and latitude. QuickSight will recognize those fields as geographic fields. In our QuickSight Analysis, we can visualize the geospatial data, using the geospatial chart (map) Visual type.

screen_shot_2020-07-10_at_12_22_12_pm

QuickSight Mobile App

Amazon QuickSight offers free iOS and Android versions of the Amazon QuickSight Mobile App. The mobile application makes it easy for registered QuickSight end-users to securely connect to QuickSight Dashboards, using their mobile devices. Below, we see two views of the same Dashboard, shown in the iOS version of the Amazon QuickSight Mobile App.

mobile_quicksight

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS’s machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight’s ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight’s ML-powered Forecasting can be used to accurately predict your business metrics, and perform interactive what-if analysis with point-and-click simplicity. QuickSight’s built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab in the demonstration’s QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, similar to Visuals, and configured to tune the detection parameters.

screen_shot_2020-07-15_at_12_04_21_pm

Below, we see an example of humidity anomalies across all devices. The anomalies are selected based on their Anomaly Score and on values that are higher or lower than expected, with a minimum delta of five percent.

screen_shot_2020-07-15_at_12_24_12_pm

Cleaning Up

You are charged hourly for the SageMaker Notebook Instance. Do not forget to delete your CloudFormation stack when you are done with the demonstration. Note the Amazon S3 bucket will not be deleted; you must do this manually.

aws cloudformation delete-stack \
  --stack-name iot-analytics-demo


Conclusion

In this post, we demonstrated how to use AWS IoT Analytics to analyze and visualize streaming messages from multiple IoT devices, in near real-time. Combined with other AWS IoT analytics services, such as AWS IoT SiteWise, AWS IoT Events, and AWS IoT Things Graph, you can create a robust, full-featured IoT Analytics platform, capable of handling millions of industrial, commercial, and residential IoT devices, generating petabytes of data.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1

Introduction

According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusions, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.

Data Analysis: discovering useful information, informing conclusions, and supporting decision-making. –Wikipedia

In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.

Demonstration

In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.

Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

Preview of post’s data in Amazon QuickSight.

The original concept for the Smart Hub was developed as part of a multi-day training and hackathon I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.

From left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.

This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.

High-level AWS architecture diagram of the demonstration.

Featured Technologies

The following AWS services and open-source technologies are featured prominently in this post.

Athena-Glue-v2.png

Amazon S3-based Data Lake

An Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.

AWS Glue

AWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

AWS Glue Data Catalog

The AWS Glue Data Catalog is an Apache Hive Metastore-compatible, central repository to store structural and operational metadata for data assets. For a given data set, you can store its table definition and physical location, add business-relevant attributes, and track how the data has changed over time.

AWS Glue Crawler

An AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.

AWS Glue ETL Job

An AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.

Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.

Amazon QuickSight

Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.

AWS Lambda

AWS Lambda automatically runs code without requiring the provisioning or management of servers. AWS Lambda automatically scales applications by running code in response to triggers. Lambda code runs in parallel. With AWS Lambda, you are charged for every 100ms your code executes and the number of times your code is triggered. You pay only for the compute time you consume.

Smart Hub Data

Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data. Those data categories include Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.

To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.

Smart Hub Electrical Usage Data

The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kWh) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days’ worth of usage data for the demonstration, for a total of 5,760 data points. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).

{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04167}}
{"loc_id":"b6a8d42425fde548","ts":1576915500,"data":{"s_01":0,"s_02":0.00552,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04147}}
{"loc_id":"b6a8d42425fde548","ts":1576915800,"data":{"s_01":0.29267,"s_02":0.00642,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04207}}
{"loc_id":"b6a8d42425fde548","ts":1576916100,"data":{"s_01":0.29207,"s_02":0.00592,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04137}}
{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04157}}
{"loc_id":"b6a8d42425fde548","ts":1576916700,"data":{"s_01":0,"s_02":0.00562,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04197}}
{"loc_id":"b6a8d42425fde548","ts":1576917000,"data":{"s_01":0,"s_02":0.00512,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04257}}
{"loc_id":"b6a8d42425fde548","ts":1576917300,"data":{"s_01":0,"s_02":0.00522,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04177}}
{"loc_id":"b6a8d42425fde548","ts":1576917600,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04267}}
{"loc_id":"b6a8d42425fde548","ts":1576917900,"data":{"s_01":0,"s_02":0.00612,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04237}}


Note that the electrical usage data contains nested data. The electrical usage for each of the ten sensors is contained in a nested JSON object, data, within each time series entry. The object contains ten numeric values of type double, keyed by sensor ID (s_01 through s_10).

{
  "loc_id": "b6a8d42425fde548",
  "ts": 1576916400,
  "data": {
    "s_01": 0.29217,
    "s_02": 0.00622,
    "s_03": 0,
    "s_04": 0,
    "s_05": 0,
    "s_06": 0,
    "s_07": 0,
    "s_08": 0,
    "s_09": 0,
    "s_10": 0.04157
  }
}
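
To get a feel for the nested structure, the following sketch parses one JSON Lines record and flattens it into one row per sensor reading. It also checks the data point arithmetic described above. The function name flatten_usage and the output field names sensor and usage are hypothetical, chosen for illustration only.

```python
import json

SENSORS_PER_HUB = 10
INTERVALS_PER_DAY = 24 * 60 // 5  # 288 five-minute intervals per day


def flatten_usage(line):
    """Turn one JSON Lines usage record into one row per sensor reading."""
    record = json.loads(line)
    return [
        {"loc_id": record["loc_id"], "ts": record["ts"], "sensor": sensor, "usage": value}
        for sensor, value in record["data"].items()
    ]


# One record from the sample usage data.
sample = (
    '{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,'
    '"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,'
    '"s_09":0,"s_10":0.04157}}'
)

rows = flatten_usage(sample)
print(len(rows), "rows from one record")
print(SENSORS_PER_HUB * INTERVALS_PER_DAY, "data points per day, per Smart Hub")
```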

Real data is often complex and deeply nested. Later in the post, we will see that AWS Glue can map many common data types, including nested data objects, as illustrated below.

screen_shot_2020-01-05_at_7_46_19_am

Smart Hub Sensor Mappings

The Smart Hub sensor mappings data maps a sensor column in the usage data (e.g., ‘s_01’) to the corresponding actual device (e.g., ‘Central Air Conditioner’). The data contains the device location, wattage, and the last time the record was modified. The data is also stored in JSON Lines format. The sensor mappings data will be partitioned in the Amazon S3-based data lake by the state of the residence (e.g., ‘state=or’ for Oregon).

{"loc_id":"b6a8d42425fde548","id":"s_01","description":"Central Air Conditioner","location":"N/A","watts":3500,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_02","description":"Ceiling Fan","location":"Master Bedroom","watts":65,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_03","description":"Clothes Dryer","location":"Basement","watts":5000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_04","description":"Clothes Washer","location":"Basement","watts":1800,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_05","description":"Dishwasher","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_06","description":"Flat Screen TV","location":"Living Room","watts":120,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_07","description":"Microwave Oven","location":"Kitchen","watts":1000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_08","description":"Coffee Maker","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_09","description":"Hair Dryer","location":"Master Bathroom","watts":2000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_10","description":"Refrigerator","location":"Kitchen","watts":500,"last_modified":1559347200}
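
As a rough illustration of how the mappings relate to the usage data, the sketch below builds a lookup keyed by location and sensor ID, and then labels the readings of a single usage record. It is a simplified, in-memory example and not how the join is performed later in the post; the function name build_sensor_lookup is hypothetical, and only three of the ten mappings are included.

```python
import json

# Three of the sample sensor mappings, in JSON Lines format.
mapping_lines = [
    '{"loc_id":"b6a8d42425fde548","id":"s_01","description":"Central Air Conditioner","location":"N/A","watts":3500,"last_modified":1559347200}',
    '{"loc_id":"b6a8d42425fde548","id":"s_02","description":"Ceiling Fan","location":"Master Bedroom","watts":65,"last_modified":1559347200}',
    '{"loc_id":"b6a8d42425fde548","id":"s_10","description":"Refrigerator","location":"Kitchen","watts":500,"last_modified":1559347200}',
]


def build_sensor_lookup(lines):
    """Map (loc_id, sensor id) to the device description."""
    lookup = {}
    for line in lines:
        mapping = json.loads(line)
        lookup[(mapping["loc_id"], mapping["id"])] = mapping["description"]
    return lookup


lookup = build_sensor_lookup(mapping_lines)

# An abbreviated usage record, limited to the three mapped sensors.
usage = {
    "loc_id": "b6a8d42425fde548",
    "ts": 1576916400,
    "data": {"s_01": 0.29217, "s_02": 0.00622, "s_10": 0.04157},
}

labeled = {
    lookup[(usage["loc_id"], sensor)]: value for sensor, value in usage["data"].items()
}
print(labeled)
```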


Smart Hub Locations

The Smart Hub locations data contains the geospatial coordinates, home address, and timezone for each residential Smart Hub. The data is stored in CSV format. The data for the four cities included in this demonstration originated from OpenAddresses, ‘the free and open global address collection.’ There are approximately 4k location records. The location data will be partitioned in the Amazon S3-based data lake by the state of the residence where the Smart Hub is installed (e.g., ‘state=or’ for Oregon).


lon,lat,number,street,unit,city,district,region,postcode,id,hash,tz
-122.8077278,45.4715614,6635,SW JUNIPER TER,,,,,97008,,b6a8d42425fde548,America/Los_Angeles
-122.8356634,45.4385864,11225,SW PINTAIL LOOP,,,,,97007,,08ae3df798df8b90,America/Los_Angeles
-122.8252379,45.4481709,9930,SW WRANGLER PL,,,,,97008,,1c7e1f7df752663e,America/Los_Angeles
-122.8354211,45.4535977,9174,SW PLATINUM PL,,,,,97007,,b364854408ee431e,America/Los_Angeles
-122.8315771,45.4949449,15040,SW MILLIKAN WAY,# 233,,,,97003,,0e97796ba31ba3b4,America/Los_Angeles
-122.7950339,45.4470259,10006,SW CONESTOGA DR,# 113,,,,97008,,2b5307be5bfeb026,America/Los_Angeles
-122.8072836,45.4908594,12600,SW CRESCENT ST,# 126,,,,97005,,4d74167f00f63f50,America/Los_Angeles
-122.8211801,45.4689303,7100,SW 140TH PL,,,,,97008,,c5568631f0b9de9c,America/Los_Angeles
-122.831154,45.4317057,15050,SW MALLARD DR,# 101,,,,97007,,dbd1321080ce9682,America/Los_Angeles
-122.8162856,45.4442878,10460,SW 136TH PL,,,,,97008,,008faab8a9a3e519,America/Los_Angeles

Electrical Rates

Lastly, the electrical rate data contains the cost of electricity. In this demonstration, the assumption is that the rate varies by state, by month, and by the hour of the day. The data is stored in XML, a data export format still common to older, legacy systems. The electrical rate data will not be partitioned in the Amazon S3-based data lake.

<?xml version="1.0" encoding="UTF-8"?>
<root>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>19:00:00</from>
    <to>19:59:59</to>
    <type>peak</type>
    <rate>12.623</rate>
  </row>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>20:00:00</from>
    <to>20:59:59</to>
    <type>partial-peak</type>
    <rate>7.232</rate>
  </row>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>21:00:00</from>
    <to>21:59:59</to>
    <type>partial-peak</type>
    <rate>7.232</rate>
  </row>
  <row>
    <state>or</state>
    <year>2019</year>
    <month>12</month>
    <from>22:00:00</from>
    <to>22:59:59</to>
    <type>off-peak</type>
    <rate>4.209</rate>
  </row>
</root>
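
To show how a rate could be looked up by state, month, and time of day, here is a small sketch using Python's standard library XML parser. It is illustrative only and is not part of the demonstration's code. The function name find_rate is hypothetical, and the XML is trimmed to two of the sample rows, without the XML declaration.

```python
import xml.etree.ElementTree as ET
from datetime import time

# Two rows from the sample rate data.
RATES_XML = """
<root>
  <row>
    <state>or</state><year>2019</year><month>12</month>
    <from>19:00:00</from><to>19:59:59</to>
    <type>peak</type><rate>12.623</rate>
  </row>
  <row>
    <state>or</state><year>2019</year><month>12</month>
    <from>22:00:00</from><to>22:59:59</to>
    <type>off-peak</type><rate>4.209</rate>
  </row>
</root>
"""


def find_rate(xml_text, state, year, month, at):
    """Return (type, rate) for the row whose time window contains 'at',
    or None if no row matches."""
    root = ET.fromstring(xml_text)
    for row in root.iter("row"):
        if (
            row.findtext("state") == state
            and int(row.findtext("year")) == year
            and int(row.findtext("month")) == month
            and time.fromisoformat(row.findtext("from"))
            <= at
            <= time.fromisoformat(row.findtext("to"))
        ):
            return row.findtext("type"), float(row.findtext("rate"))
    return None


print(find_rate(RATES_XML, "or", 2019, 12, time(19, 30)))
print(find_rate(RATES_XML, "or", 2019, 12, time(22, 15)))
```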


Data Analysis Process

Due to the number of steps involved in the data analysis process in the demonstration, I have divided the process into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).

Full data analysis workflow diagram.

Raw Data Ingestion

In the Raw Data Ingestion stage, semi-structured CSV-, XML-, and JSON-format data files are copied to a secure Amazon Simple Storage Service (S3) bucket. Within the bucket, data files are organized into folders based on their physical data structure (schema). Due to the potentially unlimited number of data files, files are further organized (partitioned) into subfolders. Organizational strategies for data files are based on date, time, geographic location, customer id, or other common data characteristics.
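
The partitioned folder layout can be sketched as follows, using the ‘dt=’ and ‘state=’ naming shown earlier in the post. The prefixes and file names below are hypothetical placeholders, and deriving the date from the timestamp in UTC is my assumption for this example; a real pipeline might partition on local time instead.

```python
from datetime import datetime, timezone


def date_partition_key(prefix, ts, filename):
    """Build an object key partitioned by date, e.g. .../dt=2019-12-21/...
    Assumption: the partition date is taken from the Unix timestamp in UTC."""
    day = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
    return f"{prefix}/dt={day}/{filename}"


def state_partition_key(prefix, state, filename):
    """Build an object key partitioned by state, e.g. .../state=or/..."""
    return f"{prefix}/state={state.lower()}/{filename}"


# Hypothetical prefixes and file names, for illustration only.
print(date_partition_key("raw/usage", 1576915200, "usage.json"))
print(state_partition_key("raw/locations", "OR", "locations.csv"))
```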

This collection of semi-structured data files, S3 buckets, and partitions forms what is referred to as a Data Lake. According to AWS, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.

A series of AWS Glue Crawlers process the raw CSV-, XML-, and JSON-format files, extracting metadata, and creating table definitions in the AWS Glue Data Catalog. According to AWS, an AWS Glue Data Catalog contains metadata tables, where each table specifies a single data store.

Athena-Glue-1

Data Transformation

In the Data Transformation stage, the raw data in the previous stage is transformed. Data transformation may include both modifying the data and changing the data format. Data modifications include data cleansing, re-casting data types, changing date formats, field-level computations, and field concatenation.

The data is then converted from CSV-, XML-, and JSON-format to Apache Parquet format and written back to the Amazon S3-based data lake. Apache Parquet is a compressed, efficient columnar storage format. Amazon Athena, like many Cloud-based services, charges you by the amount of data scanned per query. Hence, using data partitioning, bucketing, compression, and columnar storage formats, like Parquet, will reduce query cost.

Lastly, the transformed Parquet-format data is cataloged to new tables, alongside the raw CSV, XML, and JSON data, in the Glue Data Catalog.

Data Enrichment

According to ScienceDirect, data enrichment or augmentation is the process of enhancing existing information by supplementing missing or incomplete data. Typically, data enrichment is achieved by using external data sources, but that is not always the case.

In the Data Enrichment stage, the Parquet-format Smart Hub usage data is augmented with related data from the three other data sources: sensor mappings, locations, and electrical rates. The customer’s Smart Hub usage data is enriched with the customer’s device types, the customer’s timezone, and the customer’s electricity cost per monitored period, based on the customer’s geographic location and time of day.

Once the data is enriched, it is converted to Parquet and optimized for query performance, stored in the data lake, and cataloged. At this point, the original CSV-, XML-, and JSON-format raw data files, the transformed Parquet-format data files, and the Parquet-format enriched data files are all stored in the Amazon S3-based data lake and cataloged in the Glue Data Catalog.

Data Visualization

In the final Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for visualization and Business Intelligence, which integrate with Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo. In this demonstration, we will focus on Amazon QuickSight.

Getting Started

Requirements

To follow along with the demonstration, you will need an AWS Account and a current version of the AWS CLI. To get the most from the demonstration, you should also have Python 3 and jq installed in your work environment.
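
Before going further, it can help to confirm that the tools listed above are actually on your PATH. The following is a minimal sketch, not part of the project's source code; the helper name missing_tools is hypothetical, and it only checks that each command exists, not which version is installed.

```shell
# Print the name of every listed tool that is not found on PATH,
# one per line. Prints nothing when all tools are available.
missing_tools() {
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || printf '%s\n' "$tool"
  done
}

# Check the tools named in the requirements and report any gaps.
missing=$(missing_tools aws python3 jq)
if [ -n "$missing" ]; then
  printf 'Missing tools:\n%s\n' "$missing" >&2
fi
```

If nothing is printed, all three commands were found.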

Source Code

All source code for this post can be found on GitHub. Use the following command to clone a copy of the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/athena-glue-quicksight-demo.git

TL;DR?

Just want to jump in without reading the instructions? All the AWS CLI commands found within the post are consolidated in the GitHub project’s README file.

CloudFormation Stack

To start, create the ‘smart-hub-athena-glue-stack’ CloudFormation stack using the smart-hub-athena-glue.yml template. The template will create (3) Amazon S3 buckets, (1) AWS Glue Data Catalog Database, (5) Data Catalog Database Tables, (6) AWS Glue Crawlers, (1) AWS Glue ETL Job, and (1) IAM Service Role for AWS Glue.

First, make sure to change the DATA_BUCKET, SCRIPT_BUCKET, and LOG_BUCKET variables to your own unique S3 bucket names. I always suggest naming your bucket with the standard AWS 3-part convention of 1) descriptive name, 2) AWS Account ID or Account Alias, and 3) AWS Region (e.g. ‘smart-hub-data-123456789012-us-east-1’).

# *** CHANGE ME ***
BUCKET_SUFFIX="123456789012-us-east-1"
DATA_BUCKET="smart-hub-data-${BUCKET_SUFFIX}"
SCRIPT_BUCKET="smart-hub-scripts-${BUCKET_SUFFIX}"
LOG_BUCKET="smart-hub-logs-${BUCKET_SUFFIX}"

aws cloudformation create-stack \
  --stack-name smart-hub-athena-glue-stack \
  --template-body file://cloudformation/smart-hub-athena-glue.yml \
  --parameters ParameterKey=DataBucketName,ParameterValue=${DATA_BUCKET} \
    ParameterKey=ScriptBucketName,ParameterValue=${SCRIPT_BUCKET} \
    ParameterKey=LogBucketName,ParameterValue=${LOG_BUCKET} \
  --capabilities CAPABILITY_NAMED_IAM
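
Since the stack creation fails if a bucket name is not acceptable to S3, you may want to sanity-check the three names locally first. The sketch below is an illustration only; the function name is_valid_bucket_name is hypothetical, and it checks just the basic S3 naming rules (length, allowed characters, first and last character), not uniqueness or the less common restrictions.

```shell
# Succeed only if the name meets the basic S3 bucket naming rules:
# 3-63 characters; lowercase letters, digits, dots, and hyphens only;
# first and last character must be a letter or digit.
# The body runs in a subshell, so `exit` ends only this check.
is_valid_bucket_name() (
  LC_ALL=C   # keep the a-z range from matching uppercase letters
  name=$1
  len=${#name}
  [ "$len" -ge 3 ] && [ "$len" -le 63 ] || exit 1
  case $name in
    *[!a-z0-9.-]*) exit 1 ;;             # illegal character
    [!a-z0-9]*|*[!a-z0-9]) exit 1 ;;     # bad first or last character
  esac
  exit 0
)

# Build the three names using the 3-part convention and check each one.
BUCKET_SUFFIX="123456789012-us-east-1"
for prefix in smart-hub-data smart-hub-scripts smart-hub-logs; do
  bucket="${prefix}-${BUCKET_SUFFIX}"
  if is_valid_bucket_name "$bucket"; then
    echo "ok: ${bucket}"
  else
    echo "invalid: ${bucket}" >&2
  fi
done
```

A name that passes this check can still be rejected if another AWS account already owns it, which is one reason the Account ID is part of the convention.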

Raw Data Files

Next, copy the raw CSV-, XML-, and JSON-format data files from the local project to the DATA_BUCKET S3 bucket (steps 1a-1b in the workflow diagram). These files represent the beginnings of the S3-based data lake. Each category of data uses a different strategy for organizing and separating the files. Note the use of Apache Hive-style partitions (e.g., /smart_hub_data_json/dt=2019-12-21). As discussed earlier, the assumption is that the actual, large volume of data in the data lake would necessitate using partitioning to improve query performance.

# location data
aws s3 cp data/locations/denver_co_1576656000.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=co/
aws s3 cp data/locations/palo_alto_ca_1576742400.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ca/
aws s3 cp data/locations/portland_metro_or_1576742400.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=or/
aws s3 cp data/locations/stamford_ct_1576569600.csv \
  s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ct/

# sensor mapping data
aws s3 cp data/mappings/ \
  s3://${DATA_BUCKET}/sensor_mappings_json/state=or/ \
  --recursive

# electrical usage data
aws s3 cp data/usage/2019-12-21/ \
  s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-21/ \
  --recursive
aws s3 cp data/usage/2019-12-22/ \
  s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-22/ \
  --recursive

# electricity rates data
aws s3 cp data/rates/ \
  s3://${DATA_BUCKET}/electricity_rates_xml/ \
  --recursive
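
The Hive-style partition prefixes used in these commands follow a simple table/key=value/ pattern. As a rough sketch of how the usage data prefixes could be derived rather than typed by hand, the helpers below build a prefix from its parts; partition_prefix and usage_prefix_for_dir are hypothetical names and are not part of the project.

```shell
# Build a Hive-style partition prefix: <table>/<key>=<value>/
partition_prefix() {
  printf '%s/%s=%s/\n' "$1" "$2" "$3"
}

# Map a local usage directory (e.g. data/usage/2019-12-21/) to its
# partition prefix, using the directory name as the dt value.
usage_prefix_for_dir() {
  dir=${1%/}                          # drop a trailing slash, if any
  partition_prefix smart_hub_data_json dt "${dir##*/}"
}

usage_prefix_for_dir data/usage/2019-12-21/
# prints: smart_hub_data_json/dt=2019-12-21/

# Possible use in a loop (not executed here):
# for d in data/usage/*/; do
#   aws s3 cp "$d" "s3://${DATA_BUCKET}/$(usage_prefix_for_dir "$d")" --recursive
# done
```

With more than a couple of days of usage data, a loop like the commented one avoids repeating the copy command for each date.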

Confirm the contents of the DATA_BUCKET S3 bucket with the following command.

aws s3 ls s3://${DATA_BUCKET}/ \
  --recursive --human-readable --summarize

There should be a total of (14) raw data files in the DATA_BUCKET S3 bucket.

2020-01-04 14:39:51 20.0 KiB electricity_rates_xml/2019_12_1575270000.xml
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/08ae3df798df8b90_1550908800.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/1c7e1f7df752663e_1559347200.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/b6a8d42425fde548_1568314800.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/08ae3df798df8b90_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/1c7e1f7df752663e_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/b6a8d42425fde548_1576915200.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/08ae3df798df8b90_15770016000.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/1c7e1f7df752663e_1577001600.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/b6a8d42425fde548_15770016001.json
2020-01-04 14:39:39 89.7 KiB smart_hub_locations_csv/state=ca/palo_alto_ca_1576742400.csv
2020-01-04 14:39:37 84.2 KiB smart_hub_locations_csv/state=co/denver_co_1576656000.csv
2020-01-04 14:39:44 78.6 KiB smart_hub_locations_csv/state=ct/stamford_ct_1576569600.csv
2020-01-04 14:39:42 91.6 KiB smart_hub_locations_csv/state=or/portland_metro_or_1576742400.csv
Total Objects: 14
Total Size: 636.7 KiB
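
Rather than counting the files by eye, the object count can be pulled from the summary line of this output. This is a small sketch, assuming the summary format shown above; total_objects is a hypothetical helper name.

```shell
# Read `aws s3 ls --recursive --summarize` output on stdin and print
# the number reported on the "Total Objects:" line.
total_objects() {
  awk '/Total Objects:/ { print $3 }'
}

# Demonstrate with a shortened copy of the listing above.
sample='2020-01-04 14:39:51 20.0 KiB electricity_rates_xml/2019_12_1575270000.xml
Total Objects: 14
Total Size: 636.7 KiB'
printf '%s\n' "$sample" | total_objects
# prints: 14

# Possible use against the real bucket (not executed here):
# actual=$(aws s3 ls "s3://${DATA_BUCKET}/" --recursive --summarize | total_objects)
# [ "$actual" -eq 14 ] || echo "expected 14 objects, found ${actual}" >&2
```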

Lambda Functions

Next, package the (5) Python 3.8-based AWS Lambda functions for deployment.

pushd lambdas/athena-json-to-parquet-data || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-csv-to-parquet-locations || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-json-to-parquet-mappings || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-complex-etl-query || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-parquet-to-parquet-elt-data || exit
zip -r package.zip index.py
popd || exit

Copy the five Lambda packages to the SCRIPT_BUCKET S3 bucket. The ZIP archive Lambda packages are accessed by the second CloudFormation stack, smart-hub-lambda-stack. This stack, which creates the Lambda functions, will fail to deploy if the packages are not found in the SCRIPT_BUCKET S3 bucket.

I have chosen to place the packages in a different S3 bucket than the raw data files. In a real production environment, these two types of files would be separated, minimally, into separate buckets for security. Remember, only data should go into the data lake.

aws s3 cp lambdas/athena-json-to-parquet-data/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_data/
aws s3 cp lambdas/athena-csv-to-parquet-locations/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_csv_to_parquet_locations/
aws s3 cp lambdas/athena-json-to-parquet-mappings/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_mappings/
aws s3 cp lambdas/athena-complex-etl-query/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_complex_etl_query/
aws s3 cp lambdas/athena-parquet-to-parquet-elt-data/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_parquet_to_parquet_elt_data/
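
Because the Lambda stack fails to deploy when a package is missing, it may be worth verifying the uploads before creating it. The following is a sketch only: lambda_package_key and verify_lambda_packages are hypothetical helpers, and the key layout is inferred from the copy commands above (hyphens in the local directory name become underscores in the S3 prefix). The check relies on aws s3api head-object, which exits with an error when the object does not exist.

```shell
# Derive a package's S3 key from its local directory name by replacing
# hyphens with underscores, mirroring the copy commands above.
lambda_package_key() {
  printf 'lambdas/%s/package.zip\n' "$(printf '%s' "$1" | sed 's/-/_/g')"
}

# Succeed only if every named package exists in the given bucket.
# Missing packages are reported on stderr.
verify_lambda_packages() {
  bucket=$1
  shift
  status=0
  for name in "$@"; do
    key=$(lambda_package_key "$name")
    if ! aws s3api head-object --bucket "$bucket" --key "$key" >/dev/null 2>&1; then
      echo "missing: s3://${bucket}/${key}" >&2
      status=1
    fi
  done
  return "$status"
}

# Possible use before creating the Lambda stack (not executed here):
# verify_lambda_packages "${SCRIPT_BUCKET}" \
#   athena-json-to-parquet-data athena-csv-to-parquet-locations \
#   athena-json-to-parquet-mappings athena-complex-etl-query \
#   athena-parquet-to-parquet-elt-data
```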

Create the second ‘smart-hub-lambda-stack’ CloudFormation stack using the smart-hub-lambda.yml CloudFormation template. The template will create (5) AWS Lambda functions and (1) Lambda execution IAM Service Role.

aws cloudformation create-stack \
  --stack-name smart-hub-lambda-stack \
  --template-body file://cloudformation/smart-hub-lambda.yml \
  --capabilities CAPABILITY_NAMED_IAM
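
The create-stack command returns as soon as the request is accepted, not when the resources exist. If you are scripting these steps, one option is to block until the stack is ready. The sketch below uses the AWS CLI's stack-create-complete waiter; the wrapper function wait_for_stack is a hypothetical name, not part of the project.

```shell
# Block until the named stack finishes creating, then print its status.
# The waiter exits non-zero if creation fails or the wait times out.
wait_for_stack() {
  stack_name=$1
  if aws cloudformation wait stack-create-complete --stack-name "$stack_name"; then
    aws cloudformation describe-stacks --stack-name "$stack_name" \
      --query 'Stacks[0].StackStatus' --output text
  else
    echo "stack ${stack_name} did not reach CREATE_COMPLETE" >&2
    return 1
  fi
}

# Possible use after either create-stack command (not executed here):
# wait_for_stack smart-hub-athena-glue-stack
# wait_for_stack smart-hub-lambda-stack
```

The same wrapper works for the first stack, which must finish before files can be copied into the buckets it creates.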

At this point, we have deployed all of the AWS resources required for the demonstration using CloudFormation. We have also copied all of the raw CSV-, XML-, and JSON-format data files into the Amazon S3-based data lake.

AWS Glue Crawlers

If you recall, we created five tables in the Glue Data Catalog database as part of the CloudFormation stack: one table for each of the four raw data types and one table to hold temporary ELT data later in the demonstration. To confirm the five tables were created in the Glue Data Catalog database, use the Glue Data Catalog Console, or run the following AWS CLI / jq command.