Posts Tagged AWS

Employing Amazon Macie to Discover and Protect Sensitive Data in your Amazon S3-based Data Lake


Working with Analytics customers, it’s not uncommon to see data lakes with a dozen or more discrete data sources. Data typically originates from sources both internal and external to the customer. Internal data may come from multiple teams, departments, divisions, and enterprise systems. External data comes from vendors, partners, public sources, and subscriptions to licensed data sources. The volume, velocity, variety, veracity, and method of delivery vary across the data sources. All this data is being fed into data lakes for purposes such as analytics, business intelligence, and machine learning.

Given the growing volumes of incoming data and variations amongst data sources, it is increasingly complex, expensive, and time-consuming for organizations to ensure compliance with relevant laws, policies, and regulations. Regulations that impact how data is handled in a data lake include the Health Insurance Portability and Accountability Act (HIPAA), General Data Protection Regulation (GDPR), Payment Card Industry Data Security Standard (PCI DSS), California Consumer Privacy Act (CCPA), and the Federal Information Security Management Act (FISMA).

Data Lake

AWS defines a data lake as a centralized repository that allows you to store all your structured and unstructured data at any scale. Once in the data lake, you run different types of analytics — from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data in a data lake is commonly organized or separated by its stage in the analytics process. Incoming data is often referred to as raw data. Data is then processed: cleansed, filtered, enriched, and tokenized if necessary. Lastly, the data is analyzed and aggregated, and the results are written back to the data lake. The analyzed and aggregated data is used to build business intelligence dashboards, reports, and machine learning models, and is delivered to downstream or external systems. The different categories of data (raw, processed, and aggregated) are frequently referred to as bronze, silver, and gold, a reference to their overall data quality or value.

Protecting the Data Lake

Imagine you’ve received a large volume of data from an external data source. The incoming data is cleansed, filtered, and enriched. The data is re-formatted, partitioned, compressed for analytical efficiency, and written back to the data lake. Your analytics pipelines run complex and time-consuming queries against the data. Unfortunately, while building reports for a set of stakeholders, you realize that the original data accidentally included credit card information and other sensitive information about your customers. In addition to being out of compliance, you have the wasted time and expense of the initial data processing, as well as the extra time and expense to replace and re-process the data. The solution — Amazon Macie.

Amazon Macie

According to AWS, Amazon Macie is a fully managed data security and data privacy service that uses machine learning and pattern matching to discover and protect your sensitive data stored in Amazon Simple Storage Service (Amazon S3). Macie’s alerts, or findings, can be searched, filtered, and sent to Amazon EventBridge, formerly called Amazon CloudWatch Events, for easy integration with existing workflow or event management systems, or to be used in combination with AWS services, such as AWS Step Functions or Amazon Managed Workflows for Apache Airflow (MWAA) to take automated remediation actions.

Amazon Macie’s Summary view

Data Discovery and Protection

In this post, we will deploy an automated data inspection workflow to examine sample data in an S3-based data lake. Amazon Macie will examine data files uploaded to an encrypted S3 bucket. If sensitive data is discovered within the files, the files will be moved to an encrypted isolation bucket for further investigation. Email and SMS text alerts will be sent. This workflow will leverage Amazon EventBridge, Amazon Simple Notification Service (Amazon SNS), AWS Lambda, and AWS Systems Manager Parameter Store.

Macie data inspection workflow architecture

Source Code

Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \

AWS resources for this post can be deployed using AWS CloudFormation. To follow along, you will need recent versions of Python 3, Boto3, and the AWS CLI version 2 installed.

Sample Data

We will use synthetic patient data, freely available from the MITRE Corporation. The data was generated by Synthea, MITRE’s open-source, synthetic patient generator that models the medical history of synthetic patients. Synthea data is exported in a variety of data standards, including HL7 FHIR, C-CDA, and CSV. We will use CSV-format data files for this post. Download and unzip the CSV files from the Synthea website.

wget "${REMOTE_FILE}"
unzip -j "${REMOTE_FILE}" -d synthea_data/

The sixteen CSV data files contain a total of 471,852 rows of data, including column headers.

> wc -l *.csv

      598 allergies.csv
    3,484 careplans.csv
    8,377 conditions.csv
       79 devices.csv
   53,347 encounters.csv
      856 imaging_studies.csv
   15,479 immunizations.csv
   42,990 medications.csv
  299,698 observations.csv
    1,120 organizations.csv
    1,172 patients.csv
    3,802 payer_transitions.csv
       11 payers.csv
   34,982 procedures.csv
    5,856 providers.csv
        1 supplies.csv
  471,852 total

Amazon Macie Custom Data Identifier

To demonstrate some of the advanced features of Amazon Macie, we will use three Custom Data Identifiers. According to Macie’s documentation, a custom data identifier is a set of criteria that you define that reflects your organization’s particular proprietary data, for example, employee IDs, customer account numbers, or internal data classifications. We will create three custom data identifiers to detect the specific Synthea-format Patient ID, US driver’s license number, and US passport number columns.

Post’s three custom data identifiers

The custom data identifiers in this post use a combination of regular expressions (regex) and keywords. The identifiers are designed to work with structured data, such as CSV files. Macie reports text that matches the regex pattern if any of these keywords are in the name of the column or field that stores the text, or if the text is within the maximum match distance of one of these words in a field value. Macie supports a subset of the regex pattern syntax provided by the Perl Compatible Regular Expressions (PCRE) library.

Patient ID custom data identifier console
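The combination of regex, keywords, and maximum match distance described above can also be expressed with Boto3. The following is a minimal sketch, not the post's actual identifier definition: the UUID-style pattern, the keyword list, and the helper names are assumptions chosen for illustration, and the pattern would still need to be checked against the regex syntax Macie supports. The Boto3 import is kept inside the function that calls AWS, so the pattern can be tested offline.

```python
import re

# Hypothetical pattern: assumes Synthea's CSV export uses UUID-style patient IDs.
PATIENT_ID_REGEX = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"


def build_patient_id_identifier(max_match_distance=50):
    """Return keyword arguments for macie2.create_custom_data_identifier."""
    return {
        "name": "Patient ID",
        "description": "Synthea-format patient ID (illustrative)",
        "regex": PATIENT_ID_REGEX,
        # Hypothetical keywords Macie would look for in the column name
        # or within the match distance of the matching text
        "keywords": ["patient", "patient_id"],
        "maximumMatchDistance": max_match_distance,
    }


def create_identifier():
    """Create the identifier in Macie; requires AWS credentials."""
    import boto3  # imported here so the pattern can be tested offline

    client = boto3.client("macie2")
    response = client.create_custom_data_identifier(**build_patient_id_identifier())
    return response["customDataIdentifierId"]


# Check the pattern locally against a sample value before sending it to Macie
sample_id = "1d604da9-9a81-4ba9-80c2-de3375d59b40"
print(bool(re.fullmatch(PATIENT_ID_REGEX, sample_id)))
```

Validating the expression locally with Python's re module is only a first check, since Macie supports a subset of PCRE rather than Python's regex dialect.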

Enable Macie

Before creating a CloudFormation stack with this demonstration’s resources, you will need to enable Amazon Macie from the AWS Management Console, or use the macie2 API and the AWS CLI with the enable-macie command.

aws macie2 enable-macie

Macie can also be enabled for your multi-account AWS Organization. The enable-organization-admin-account command designates an account as the delegated Amazon Macie administrator account for an AWS organization. For more information, see Managing multiple accounts in Amazon Macie.

aws macie2 enable-organization-admin-account \
--admin-account-id ${AWS_ACCOUNT}

CloudFormation Stack

To create the CloudFormation stack with the supplied template, cloudformation/macie_demo.yml, run the following AWS CLI command. You will need to include an email address and phone number as input parameters. These parameter values will be used to send email and text alerts when Macie produces a sensitive data finding.

Please make sure you understand all the potential cost and security implications of creating the CloudFormation stack before continuing.


aws cloudformation create-stack \
--stack-name macie-demo \
--template-body file://cloudformation/macie_demo.yml \
--parameters ParameterKey=SNSTopicEndpointSms,ParameterValue=${SNS_PHONE} \
ParameterKey=SNSTopicEndpointEmail,ParameterValue=${SNS_EMAIL} \

As shown in the AWS CloudFormation console, the new macie-demo stack will contain twenty-one AWS resources.

CloudFormation stack successfully created

Upload Data

Next, with the stack deployed, upload the CSV-format data files to the encrypted S3 bucket that represents your data lake. The target S3 bucket uses the naming convention synthea-data-<aws_account_id>-<region>. CloudFormation wrote the two new bucket names to AWS Systems Manager Parameter Store; you can retrieve them using the ssm API.

aws ssm get-parameters-by-path \
--path /macie_demo/ \
--query 'Parameters[*].Value'

Use the following ssm and s3 API commands to upload the data files.

DATA_BUCKET=$(aws ssm get-parameter \
--name /macie_demo/patient_data_bucket \
--query 'Parameter.Value' --output text)
aws s3 cp synthea_data/ \
    "s3://${DATA_BUCKET}/patient_data/" --recursive

You should end up with sixteen CSV files in the S3 bucket, totaling approximately 82.3 MB.

Synthea patient data files uploaded to S3
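The same upload can be scripted with Boto3 instead of the AWS CLI. The sketch below is an illustration rather than part of the post's repository: the function names are hypothetical, while the parameter name and the patient_data/ prefix are taken from the commands above. The key-planning step is separated from the AWS calls so it can be checked without credentials.

```python
from pathlib import Path


def plan_uploads(local_dir, prefix="patient_data/"):
    """Map each local CSV file to the S3 key it would be uploaded under."""
    return {
        str(path): f"{prefix}{path.name}"
        for path in sorted(Path(local_dir).glob("*.csv"))
    }


def upload_files(local_dir):
    """Look up the bucket name in Parameter Store and upload the CSV files."""
    import boto3  # requires AWS credentials; not needed for plan_uploads

    ssm_client = boto3.client("ssm")
    s3_client = boto3.client("s3")
    bucket = ssm_client.get_parameter(
        Name="/macie_demo/patient_data_bucket")["Parameter"]["Value"]
    for local_path, key in plan_uploads(local_dir).items():
        s3_client.upload_file(local_path, bucket, key)
```

Calling upload_files("synthea_data") would mirror the aws s3 cp command, limited to files with a .csv extension.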

Sensitive Data Discovery Jobs

With the CloudFormation stack created and the patient data files uploaded, we will create two sensitive data discovery jobs. These jobs will scan the contents of the encrypted S3 bucket for sensitive data and report the findings. According to the documentation, you can configure a sensitive data discovery job to run only once for on-demand analysis and assessment, or on a recurring basis for periodic analysis, assessment, and monitoring. For this demonstration, we will create a one-time sensitive data discovery job using the AWS CLI. We will also create a recurring sensitive data discovery job using the AWS SDK for Python (Boto3). Both jobs can also be created from within Macie’s Jobs console.

Creating a new job in Macie’s Jobs console

For both sensitive data discovery jobs, we will include the three custom data identifiers. Each of the custom data identifiers has a unique ID. We will need all three IDs to create the two sensitive data discovery jobs. You can use the AWS CLI and the macie2 API to retrieve the values.

aws macie2 list-custom-data-identifiers --query 'items[*].id'

Next, modify the job_specs/macie_job_specs_1x.json file, adding the three custom data identifier IDs to customDataIdentifierIds. Also, update your AWS account ID (accountId) and S3 bucket name (buckets). Note that since all the patient data files are in CSV format, the scoping section limits our inspection to only files with a csv file extension.

{
  "customDataIdentifierIds": [
    "<custom_data_identifier_id_1>",
    "<custom_data_identifier_id_2>",
    "<custom_data_identifier_id_3>"
  ],
  "description": "Review Synthea patient data (1x)",
  "jobType": "ONE_TIME",
  "s3JobDefinition": {
    "bucketDefinitions": [
      {
        "accountId": "111222333444",
        "buckets": ["synthea-data-111222333444-us-east-1"]
      }
    ],
    "scoping": {
      "includes": {
        "and": [
          {
            "simpleScopeTerm": {
              "comparator": "EQ",
              "key": "OBJECT_EXTENSION",
              "values": ["csv"]
            }
          }
        ]
      }
    }
  },
  "tags": {"KeyName": "Project", "KeyValue": "Amazon Macie Demo"}
}

The above JSON template was generated using the standard AWS CLI generate-cli-skeleton command.

aws macie2 create-classification-job --generate-cli-skeleton

To create a one-time sensitive data discovery job using the above JSON template, run the following AWS CLI command. The unique job name will be dynamically generated based on the current time.

aws macie2 create-classification-job \
--name $(echo "SyntheaPatientData_${EPOCHSECONDS}") \
--cli-input-json file://job_specs/macie_job_specs_1x.json

In the Amazon Macie Jobs console, we can see a one-time sensitive data discovery job running. With a sampling percentage of 100, the job will take several minutes to run. The samplingPercentage job property can be adjusted to scan any percentage of the data. If this value is less than 100, Macie selects the objects to analyze at random, up to the specified percentage, and analyzes all the data in those objects.

One-time sensitive data discovery job running

Once the job is completed, the findings will be available in Macie’s Findings console. Using the three custom data identifiers in addition to Macie’s managed data identifiers, there should be a total of fifteen findings from the Synthea patient data files in S3. There should be six High severity findings and nine Medium severity findings. Of those, three are of a Personal finding type, seven of a Custom Identifier finding type, and five of a Multiple finding type, having both Personal and Custom Identifier finding types.

Macie’s Findings console displaying the results of the one-time job
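The severity and finding-type totals quoted above can be tallied programmatically rather than counted in the console. The following is a rough sketch, assuming each finding carries a severity description and a type string such as SensitiveData:S3Object/Personal; the function names are hypothetical, and the batch size of 50 reflects my understanding of the per-call limit on get_findings.

```python
from collections import Counter


def summarize_findings(findings):
    """Count findings by severity and by the last segment of their type."""
    by_severity = Counter(f["severity"]["description"] for f in findings)
    by_type = Counter(f["type"].rsplit("/", 1)[-1] for f in findings)
    return by_severity, by_type


def fetch_findings():
    """Retrieve all findings from Macie; requires AWS credentials."""
    import boto3

    client = boto3.client("macie2")
    finding_ids = []
    for page in client.get_paginator("list_findings").paginate():
        finding_ids.extend(page["findingIds"])
    # Fetch the finding details in batches (assumed limit of 50 IDs per call)
    findings = []
    for i in range(0, len(finding_ids), 50):
        batch = finding_ids[i:i + 50]
        findings.extend(client.get_findings(findingIds=batch)["findings"])
    return findings
```

Running summarize_findings(fetch_findings()) after the one-time job completes should reproduce the breakdown shown in the Findings console.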

Isolating High Severity Findings

The data inspection workflow we have deployed uses an AWS Lambda function, macie-object-mover, to isolate all data files with High severity findings to a second S3 bucket. The offending files are copied to the isolation bucket and deleted from the source bucket.

#!/usr/bin/env python3
# Purpose: Lambda function that moves S3 objects flagged by Macie
# Author: Gary A. Stafford (March 2021)
import json
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
s3_client = boto3.client('s3')


def lambda_handler(event, context):
    logger.debug(f'event: {json.dumps(event)}')

    destination_bucket_name = 'macie-isolation-111222333444-us-east-1'
    source_bucket_name = event['detail']['resourcesAffected']['s3Bucket']['name']
    file_key_name = event['detail']['resourcesAffected']['s3Object']['key']
    copy_source_object = {'Bucket': source_bucket_name, 'Key': file_key_name}

    logger.debug(f'destination_bucket_name: {destination_bucket_name}')
    logger.debug(f'source_bucket_name: {source_bucket_name}')
    logger.debug(f'file_key_name: {file_key_name}')

    # Copy the flagged object to the isolation bucket
    try:
        response = s3_client.copy_object(
            CopySource=copy_source_object,
            Bucket=destination_bucket_name,
            Key=file_key_name)
        logger.debug(response)
    except ClientError as ex:
        logger.error(ex)
        raise

    # Delete the original only after the copy has succeeded
    try:
        response = s3_client.delete_object(
            Bucket=source_bucket_name,
            Key=file_key_name)
        logger.debug(response)
    except ClientError as ex:
        logger.error(ex)
        raise

    return {
        'statusCode': 200,
        'body': json.dumps(copy_source_object)
    }
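The Lambda function reads only two fields from the finding event: the bucket name and the object key. As a small sketch of that lookup, the example below isolates it into a hypothetical helper and exercises it with a trimmed, made-up event containing just the fields the function uses. A real Macie finding event carries many more fields.

```python
def extract_object_location(event):
    """Return (bucket, key) for the S3 object named in a Macie finding event."""
    affected = event["detail"]["resourcesAffected"]
    return affected["s3Bucket"]["name"], affected["s3Object"]["key"]


# Trimmed, hypothetical finding event with only the fields read above
sample_event = {
    "source": "aws.macie",
    "detail-type": "Macie Finding",
    "detail": {
        "severity": {"description": "High"},
        "resourcesAffected": {
            "s3Bucket": {"name": "synthea-data-111222333444-us-east-1"},
            "s3Object": {"key": "patient_data/patients.csv"},
        },
    },
}

bucket, key = extract_object_location(sample_event)
print(bucket, key)
```

An event of this shape can also be used as a test event in the Lambda console, provided the named object actually exists in the source bucket.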

Amazon EventBridge

According to Macie’s documentation, to support integration with other applications, services, and systems, such as monitoring or event management systems, Amazon Macie automatically publishes findings to Amazon EventBridge as finding events. Amazon EventBridge is a serverless event bus that makes it easier to build event-driven applications at scale using events generated from your applications, integrated Software-as-a-Service (SaaS) applications, and AWS services.

Each EventBridge rule contains an event pattern, which is used to filter the incoming stream of events. The post’s rules include macie-rule-custom and macie-rule-high.

Post’s event rules, shown in the Amazon EventBridge console

The macie-rule-custom rule is triggered when a Macie finding is based on any of the three custom data identifiers. It uses the event pattern shown below, which matches a finding event when the name of the custom data identifier that produced the detection is one of the three names listed.

{
  "source": ["aws.macie"],
  "detail-type": ["Macie Finding"],
  "detail": {
    "classificationDetails": {
      "result": {
        "customDataIdentifiers": {
          "detections": {
            "name": [
              "Patient ID",
              "US Passport",
              "US Driver License"
            ]
          }
        }
      }
    }
  }
}

Six data files, containing High severity findings, will be moved to the isolation bucket by the Lambda, triggered by EventBridge.

Isolation bucket containing data files with High severity findings
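To make the behavior of an event pattern more concrete, the sketch below approximates how a pattern of nested objects and value lists is compared with an event. It is a deliberately simplified model written for illustration, not EventBridge's implementation: it handles only exact-value matching and ignores operators such as prefix, numeric, or exists filters. The function name and the sample event are hypothetical.

```python
def matches(pattern, event):
    """Simplified pattern check: nested dicts recurse, lists mean 'any of'."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        # Treat a list in the event as "match if any element matches"
        candidates = actual if isinstance(actual, list) else [actual]
        if isinstance(expected, dict):
            if not any(isinstance(c, dict) and matches(expected, c) for c in candidates):
                return False
        elif not any(c in expected for c in candidates):
            return False
    return True


pattern = {
    "source": ["aws.macie"],
    "detail-type": ["Macie Finding"],
    "detail": {"classificationDetails": {"result": {"customDataIdentifiers": {
        "detections": {"name": ["Patient ID", "US Passport", "US Driver License"]}}}}},
}

# Hypothetical, trimmed finding event
event = {
    "source": "aws.macie",
    "detail-type": "Macie Finding",
    "detail": {"classificationDetails": {"result": {"customDataIdentifiers": {
        "detections": [{"name": "Patient ID", "count": 1171}]}}}},
}

print(matches(pattern, event))
```

The point of the model is that every key in the pattern must be present in the event, while fields in the event that the pattern does not mention are ignored.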

Scheduled Sensitive Data Discovery Jobs

Data sources commonly deliver data on a recurring basis, such as nightly data feeds. For these types of data sources, we can schedule sensitive data discovery jobs to run automatically. For this demonstration, we will create a scheduled job using the AWS SDK for Python (Boto3). Unlike the AWS CLI-based one-time job, you don’t need to modify the project’s script, scripts/ The Python script will retrieve your AWS account ID and three custom data identifier IDs. The Python script then runs the create_classification_job command.

#!/usr/bin/env python3
# Purpose: Create Daily Macie classification job - Synthea patient data
# Author: Gary A. Stafford (March 2021)
import logging
import sys

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

ssm_client = boto3.client('ssm')
sts_client = boto3.client('sts')
macie_client = boto3.client('macie2')


def main():
    params = get_parameters()
    account_id = sts_client.get_caller_identity()['Account']
    custom_data_identifiers = list_custom_data_identifiers()
    create_classification_job(params['patient_data_bucket'], account_id, custom_data_identifiers)


def list_custom_data_identifiers():
    """Returns a list of all custom data identifier ids"""
    custom_data_identifiers = []
    try:
        response = macie_client.list_custom_data_identifiers()
        for item in response['items']:
            custom_data_identifiers.append(item['id'])
        return custom_data_identifiers
    except ClientError as e:
        logging.error(e)
        sys.exit(1)


def create_classification_job(patient_data_bucket, account_id, custom_data_identifiers):
    """Create Daily Macie classification job"""
    try:
        response = macie_client.create_classification_job(
            customDataIdentifierIds=custom_data_identifiers,
            description='Review Synthea patient data (Daily)',
            jobType='SCHEDULED',
            initialRun=True,
            name='SyntheaPatientData_Daily',  # job name is an assumption
            s3JobDefinition={
                'bucketDefinitions': [
                    {
                        'accountId': account_id,
                        'buckets': [patient_data_bucket]
                    }
                ],
                'scoping': {
                    'includes': {
                        'and': [
                            {
                                'simpleScopeTerm': {
                                    'comparator': 'EQ',
                                    'key': 'OBJECT_EXTENSION',  # limit the job to csv files
                                    'values': ['csv']
                                }
                            }
                        ]
                    }
                }
            },
            scheduleFrequency={
                'dailySchedule': {}
            },
            tags={
                'Project': 'Amazon Macie Demo'
            }
        )
        logging.debug(f'Response: {response}')
    except ClientError as e:
        logging.error(e)
        sys.exit(1)


def get_parameters():
    """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
    params = {
        'patient_data_bucket': ssm_client.get_parameter(Name='/macie_demo/patient_data_bucket')['Parameter']['Value']
    }
    return params


if __name__ == '__main__':
    main()

To create the scheduled sensitive data discovery job, run the following command.

python3 ./scripts/

The scheduleFrequency parameter is set to { 'dailySchedule': {} }. This value specifies a daily recurrence pattern for running the job. The initialRun parameter of the create_classification_job command is set to True. This will cause the new job to analyze all eligible objects immediately after the job is created, in addition to on a daily basis.

Scheduled sensitive data discovery job in an active/idle state
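The daily recurrence is one of three shapes the scheduleFrequency parameter can take. As a brief sketch, the hypothetical helper below builds the daily, weekly, and monthly variants as I understand them from the Macie API; the helper name and its arguments are my own and are not part of the post's script.

```python
def schedule_frequency(kind, day=None):
    """Build the scheduleFrequency argument for create_classification_job."""
    if kind == "daily":
        return {"dailySchedule": {}}
    if kind == "weekly":
        return {"weeklySchedule": {"dayOfWeek": day}}    # e.g. "MONDAY"
    if kind == "monthly":
        return {"monthlySchedule": {"dayOfMonth": day}}  # e.g. 1
    raise ValueError(f"unsupported schedule kind: {kind}")


print(schedule_frequency("daily"))
print(schedule_frequency("weekly", "MONDAY"))
```

A weekly or monthly schedule may be a better fit when a data source delivers files less often than nightly.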


In this post, we learned how we can use Amazon Macie to discover and protect sensitive data in Amazon S3. We learned how to use automation to trigger alerts based on Macie’s findings and to isolate data files based on the types of findings. The post’s data inspection workflow can easily be incorporated into existing data lake ingestion pipelines to ensure the integrity of incoming data.

This blog represents my own viewpoints and not of my employer, Amazon Web Services (AWS). All product names, logos, and brands are the property of their respective owners.


Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options


For anyone new to Amazon Managed Workflows for Apache Airflow (Amazon MWAA), especially those used to managing their own Apache Airflow platform, Amazon MWAA’s configuration might appear to be a bit of a black box at first. This brief post will explore Amazon MWAA’s configuration — how to inspect it and how to modify it. We will use Airflow DAGs to review an MWAA environment’s airflow.cfg file, environment variables, and Python packages.

Amazon MWAA

Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project in 2019.

With the announcement of Amazon MWAA in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. For more information on Amazon MWAA, read my last post, Running Spark Jobs on Amazon EMR with Apache Airflow.

Apache Airflow UI

Source Code

The DAGs referenced in this post are available on GitHub. Using this git clone command, download a copy of this post’s GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \

Accessing Configuration

Environment Variables

Environment variables are an essential part of an MWAA environment’s configuration. There are various ways to examine the environment variables. You could use Airflow’s BashOperator to simply call the command, env, or the PythonOperator to call a Python iterator function, as shown below. A sample DAG, dags/, is included in the project.

import os  # PythonOperator is imported elsewhere in the DAG file

def print_env_vars():
    for key, value in sorted(os.environ.items()):
        print(f"'{key}': '{value}'")

get_env_vars_operator = PythonOperator(
    task_id='get_env_vars_task',
    python_callable=print_env_vars)

The DAG’s PythonOperator will iterate over the MWAA environment’s environment variables and output them to the task’s log. Below is a snippet of an example task’s log.

[2020-12-25 23:59:07,170] {{}} INFO – Job 272: Subtask get_env_vars_task
[2020-12-25 23:59:08,423] {{}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-25 23:59:08,516] {{}} INFO – 'AIRFLOW_CONSOLE_LOGS_ENABLED': 'false'
[2020-12-25 23:59:08,689] {{}} INFO – 'AIRFLOW_CONSOLE_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:08,777] {{}} INFO – 'AIRFLOW_CTX_DAG_EMAIL': ''
[2020-12-25 23:59:08,877] {{}} INFO – 'AIRFLOW_CTX_DAG_ID': 'get_env_vars'
[2020-12-25 23:59:08,970] {{}} INFO – 'AIRFLOW_CTX_DAG_OWNER': 'airflow'
[2020-12-25 23:59:09,269] {{}} INFO – 'AIRFLOW_CTX_TASK_ID': 'get_env_vars_task'
[2020-12-25 23:59:09,357] {{}} INFO – 'AIRFLOW_DAG_PROCESSING_LOGS_ENABLED': 'false'
[2020-12-25 23:59:09,552] {{}} INFO – 'AIRFLOW_DAG_PROCESSING_LOG_LEVEL': 'WARNING'
[2020-12-25 23:59:09,647] {{}} INFO – 'AIRFLOW_ENV_NAME': 'MyAirflowEnvironment'
[2020-12-25 23:59:09,729] {{}} INFO – 'AIRFLOW_HOME': '/usr/local/airflow'
[2020-12-25 23:59:09,827] {{}} INFO – 'AIRFLOW_SCHEDULER_LOGS_ENABLED': 'false'
[2020-12-25 23:59:12,915] {{}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-25 23:59:12,986] {{}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
[2020-12-25 23:59:13,136] {{}} INFO – 'AIRFLOW__CORE__LOAD_EXAMPLES': 'False'
[2020-12-25 23:59:13,217] {{}} INFO – 'AIRFLOW__CORE__PARALLELISM': '10000'
[2020-12-25 23:59:14,531] {{}} INFO – 'AWS_DEFAULT_REGION': 'us-east-1'
[2020-12-25 23:59:14,565] {{}} INFO – 'AWS_EXECUTION_ENV': 'AWS_ECS_FARGATE'
[2020-12-25 23:59:14,616] {{}} INFO – 'AWS_REGION': 'us-east-1'
[2020-12-25 23:59:14,647] {{}} INFO – 'CELERY_LOG_FILE': ''
[2020-12-25 23:59:14,679] {{}} INFO – 'CELERY_LOG_LEVEL': '20'
[2020-12-25 23:59:14,711] {{}} INFO – 'CELERY_LOG_REDIRECT': '1'
[2020-12-25 23:59:14,747] {{}} INFO – 'CELERY_LOG_REDIRECT_LEVEL': 'WARNING'

Airflow Configuration File

According to Airflow, the airflow.cfg file contains Airflow’s configuration. You can edit it to change any of the settings. The first time you run Apache Airflow, it creates an airflow.cfg configuration file in your AIRFLOW_HOME directory and attaches the configurations to your environment as environment variables.

Amazon MWAA doesn’t expose the airflow.cfg in the Apache Airflow UI of an environment. Although you can’t access it directly, you can view the airflow.cfg file. The configuration file is located in your AIRFLOW_HOME directory, which is /usr/local/airflow on MWAA (Airflow’s own default is ~/airflow).

There are multiple ways to examine your MWAA environment’s airflow.cfg file. You could use Airflow’s PythonOperator to call a Python function that reads the contents of the file, as shown below. The function uses the AIRFLOW_HOME environment variable to locate and read the airflow.cfg. A sample DAG, dags/, is included in the project.

def print_airflow_cfg():
    with open(f"{os.getenv('AIRFLOW_HOME')}/airflow.cfg", 'r') as airflow_cfg:
        file_contents = airflow_cfg.read()
        print(f'\n{file_contents}')

get_airflow_cfg_operator = PythonOperator(
    task_id='get_airflow_cfg_task',
    python_callable=print_airflow_cfg)

The DAG’s task will read the MWAA environment’s airflow.cfg file and output it to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 00:02:57,163] {{}} INFO – Job 274: Subtask get_airflow_cfg_task
[2020-12-26 00:02:57,583] {{}} INFO –
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Set this to True if you want to enable remote logging.
remote_logging = True
# Users must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id = aws_default
remote_base_log_folder = cloudwatch://arn:aws:logs:::log-group:airflow-logs:*
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging level for Flask-appbuilder UI
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# Example: logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
redirect_url =
session_duration_minutes = 720
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://localhost:8080
# Default timezone to display all dates in the RBAC UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = UTC
# The ip specified when starting the web server
web_server_host =
# The port on which to run the web server
web_server_port = 8080
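Rather than printing the whole file, a task could parse airflow.cfg and log only the settings of interest. The sketch below assumes the file follows the standard INI layout with section headers such as [core] and [webserver], which the log snippet above does not show; the function name and sample text are illustrative.

```python
import configparser


def read_airflow_cfg(text):
    """Parse airflow.cfg text into a {section: {option: value}} dictionary."""
    # Disable interpolation so '%' characters in values are read literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return {section: dict(parser[section]) for section in parser.sections()}


# Abbreviated, hypothetical file contents with assumed section headers
sample_cfg = """
[core]
dags_folder = /usr/local/airflow/dags
parallelism = 32

[webserver]
web_server_port = 8080
"""

settings = read_airflow_cfg(sample_cfg)
print(settings["core"]["parallelism"])
```

All values come back as strings, so numeric settings such as parallelism need to be converted before they are compared.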

Customizing Airflow Configurations

While AWS doesn’t expose the airflow.cfg in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.

To customize the Apache Airflow configuration, change the default options directly on the Amazon MWAA console. Select Edit, add or modify configuration options and values in the Airflow configuration options menu, then select Save. For example, we can change Airflow’s default timezone (core.default_ui_timezone) to America/New_York.

Amazon MWAA’s Airflow configuration options

Once the MWAA environment is updated, which may take several minutes, view your changes by re-running the DAG, dags/ Note the new configuration item on lines 2 and 5 of the log snippet shown below. The configuration item appears on its own (AIRFLOW__CORE__DEFAULT_UI_TIMEZONE), as well as part of the AIRFLOW_CONFIG_SECRETS dictionary environment variable.

[2020-12-26 05:00:57,756] {{}} INFO – Job 293: Subtask get_env_vars_task
[2020-12-26 05:00:58,158] {{}} INFO – 'AIRFLOW_CONFIG_SECRETS': '{"AIRFLOW__CORE__DEFAULT_UI_TIMEZONE":"America/New_York"}'
[2020-12-26 05:00:58,190] {{}} INFO – 'AIRFLOW_CONN_AWS_DEFAULT': 'aws://'
[2020-12-26 05:01:00,537] {{}} INFO – 'AIRFLOW__CORE__DAG_CONCURRENCY': '10000'
[2020-12-26 05:01:00,578] {{}} INFO – 'AIRFLOW__CORE__DEFAULT_UI_TIMEZONE': 'America/New_York'
[2020-12-26 05:01:00,630] {{}} INFO – 'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'
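The environment variable names in the log follow Airflow's AIRFLOW__{SECTION}__{KEY} naming convention. As a small illustration, the hypothetical helper below converts a section.key option name, as entered in the Amazon MWAA console, into the corresponding environment variable name.

```python
def option_to_env_var(option):
    """Convert 'section.key' into Airflow's AIRFLOW__SECTION__KEY form."""
    section, key = option.split(".", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


print(option_to_env_var("core.default_ui_timezone"))
```

Note the double underscores on either side of the section name, which distinguish it from the single underscores inside the key.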

Using the MWAA API

We can also make configuration changes using the MWAA API. For example, to change the default Airflow UI timezone, call the MWAA API’s update-environment command using the AWS CLI. Include the --airflow-configuration-options parameter, passing the core.default_ui_timezone key/value pair as a JSON blob.

aws mwaa update-environment \
--name <your_environment_name> \
--airflow-configuration-options """{
\"core.default_ui_timezone\": \"America/Los_Angeles\"
}"""

To review an environment’s configuration, use the get-environment command in combination with jq.

aws mwaa get-environment \
--name <your_environment_name> | \
jq -r '.Environment.AirflowConfigurationOptions'

Below, we see an example of the output.

{
  "core.default_ui_timezone": "America/Los_Angeles"
}

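The two CLI calls above have Boto3 equivalents. The following is a minimal sketch in which the function names are hypothetical and the client is passed in as an argument, so the logic can be exercised with a stand-in object. It assumes the options supplied in an update are the ones the environment ends up with, so any existing options worth keeping should be included in the call.

```python
def set_ui_timezone(mwaa_client, environment_name, timezone):
    """Set core.default_ui_timezone on an MWAA environment."""
    return mwaa_client.update_environment(
        Name=environment_name,
        AirflowConfigurationOptions={"core.default_ui_timezone": timezone},
    )


def get_configuration_options(mwaa_client, environment_name):
    """Return the environment's Airflow configuration options as a dict."""
    response = mwaa_client.get_environment(Name=environment_name)
    return response["Environment"].get("AirflowConfigurationOptions", {})
```

With AWS credentials available, the client would be created with boto3.client('mwaa') and passed to either function along with your environment's name.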
Python Packages

Airflow is written in Python, and workflows are created via Python scripts. Python packages are a crucial part of an MWAA environment’s configuration. According to the documentation, an ‘extra package’ is a Python subpackage that is not included in the Apache Airflow base, installed on your MWAA environment. As part of setting up an MWAA environment, you can specify the location of the requirements.txt file in the Airflow S3 bucket. Extra packages are installed using the requirements.txt file.

Amazon MWAA environment’s configuration

There are several ways to check your MWAA environment’s installed Python packages and versions. You could use Airflow’s BashOperator to call the command, python3 -m pip list. A sample DAG, dags/, is included in the project.

list_python_packages_operator = BashOperator(
    task_id='list_python_packages',
    bash_command='python3 -m pip list'
)

The DAG’s task will output a list of all Python packages and package versions to the task’s log. Below is a snippet of an example task’s log.

[2020-12-26 21:53:06,310] {{}} INFO – Temporary script location: /tmp/airflowtmp2whgp_p8/list_python_packagesxo8slhc6
[2020-12-26 21:53:06,350] {{}} INFO – Running command: python3 -m pip list
[2020-12-26 21:53:06,395] {{}} INFO – Output:
[2020-12-26 21:53:06,750] {{}} INFO – Package Version
[2020-12-26 21:53:06,786] {{}} INFO – ———————- ———
[2020-12-26 21:53:06,815] {{}} INFO – alembic 1.4.2
[2020-12-26 21:53:06,856] {{}} INFO – amqp 2.6.1
[2020-12-26 21:53:06,898] {{}} INFO – apache-airflow 1.10.12
[2020-12-26 21:53:06,929] {{}} INFO – apispec 1.3.3
[2020-12-26 21:53:06,960] {{}} INFO – argcomplete 1.12.0
[2020-12-26 21:53:07,002] {{}} INFO – attrs 19.3.0
[2020-12-26 21:53:07,036] {{}} INFO – Babel 2.8.0
[2020-12-26 21:53:07,071] {{}} INFO – billiard
[2020-12-26 21:53:07,960] {{}} INFO – boto3 1.16.10
[2020-12-26 21:53:07,993] {{}} INFO – botocore 1.19.10
[2020-12-26 21:53:08,028] {{}} INFO – cached-property 1.5.1
[2020-12-26 21:53:08,061] {{}} INFO – cattrs 1.0.0
[2020-12-26 21:53:08,096] {{}} INFO – celery 4.4.7
[2020-12-26 21:53:08,130] {{}} INFO – certifi 2020.6.20
[2020-12-26 21:53:12,260] {{}} INFO – pandas 1.1.0
[2020-12-26 21:53:12,289] {{}} INFO – pendulum 1.4.4
[2020-12-26 21:53:12,490] {{}} INFO – pip 9.0.3
[2020-12-26 21:53:12,522] {{}} INFO – prison 0.1.3
[2020-12-26 21:53:12,550] {{}} INFO – prometheus-client 0.8.0
[2020-12-26 21:53:12,580] {{}} INFO – psutil 5.7.2
[2020-12-26 21:53:12,613] {{}} INFO – pycparser 2.20
[2020-12-26 21:53:12,641] {{}} INFO – pycurl
[2020-12-26 21:53:12,676] {{}} INFO – Pygments 2.6.1
[2020-12-26 21:53:12,710] {{}} INFO – PyGreSQL 5.2.1
[2020-12-26 21:53:12,746] {{}} INFO – PyJWT 1.7.1
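The same inventory could be gathered from a PythonOperator instead of shelling out to pip. The sketch below is an alternative I am suggesting, not something from the post's repository. It relies on importlib.metadata, which requires Python 3.8 or later, so it assumes the environment's Python version is recent enough.

```python
from importlib import metadata


def list_installed_packages():
    """Return (name, version) pairs for installed packages, sorted by name."""
    packages = {
        dist.metadata["Name"]: dist.version
        for dist in metadata.distributions()
        if dist.metadata["Name"]  # skip distributions with missing metadata
    }
    return sorted(packages.items(), key=lambda item: item[0].lower())


for name, version in list_installed_packages():
    print(f"{name} {version}")
```

Returning the pairs as data makes it straightforward to compare the installed versions with those pinned in requirements.txt.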


Understanding your Amazon MWAA environment’s airflow.cfg file, environment variables, and Python packages is important for proper Airflow platform management. In this brief post, we learned more about Amazon MWAA’s configuration: how to inspect it using DAGs and how to modify it through the Amazon MWAA console.


Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS


According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grained control, you could deploy and manage Presto yourself on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.’

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources: a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
    SELECT
        (year(now()) - c_birth_year) AS age,
        cd_credit_rating AS credit_rating,
        cd_gender AS gender,
        count(cd_gender) AS gender_count
    FROM postgresql.public.customer
    LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
    WHERE
        c_birth_year IS NOT NULL
        AND cd_credit_rating IS NOT NULL
        AND lower(cd_credit_rating) != 'unknown'
        AND cd_gender IS NOT NULL
    GROUP BY cd_credit_rating, c_birth_year, cd_gender
)
SELECT *
FROM credit_demographics
WHERE age BETWEEN 21 AND 65;
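
To make the semantics of that federated query concrete, here is a small pure-Python emulation. It is only a sketch under stated assumptions: the rows are made up, the function name credit_demographics and the current_year parameter are hypothetical, and two in-memory collections stand in for the PostgreSQL table and the Hive table. Presto does the equivalent work across the real sources.

```python
from collections import Counter

# Made-up rows standing in for postgresql.public.customer
CUSTOMERS = [
    {"c_current_cdemo_sk": 1, "c_birth_year": 1980},
    {"c_current_cdemo_sk": 1, "c_birth_year": 1980},
    {"c_current_cdemo_sk": 2, "c_birth_year": 1990},  # dropped: rating is unknown
    {"c_current_cdemo_sk": 3, "c_birth_year": None},  # dropped: no birth year
    {"c_current_cdemo_sk": 4, "c_birth_year": 2015},  # dropped: outside 21 to 65
]

# Made-up rows standing in for hive.default.customer_demographics, keyed by cd_demo_sk
DEMOGRAPHICS = {
    1: {"cd_credit_rating": "Good", "cd_gender": "F"},
    2: {"cd_credit_rating": "Unknown", "cd_gender": "M"},
    4: {"cd_credit_rating": "Low Risk", "cd_gender": "M"},
}


def credit_demographics(customers, demographics, current_year):
    """Count customers per (age, credit_rating, gender), as the query does."""
    counts = Counter()
    for customer in customers:
        # Join on c_current_cdemo_sk = cd_demo_sk. The IS NOT NULL filters
        # discard unmatched rows, so the LEFT JOIN behaves like an inner join.
        demo = demographics.get(customer["c_current_cdemo_sk"])
        if demo is None or customer["c_birth_year"] is None:
            continue
        if demo["cd_credit_rating"] is None or demo["cd_gender"] is None:
            continue
        if demo["cd_credit_rating"].lower() == "unknown":
            continue
        age = current_year - customer["c_birth_year"]
        if 21 <= age <= 65:
            counts[(age, demo["cd_credit_rating"], demo["cd_gender"])] += 1
    return dict(counts)


if __name__ == "__main__":
    print(credit_demographics(CUSTOMERS, DEMOGRAPHICS, current_year=2020))
```

The point of the sketch is that the join key, filters, and grouping are indifferent to where each row came from, which is exactly what a federated query engine exploits.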


The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2, AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes with an Apache Hive Metastore backed by PostgreSQL bundled in. In addition, the following catalogs are bundled in to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.

Getting Started

To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Architecture of the demonstration’s AWS environment and resources

Subscribe to Ahana’s PrestoDB Sandbox

To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The current AWS pricing for the default, Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. For the demonstration, since performance is not an issue, you could try a smaller EC2 instance, such as an r5.large, which costs USD 0.126 per hour.
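
For a rough sense of what the demonstration will cost, the hourly rates above can be multiplied out. This is a back-of-the-envelope sketch only: it uses the two on-demand prices quoted in this post, ignores storage, data transfer, and any Marketplace charges, and the function name estimated_cost is hypothetical.

```python
# On-demand hourly prices quoted in this post (US East, N. Virginia)
HOURLY_RATE_USD = {"r5.xlarge": 0.252, "r5.large": 0.126}


def estimated_cost(instance_type, hours):
    """Return the EC2 compute cost in USD for running an instance for `hours`."""
    return round(HOURLY_RATE_USD[instance_type] * hours, 2)


if __name__ == "__main__":
    for instance_type in HOURLY_RATE_USD:
        print(instance_type, estimated_cost(instance_type, 8), "USD for 8 hours")
```

Stopping or terminating the instance when you are done matters far more to the final bill than the choice between the two instance sizes.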

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you choose to do so, I strongly recommend limiting the required ports 22 and 8080 in the instance’s Security Group to just your IP address (a /32 CIDR block).
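
A /32 CIDR block is simply a single IPv4 address with a full-length prefix. The sketch below uses Python's standard ipaddress module to validate an address and produce the single-host CIDR string to paste into the Security Group rule. The function name single_host_cidr is hypothetical, and the address shown is a documentation-range placeholder, not a real one.

```python
import ipaddress


def single_host_cidr(ip):
    """Return the CIDR block that matches exactly one address."""
    address = ipaddress.ip_address(ip)  # raises ValueError for malformed input
    prefix = 32 if address.version == 4 else 128
    return f"{address}/{prefix}"


if __name__ == "__main__":
    # 203.0.113.25 is a placeholder from the documentation address range
    for port in (22, 8080):
        print(f"allow TCP {port} from {single_host_cidr('203.0.113.25')}")
```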

Limiting access to ports 22 and 8080 from only my current IP address

Lastly, we need to assign an IAM Role to the EC2 instance, which has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Attaching the AmazonS3FullAccess AWS managed policy to the Role

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto and my key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.

You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.

AWS CloudFormation

We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.

AWSTemplateFormatVersion: "2010-09-09"
Description: "This template deploys a RDS PostgreSQL database and an Amazon S3 bucket"
Parameters:
  DBInstanceIdentifier:
    Type: String
    Default: "ahana-prestodb-demo"
  DBEngine:
    Type: String
    Default: "postgres"
  DBEngineVersion:
    Type: String
    Default: "12.3"
  DBAvailabilityZone:
    Type: String
    Default: "us-east-1f"
  DBInstanceClass:
    Type: String
    Default: "db.t3.medium"
  DBStorageType:
    Type: String
    Default: "gp2"
  DBAllocatedStorage:
    Type: Number
    Default: 20
  DBName:
    Type: String
    Default: "shipping"
  DBUser:
    Type: String
    Default: "presto"
  DBPassword:
    Type: String
    Default: "5up3r53cr3tPa55w0rd"
    # NoEcho: True
Resources:
  MasterDatabase:
    Type: AWS::RDS::DBInstance
    Properties:
      DBInstanceIdentifier:
        Ref: DBInstanceIdentifier
      DBName:
        Ref: DBName
      AllocatedStorage:
        Ref: DBAllocatedStorage
      DBInstanceClass:
        Ref: DBInstanceClass
      StorageType:
        Ref: DBStorageType
      Engine:
        Ref: DBEngine
      EngineVersion:
        Ref: DBEngineVersion
      MasterUsername:
        Ref: DBUser
      MasterUserPassword:
        Ref: DBPassword
      AvailabilityZone: !Ref DBAvailabilityZone
      PubliclyAccessible: true
      Tags:
        - Key: Project
          Value: "Demo of RDS PostgreSQL"
  DataBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
Outputs:
  Endpoint:
    Description: "Endpoint of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Address
  Port:
    Description: "Port of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Port
  JdbcConnString:
    Description: "JDBC connection string of RDS PostgreSQL database"
    Value: !Join
      - ""
      - - "jdbc:postgresql://"
        - !GetAtt MasterDatabase.Endpoint.Address
        - ":"
        - !GetAtt MasterDatabase.Endpoint.Port
        - "/"
        - !Ref DBName
        - "?user="
        - !Ref DBUser
        - "&password="
        - !Ref DBPassword
  Bucket:
    Description: "Name of Amazon S3 data bucket"
    Value: !Ref DataBucket

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
--branch master --single-branch --depth 1 --no-tags \

To create the AWS CloudFormation stack from the template, cloudformation/presto_ahana_demo.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value (shown in bold) to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created. In my case, that is us-east-1f.

aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f

To ensure the RDS for PostgreSQL database instance can be accessed by Presto running on the Ahana PrestoDB Sandbox EC2, manually add the PrestoDB Sandbox EC2’s Security Group to port 5432 within the database instance’s VPC Security Group’s Inbound rules. I have also added my own IP to port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString, and the Amazon S3 bucket’s name, Bucket. All these values will be required during the demonstration.
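
If you need to assemble the same connection string yourself, for example after changing the database password, the sketch below shows one way to do it. It assumes the usual PostgreSQL JDBC URL layout (jdbc:postgresql://host:port/database?user=...&password=...). The function name jdbc_conn_string and the host name in the example are hypothetical, and the credentials are percent-encoded so that special characters do not break the URL.

```python
from urllib.parse import quote


def jdbc_conn_string(host, port, db_name, user, password):
    """Build a PostgreSQL JDBC URL with credentials passed as query parameters."""
    return (
        f"jdbc:postgresql://{host}:{port}/{db_name}"
        f"?user={quote(user, safe='')}&password={quote(password, safe='')}"
    )


if __name__ == "__main__":
    # "db.example.internal" is a placeholder for the RDS endpoint address
    print(jdbc_conn_string("db.example.internal", 5432, "shipping", "presto", "CHANGE_ME"))
```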

Preparing the PrestoDB Sandbox

There are a few steps we need to take to properly prepare the PrestoDB Sandbox EC2 for our demonstration. First, set the EC2_ENDPOINT value (shown in bold) to your EC2’s public IPv4 address or public IPv4 DNS value. You can hardcode the value or use the aws ec2 API command shown below to retrieve the value programmatically. Then, use your PrestoDB Sandbox EC2 SSH key to scp the properties and sql directories to the Presto EC2 instance, and SSH into the instance.

# on local workstation
EC2_ENDPOINT=$(aws ec2 describe-instances \
--filters "Name=product-code,Values=ejee5zzmv4tc5o3tr1uul6kg2" \
"Name=product-code.type,Values=marketplace" \
--query "Reservations[*].Instances[*].{Instance:PublicDnsName}" \
--output text)
scp -i "~/.ssh/ahana-presto.pem" \
-r properties/ sql/ \
ec2-user@${EC2_ENDPOINT}:~/
ssh -i "~/.ssh/ahana-presto.pem" ec2-user@${EC2_ENDPOINT}
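
If you prefer to look up the endpoint from a script, the same information is available in the JSON returned by describe-instances, which nests instances under Reservations. The following sketch only walks that response structure, so it works on output saved from the CLI or returned by an SDK call. The function name public_dns_names and the sample host name are hypothetical.

```python
def public_dns_names(response):
    """Collect the non-empty PublicDnsName values from a describe-instances response."""
    names = []
    for reservation in response.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            dns_name = instance.get("PublicDnsName")
            if dns_name:  # stopped instances report an empty string
                names.append(dns_name)
    return names


if __name__ == "__main__":
    # Trimmed, made-up response with a placeholder host name
    sample = {
        "Reservations": [
            {"Instances": [{"PublicDnsName": "ec2-203-0-113-25.compute-1.amazonaws.com"}]},
            {"Instances": [{"PublicDnsName": ""}]},
        ]
    }
    print(public_dns_names(sample))
```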

Environment Variables

Next, we need to set several environment variables. First, replace the DATA_BUCKET and POSTGRES_HOST values below (shown in bold) to match your environment. The PGPASSWORD value should be correct unless you changed it in the CloudFormation template. Then, execute the command to add the variables to your .bash_profile file.

echo """
export DATA_BUCKET=prestodb-demo-databucket-CHANGE_ME
export POSTGRES_HOST=CHANGE_ME
export PGPASSWORD=5up3r53cr3tPa55w0rd
export JAVA_HOME=/usr
export HADOOP_HOME=/home/ec2-user/hadoop
export HADOOP_CLASSPATH=\$HADOOP_HOME/share/hadoop/tools/lib/*
export HIVE_HOME=/home/ec2-user/hive
""" >>~/.bash_profile

Optionally, I suggest updating the EC2 instance with available updates and installing your favorite tools, like htop, to monitor the EC2 performance.

yes | sudo yum update
yes | sudo yum install htop
View of htop running on an r5.xlarge EC2 instance

Before further configuration for the demonstration, let’s review a few aspects of the Ahana PrestoDB EC2 instance. There are several applications pre-installed on the instance, including Java, Presto, Hadoop, PostgreSQL, and Hive. Versions shown are current as of early September 2020.

java -version
# openjdk version "1.8.0_252"
# OpenJDK Runtime Environment (build 1.8.0_252-b09)
# OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
hadoop version
# Hadoop 2.9.2
postgres --version
# postgres (PostgreSQL) 9.2.24
psql --version
# psql (PostgreSQL) 9.2.24
hive --version
# Hive 2.3.7
presto-cli --version
# Presto CLI 0.235-cb21100

The Presto configuration files are in the /etc/presto/ directory. The Hive configuration files are in the ~/hive/conf/ directory. Here are a few commands you can use to gain a better understanding of their configurations.

ls /etc/presto/
cat /etc/presto/jvm.config
cat /etc/presto/
cat /etc/presto/
# installed and configured catalogs
ls /etc/presto/catalog/
cat ~/hive/conf/hive-site.xml

Configure Presto

To configure Presto, we need to create and copy a new Presto postgresql catalog properties file for the newly created RDS for PostgreSQL instance. Modify the properties/ file, replacing the value, connection-url (shown in bold), with your own JDBC connection string, shown in the CloudFormation Outputs tab.
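
For reference, a Presto PostgreSQL catalog file is a short list of key=value properties. The sketch below renders one from its parts. It is an illustration rather than the project's actual file: the function name postgresql_catalog_properties is hypothetical, the property keys are the standard ones for Presto's PostgreSQL connector, and the URL in the example is a placeholder for the JDBC connection string from the CloudFormation Outputs tab.

```python
def postgresql_catalog_properties(connection_url, user, password):
    """Render the contents of a Presto PostgreSQL catalog properties file."""
    lines = [
        "connector.name=postgresql",
        f"connection-url={connection_url}",
        f"connection-user={user}",
        f"connection-password={password}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder URL; substitute your own connection string
    print(postgresql_catalog_properties(
        "jdbc:postgresql://db.example.internal:5432/shipping", "presto", "CHANGE_ME"))
```

The name of the properties file determines the catalog name used in queries, which is why later statements refer to tables as rds_postgresql.public.customer_address.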

Move the file to its correct location using sudo.

sudo mv properties/ /etc/presto/catalog/

We also need to modify the existing Hive catalog properties file, which will allow us to write to non-managed Hive tables from Presto.

The following command will overwrite the existing file with the modified version containing the new property.

sudo mv properties/ |

To finalize the configuration of the catalog properties files, we need to restart Presto. The easiest way is to reboot the EC2 instance, then SSH back into the instance. Since our environment variables are in the .bash_profile file, they will survive a restart and logging back into the EC2 instance.

sudo reboot

Add Tables to Apache Hive Metastore

We will use RDS for PostgreSQL and Apache Hive Metastore/Amazon S3 as additional data sources for our federated queries. The Ahana PrestoDB Sandbox instance comes pre-configured with Apache Hive and an Apache Hive Metastore, backed by PostgreSQL (a separate PostgreSQL 9.x instance pre-installed on the EC2).

The Sandbox’s instance of Presto comes pre-configured with schemas for the TPC Benchmark DS (TPC-DS). We will create identical tables in our Apache Hive Metastore, which correspond to three external tables in the TPC-DS data source’s sf1 schema: tpcds.sf1.customer, tpcds.sf1.customer_address, and tpcds.sf1.customer_demographics. A Hive external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. As an example, here is the SQL statement that creates the external customer table in the Hive Metastore and whose data will be stored in the S3 bucket.

CREATE EXTERNAL TABLE `customer`(
`c_customer_sk` bigint,
`c_customer_id` char(16),
`c_current_cdemo_sk` bigint,
`c_current_hdemo_sk` bigint,
`c_current_addr_sk` bigint,
`c_first_shipto_date_sk` bigint,
`c_first_sales_date_sk` bigint,
`c_salutation` char(10),
`c_first_name` char(20),
`c_last_name` char(30),
`c_preferred_cust_flag` char(1),
`c_birth_day` integer,
`c_birth_month` integer,
`c_birth_year` integer,
`c_birth_country` char(20),
`c_login` char(13),
`c_email_address` char(50),
`c_last_review_date_sk` bigint)
STORED AS PARQUET
LOCATION 's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

The three CREATE EXTERNAL TABLE SQL statements are included in the sql/ directory: sql/hive_customer.sql, sql/hive_customer_address.sql, and sql/hive_customer_demographics.sql. The bucket name (shown in bold above) needs to be manually updated to your own bucket name in all three files before continuing.

Next, run the following hive commands to create the external tables in the Hive Metastore within the existing default schema/database.

hive --database default -f sql/hive_customer.sql
hive --database default -f sql/hive_customer_address.sql
hive --database default -f sql/hive_customer_demographics.sql

To confirm the tables were created successfully, we could use a variety of hive commands.

hive --database default -e "SHOW TABLES;"
hive --database default -e "DESCRIBE FORMATTED customer;"
hive --database default -e "SELECT * FROM customer LIMIT 5;"
Using the ‘DESCRIBE FORMATTED customer_address;’ Hive command

Alternatively, you can also create the external table interactively from within Hive, using the hive command to access the CLI. Copy and paste the contents of the SQL files to the hive CLI. To exit hive use quit;.

Interactively querying within Apache Hive

Amazon S3 Data Source Setup

With the external tables created, we will now select all the data from each of the three tables in the TPC-DS data source and insert that data into the equivalent Hive tables. The physical data will be written to Amazon S3 as highly-efficient, columnar storage format, SNAPPY-compressed Apache Parquet files. Execute the following commands. I will explain why the customer_address table statements are a bit different, next.

# inserts 100,000 rows
presto-cli --execute """
INSERT INTO hive.default.customer
SELECT * FROM tpcds.sf1.customer;
"""
# inserts 50,000 rows across 52 partitions
presto-cli --execute """
INSERT INTO hive.default.customer_address
SELECT ca_address_sk, ca_address_id, ca_street_number,
ca_street_name, ca_street_type, ca_suite_number,
ca_city, ca_county, ca_zip, ca_country, ca_gmt_offset,
ca_location_type, ca_state
FROM tpcds.sf1.customer_address
ORDER BY ca_address_sk;
"""
# add new partitions in metastore
hive -e "MSCK REPAIR TABLE default.customer_address;"
# inserts 1,920,800 rows
presto-cli --execute """
INSERT INTO hive.default.customer_demographics
SELECT * FROM tpcds.sf1.customer_demographics;
"""

Confirm the data has been loaded into the correct S3 bucket locations and is in Parquet-format using the AWS Management Console or AWS CLI. Rest assured, the Parquet-format data is SNAPPY-compressed even though the S3 console incorrectly displays Compression as None. You can easily confirm the compression codec with a utility like parquet-tools.

Data organized by key prefixes in Amazon S3
Using S3’s ‘Select from’ feature to preview the SNAPPY-compressed Parquet format data

Partitioned Tables

The customer_address table is unique in that it has been partitioned by the ca_state column. Partitioned tables are created using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE `customer_address`(
`ca_address_sk` bigint,
`ca_address_id` char(16),
`ca_street_number` char(10),
`ca_street_name` char(60),
`ca_street_type` char(15),
`ca_suite_number` char(10),
`ca_city` varchar(60),
`ca_county` varchar(30),
`ca_zip` char(10),
`ca_country` char(20),
`ca_gmt_offset` double precision,
`ca_location_type` char(20))
PARTITIONED BY (`ca_state` char(2))
STORED AS PARQUET
LOCATION 's3a://prestodb-demo-databucket-CHANGE_ME/customer_address'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

According to Apache Hive, a table can have one or more partition columns and a separate data directory is created for each distinct value combination in the partition columns. Since the data for the Hive tables are stored in Amazon S3, that means that when the data is written to the customer_address table, it is automatically separated into different S3 key prefixes based on the state. The data is physically “partitioned”.

customer_address data, partitioned by the state, in Amazon S3

Whenever we add new partitions in S3, we need to run the MSCK REPAIR TABLE command to add that table’s new partitions to the Hive Metastore.

hive -e "MSCK REPAIR TABLE default.customer_address;"

In SQL, a predicate is a condition expression that evaluates to a Boolean value, either true or false. Defining partitions that align with the attributes frequently used in query conditions/filters (predicates) can significantly increase query efficiency. When we execute a query that uses an equality comparison condition, such as ca_state = 'TN', partitioning means the query will only read the slice of the data under the corresponding ca_state=TN key prefix. There are 50,000 rows of data in the customer_address table, but only 1,418 rows (2.8% of the total data) in the ca_state=TN partition. Combined with the advantages of the Parquet format with SNAPPY compression, partitioning can significantly reduce query execution time.
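
The effect of partitioning can be illustrated with a few lines of Python. This is a sketch of the idea only, not how Presto or Hive implement it: the function names are hypothetical, and the object keys in the example are made up. It builds the Hive-style key prefix for a partition value, keeps only the objects under that prefix, and reproduces the 2.8% figure from the row counts above.

```python
def partition_prefix(table_prefix, column, value):
    """Return the Hive-style key prefix for one partition, e.g. .../ca_state=TN/."""
    return f"{table_prefix.rstrip('/')}/{column}={value}/"


def prune(keys, prefix):
    """Keep only the object keys a query filtered on that partition must read."""
    return [key for key in keys if key.startswith(prefix)]


def partition_share(partition_rows, total_rows):
    """Return the partition's share of the table's rows, as a percentage."""
    return round(100 * partition_rows / total_rows, 1)


if __name__ == "__main__":
    # Made-up object keys for illustration
    keys = [
        "customer_address/ca_state=TN/part-0.parquet",
        "customer_address/ca_state=TX/part-0.parquet",
        "customer_address/ca_state=OR/part-0.parquet",
    ]
    prefix = partition_prefix("customer_address", "ca_state", "TN")
    print(prune(keys, prefix))
    print(partition_share(1418, 50000), "% of rows")
```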

Adding Data to RDS for PostgreSQL Instance

For the demonstration, we will also replicate the schema and data of the tpcds.sf1.customer_address table to the new RDS for PostgreSQL instance’s shipping database.

CREATE TABLE customer_address (
ca_address_sk bigint,
ca_address_id char(16),
ca_street_number char(10),
ca_street_name char(60),
ca_street_type char(15),
ca_suite_number char(10),
ca_city varchar(60),
ca_county varchar(30),
ca_state char(2),
ca_zip char(10),
ca_country char(20),
ca_gmt_offset double precision,
ca_location_type char(20)
);

Like Hive and Presto, we can create the table programmatically from the command line or interactively; I prefer the programmatic approach. Using the following psql command, we can create the customer_address table in the public schema of the shipping database.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-f sql/postgres_customer_address.sql

Now, to insert the data into the new PostgreSQL table, run the following presto-cli command.

# inserts 50,000 rows
presto-cli --execute """
INSERT INTO rds_postgresql.public.customer_address
SELECT * FROM tpcds.sf1.customer_address;
"""

To confirm that the data was imported properly, we can use a variety of commands.

# Should be 50000 rows in table
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT COUNT(*) FROM customer_address;"
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT * FROM customer_address LIMIT 5;"

Alternatively, you could use the PostgreSQL client interactively by copying and pasting the contents of the sql/postgres_customer_address.sql file to the psql command prompt. To interact with PostgreSQL from the psql command prompt, use the following command.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto

Use the \dt command to list the PostgreSQL tables and the \q command to exit the PostgreSQL client. We now have all the new data sources created and configured for Presto!

Interacting with Presto

Presto provides a web interface for monitoring and managing queries. The interface provides dashboard-like insights into the Presto Cluster and queries running on the cluster. The Presto UI is available on port 8080 using the public IPv4 address or the public IPv4 DNS.

There are several ways to interact with Presto, via the PrestoDB Sandbox. The post will demonstrate how to execute ad-hoc queries against Presto from an IDE using a JDBC connection and the Presto CLI. Other options include running queries against Presto from Java and Python applications, Tableau, or Apache Spark/PySpark.

Below, we see a query being run against Presto from JetBrains PyCharm, using a Java Database Connectivity (JDBC) connection. The advantage of using an IDE like PyCharm is having a single visual interface, including all the project files, multiple JDBC configurations, output results, and the ability to run multiple ad hoc queries.

Below, we see an example of configuring the Presto Data Source using the JDBC connection string, supplied in the CloudFormation stack Outputs tab.

Make sure to download and use the latest Presto JDBC driver JAR.

With JetBrains’ IDEs, we can even limit the databases/schemas displayed by the Data Source. This is helpful when we have multiple Presto catalogs configured, but we are only interested in certain data sources.

We can also run queries using the Presto CLI, three different ways. We can pass a SQL statement to the Presto CLI, pass a file containing a SQL statement to the Presto CLI, or work interactively from the Presto CLI. Below, we see a query being run, interactively from the Presto CLI.

As the query is running, we can observe the live Presto query statistics (not very user friendly in my terminal).

And finally, we view the query results.
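
The three ways of invoking the Presto CLI differ only in which option carries the SQL. As a minimal sketch, the helper below builds the argument list for each mode so that it could be handed to subprocess.run. The function name presto_cli_args is hypothetical, and it uses only the presto-cli options that appear in this post.

```python
import shlex


def presto_cli_args(catalog=None, schema=None, execute=None, file=None):
    """Build the argument list for one presto-cli invocation."""
    if execute is not None and file is not None:
        raise ValueError("pass either a statement or a file, not both")
    args = ["presto-cli"]
    if catalog:
        args += ["--catalog", catalog]
    if schema:
        args += ["--schema", schema]
    if execute is not None:
        args += ["--execute", execute]  # mode 1: pass a SQL statement
    elif file is not None:
        args += ["--file", file]  # mode 2: pass a file containing SQL
    return args  # mode 3: with neither option, the CLI starts interactively


if __name__ == "__main__":
    print(shlex.join(presto_cli_args(catalog="tpcds", schema="sf1",
                                     file="sql/presto_query2.sql")))
```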

Federated Queries

The example queries used in the demonstration and included in the project were mainly extracted from the scholarly article, Why You Should Run TPC-DS: A Workload Analysis, available as a PDF on the website. I have modified the SQL queries to work with Presto.

In the first example, we will run three versions of the same basic query statement. Version 1 of the query is not a federated query; it only queries a single data source. Version 2 of the query queries two different data sources. Finally, version 3 of the query queries three different data sources. Each of the three versions of the SQL statement should return the same results: 93 rows of data.

Version 1: Single Data Source

The first version of the query statement, sql/presto_query2.sql, is not a federated query. Each of the query’s four tables (catalog_returns, date_dim, customer, and customer_address) references the TPC-DS data source, which came pre-installed with the PrestoDB Sandbox. Note that the table references on lines 11–13 and 41–42 are all associated with the tpcds.sf1 schema.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
WITH customer_total_return AS (
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
customer_total_return ctr1,
ctr1.ctr_return > (
avg(ctr_return) * 1.2
customer_total_return ctr2
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk

We will run each query non-interactively using the presto-cli. We will choose the sf1 (scale factor of 1) tpcds schema. According to Presto, every unit in the scale factor (sf1, sf10, sf100) corresponds to a gigabyte of data.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2.sql \
--output-format ALIGNED \
--client-tags "presto_query2"

Below, we see the query results in the presto-cli.

Below, we see the first query running in Presto’s web interface.

Below, we see the first query’s results detailed in Presto’s web interface.

Version 2: Two Data Sources

In the second version of the query statement, sql/presto_query2_federated_v1.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. The other two tables (customer and customer_address) now reference the Apache Hive Metastore for their schema and underlying data in Amazon S3. Note table references on lines 11 and 12, as opposed to lines 13, 41, and 42.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
WITH customer_total_return AS (
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
customer_total_return ctr1,
ctr1.ctr_return > (
avg(ctr_return) * 1.2
customer_total_return ctr2
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v1.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v1"

Below, we see the second query’s results detailed in Presto’s web interface.

Even though the data is in two separate and physically different data sources, we can easily query it as though it were all in the same place.

Version 3: Three Data Sources

In the third version of the query statement, sql/presto_query2_federated_v2.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. One of the tables (hive.default.customer) references the Apache Hive Metastore, with its underlying data in Amazon S3. The fourth table (rds_postgresql.public.customer_address) references the new RDS for PostgreSQL database instance. Note table references on lines 11 and 12, and on lines 13 and 41, as opposed to line 42.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
WITH customer_total_return AS (
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
customer_total_return ctr1,
ctr1.ctr_return > (
avg(ctr_return) * 1.2
customer_total_return ctr2
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v2.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v2"

Below, we see the third query’s results detailed in Presto’s web interface.

Again, even though the data is in three separate and physically different data sources, we can easily query it as though it were all in the same place.
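
The only thing that changes between the three versions is the catalog and schema that each table name is qualified with. The sketch below records that mapping, as described in the three sections above, and derives the number of distinct data sources each version touches. The dictionary and function names are hypothetical, and the mapping is keyed by the SQL file names used in this post.

```python
# Catalog.schema used for each table in each version of the query
TABLE_SOURCES = {
    "presto_query2": {
        "catalog_returns": "tpcds.sf1",
        "date_dim": "tpcds.sf1",
        "customer": "tpcds.sf1",
        "customer_address": "tpcds.sf1",
    },
    "presto_query2_federated_v1": {
        "catalog_returns": "tpcds.sf1",
        "date_dim": "tpcds.sf1",
        "customer": "hive.default",
        "customer_address": "hive.default",
    },
    "presto_query2_federated_v2": {
        "catalog_returns": "tpcds.sf1",
        "date_dim": "tpcds.sf1",
        "customer": "hive.default",
        "customer_address": "rds_postgresql.public",
    },
}


def qualified_name(version, table):
    """Return the fully qualified catalog.schema.table name for one version."""
    return f"{TABLE_SOURCES[version][table]}.{table}"


def catalogs(version):
    """Return the sorted, distinct catalogs a version of the query reads from."""
    return sorted({source.split(".")[0] for source in TABLE_SOURCES[version].values()})


if __name__ == "__main__":
    for version in TABLE_SOURCES:
        print(version, catalogs(version))
```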

Additional Query Examples

The project contains several additional query statements, which I have extracted from Why You Should Run TPC-DS: A Workload Analysis and modified to work with Presto and federate across multiple data sources.

# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1.sql \
--output-format ALIGNED \
--client-tags "presto_query1"
# federated - two sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query1_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4.sql \
--output-format ALIGNED \
--client-tags "presto_query4"
# federated - three sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query4_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query5.sql \
--output-format ALIGNED \
--client-tags "presto_query5"


In this post, we gained a better understanding of Presto using Ahana’s PrestoDB Sandbox product from AWS Marketplace. We learned how Presto queries data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. We also learned about Apache Hive and the Apache Hive Metastore, the Apache Parquet file format, and how and why to partition Hive data in Amazon S3. Most importantly, we learned how to write federated queries that join multiple disparate data sources without having to move the data into a single monolithic data store.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Collecting and Analyzing IoT Data in Near Real-Time with AWS IoT, LoRa, and LoRaWAN


In a recent post published on ITNEXT, LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, we explored the use of the LoRa (Long Range) and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing several embedded sensors, and an IoT gateway. In this post, we will extend that architecture to the Cloud, using AWS IoT, a broad and deep set of IoT services, from the edge to the Cloud. We will securely collect, transmit, and analyze IoT data using the AWS cloud platform.

LoRa and LoRaWAN

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.


AWS describes AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics.

In this post, we will focus on three AWS IoT services, one from each category: AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics. According to AWS, the AWS IoT Device SDKs include open-source libraries and developer and porting guides with samples to help you build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Core is a managed cloud service that lets connected devices easily and securely interact with cloud applications and other devices. AWS IoT Core can process and route messages to AWS endpoints and other devices reliably and securely. Finally, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

To learn more about AWS IoT, specifically the AWS IoT services we will be exploring within this post, I recommend reading my recent post published on Towards Data Science, Getting Started with IoT Analytics on AWS.

Hardware Selection

In this post, we will use the following hardware.

IoT Device with Embedded Sensors

An Arduino single-board microcontroller will serve as our IoT device. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductor, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.

The Sense contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa, goes through the gateway, which can be either a dedicated hardware appliance or software program.

LoRa Gateways, to paraphrase The Things Network, form the bridge between devices and the Cloud. Devices use low power networks like LoRaWAN to connect to the Gateway, while the Gateway uses high bandwidth networks like WiFi, Ethernet, or Cellular to connect to the Cloud.

A third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) will serve as our LoRa IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). Transceiver modules like these are commonly referred to as universal asynchronous receiver-transmitters (UARTs). A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.

According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. Each RYLR896 module contains a small, PCB integrated, helical antenna. This transceiver operates at both the 868 and 915 MHz frequency ranges. In this demonstration, we will be transmitting at 915 MHz for North America.

The Arduino Sense (IoT device) transmits data, using one of the RYLR896 modules (shown below front). The Raspberry Pi (IoT Gateway), connected to the other RYLR896 module (shown below rear), receives the data.

LoRaWAN Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits / 4 bits/hex digit).
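The arithmetic above (128 bits / 4 bits per hex digit = 32 hex digits) can be checked with a short Python sketch. This is only an illustration, not part of the original project: the helper names `generate_lora_password` and `is_valid_lora_password` are hypothetical, and generating the key with Python's `secrets` module is an assumption about how you might choose to create one.

```python
import secrets
import string


def generate_lora_password() -> str:
    """Return a random 128-bit key written as 32 uppercase hex digits."""
    # 16 random bytes = 128 bits; each byte is rendered as 2 hex digits
    return secrets.token_hex(16).upper()


def is_valid_lora_password(password: str) -> bool:
    """Check that a password is exactly 32 hexadecimal digits (128 bits)."""
    return len(password) == 32 and all(c in string.hexdigits for c in password)


if __name__ == "__main__":
    password = generate_lora_password()
    print(password, is_valid_lora_password(password))
```

The same password must be configured on both transceiver modules, so generate it once and copy it to the device and the gateway.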

Provisioning AWS Resources

To start, we will create the necessary AWS IoT and associated resources on the AWS cloud platform. Once these resources are in place, we can then proceed to configure the IoT device and IoT gateway to securely transmit the sensor data to the Cloud.

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone --branch master --single-branch --depth 1 --no-tags \

AWS CloudFormation

The CloudFormation template, iot-analytics.yaml, will create an AWS IoT CloudFormation stack containing the following resources.

  • AWS IoT Thing
  • AWS IoT Thing Policy
  • AWS IoT Core Topic Rule
  • AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • AWS Lambda and Lambda Permission
  • Amazon S3 Bucket
  • Amazon SageMaker Notebook Instance
  • AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To create the AWS CloudFormation stack from the included CloudFormation template, execute the following AWS CLI command.

aws cloudformation create-stack \
--stack-name lora-iot-demo \
--template-body file://cloudformation/iot-analytics.yaml \
--parameters ParameterKey=ProjectName,ParameterValue=lora-iot-demo \
ParameterKey=IoTTopicName,ParameterValue=lora-iot-demo \

The resulting CloudFormation stack should contain 16 AWS resources.

Additional Resources

Unfortunately, AWS CloudFormation cannot create all the AWS IoT resources we require for this demonstration. To complete the AWS provisioning process, execute the following series of AWS CLI commands. These commands will create the remaining resources, including an AWS IoT Thing Type, Thing Group, Thing Billing Group, and an X.509 Certificate.

# LoRaWAN / AWS IoT Demo
# Author: Gary Stafford
# Run AWS CLI commands after CloudFormation stack completes successfully
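# variables (values match the resource names used elsewhere in this post)
thingName="lora-iot-gateway-01"
thingPolicy="LoRaDevicePolicy"
thingType="LoRaIoTGateway"
thingGroup="LoRaIoTGateways"
thingBillingGroup="IoTGateways"
mkdir -p "${thingName}"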
aws iot create-keys-and-certificate \
--certificate-pem-outfile "${thingName}/${thingName}.cert.pem" \
--public-key-outfile "${thingName}/${thingName}.public.key" \
--private-key-outfile "${thingName}/${thingName}.private.key" \
# assuming you only have one certificate registered
certificate=$(aws iot list-certificates | jq -r '.[][] | .certificateArn')
## alternately, for a specific certificate if you have more than one
# aws iot list-certificates
## then change the value below
# certificate=arn:aws:iot:us-east-1:123456789012:cert/<certificate>
aws iot attach-policy \
--policy-name $thingPolicy \
--target $certificate
aws iot attach-thing-principal \
--thing-name $thingName \
--principal $certificate
aws iot create-thing-type \
--thing-type-name $thingType \
--thing-type-properties "thingTypeDescription=LoRaWAN IoT Gateway"
aws iot create-thing-group \
--thing-group-name $thingGroup \
--thing-group-properties "thingGroupDescription=\"LoRaWAN IoT Gateway Thing Group\", attributePayload={attributes={Manufacturer=RaspberryPiFoundation}}"
aws iot add-thing-to-thing-group \
--thing-name $thingName \
--thing-group-name $thingGroup
aws iot create-billing-group \
--billing-group-name $thingBillingGroup \
--billing-group-properties "billingGroupDescription=\"Gateway Billing Group\""
aws iot add-thing-to-billing-group \
--thing-name $thingName \
--billing-group-name $thingBillingGroup
aws iot update-thing \
--thing-name $thingName \
--thing-type-name $thingType \
--attribute-payload "{\"attributes\": {\"GatewayMfr\":\"RaspberryPiFoundation\", \"LoRaMfr\":\"REYAX\", \"LoRaModel\":\"RYLR896\"}}"
aws iot describe-thing \
--thing-name $thingName

IoT Device Configuration

With the AWS resources deployed, we can configure the IoT device and IoT Gateway.

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo_aws.ino, contains the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, Red, Green, and Blue (RGB) color, and ambient light intensity, using the LoRaWAN protocol.

/*
  Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
  including temperature, humidity, barometric pressure, and color,
  using REYAX RYLR896 transceiver modules
  Author: Gary Stafford
*/

#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>

const int UPDATE_FREQUENCY = 5000;     // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";

String uid = "";

void setup()
{
  Serial.begin(9600);    // USB serial port, used for debug output
  Serial1.begin(115200); // default baud rate of module is 115200
  delay(1000);           // wait for LoRa module to be ready

  // get unique transceiver id to identify iot device on network
  Serial1.print("AT+UID?\r\n");
  uid = Serial1.readString();
  uid.replace("+UID=", ""); // trim off '+UID=' at start of line
  uid.replace("\r\n", "");  // trim off CR/LF at end of line

  // these settings must be the same for receiver and transmitter
  Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
  delay(200);
  Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
  delay(200);
  Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
  delay(200);
  Serial1.print("AT+CPIN?\r\n"); // confirm password is set

  if (!HTS.begin())
  { // initialize HTS221 sensor
    Serial.println("Failed to initialize humidity temperature sensor!");
    while (1);
  }

  if (!BARO.begin())
  { // initialize LPS22HB sensor
    Serial.println("Failed to initialize pressure sensor!");
    while (1);
  }

  // avoid bad readings to start bug: take and discard a first reading
  BARO.readPressure();
  delay(1000);

  if (!APDS.begin())
  { // initialize APDS9960 sensor
    Serial.println("Failed to initialize color sensor!");
    while (1);
  }
}

void loop()
{
  updateReadings();
  delay(UPDATE_FREQUENCY);
}

void updateReadings()
{
  float temperature = getTemperature(CALIBRATION_FACTOR);
  float humidity = getHumidity();
  float pressure = getPressure();
  int colors[4];
  getColor(colors);

  String payload = buildPayload(temperature, humidity, pressure, colors);
  Serial.println("Payload: " + payload); // display the payload for debugging
  Serial1.print(payload);                // send the payload over LoRaWAN
  displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}

float getTemperature(float calibration)
{
  return HTS.readTemperature() + calibration;
}

float getHumidity()
{
  return HTS.readHumidity();
}

float getPressure()
{
  return BARO.readPressure();
}

void getColor(int c[])
{
  // wait until a color reading is available
  while (!APDS.colorAvailable())
  {
    delay(5);
  }

  int r, g, b, a;
  APDS.readColor(r, g, b, a);
  c[0] = r;
  c[1] = g;
  c[2] = b;
  c[3] = a;
}

// display for debugging purposes
void displayResults(float t, float h, float p, int c[])
{
  Serial.println((String)"UID: " + uid);
  Serial.print("Temperature: ");
  Serial.println(t);
  Serial.print("Humidity: ");
  Serial.println(h);
  Serial.print("Pressure: ");
  Serial.println(p);
  Serial.print("Color (r, g, b, a): ");
  Serial.print(c[0]);
  Serial.print(", ");
  Serial.print(c[1]);
  Serial.print(", ");
  Serial.print(c[2]);
  Serial.print(", ");
  Serial.println(c[3]);
}

String buildPayload(float t, float h, float p, int c[])
{
  // sensor id and readings, delimited by pipes
  String readings = "";
  readings += uid;
  readings += DELIMITER;
  readings += t;
  readings += DELIMITER;
  readings += h;
  readings += DELIMITER;
  readings += p;
  readings += DELIMITER;
  readings += c[0];
  readings += DELIMITER;
  readings += c[1];
  readings += DELIMITER;
  readings += c[2];
  readings += DELIMITER;
  readings += c[3];

  // AT command: AT+SEND=<address>,<payload length>,<payload>
  String payload = "";
  payload += "AT+SEND=";
  payload += ADDRESS;
  payload += ",";
  payload += readings.length();
  payload += ",";
  payload += readings;
  payload += "\r\n";
  return payload;
}

AT Commands

Communication with the RYLR896’s long-range modem is done using AT commands. AT commands are instructions used to control a modem. AT is the abbreviation of ATtention. Every command line starts with AT, which is why modem commands are called AT commands, according to Developer’s Home. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor ID and the sensor values together in a single string. The string will be incorporated into an AT command, sent to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, we will delimit the sensor values with a pipe (|), as opposed to a comma. According to REYAX, the maximum length of the LoRa payload is approximately 330 bytes.

Below, we see an example of an AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both contain the LoRa transmitter Address ID, payload length (62 bytes in the example), and the payload. The data received by the IoT gateway also has the Received signal strength indicator (RSSI), and Signal-to-noise ratio (SNR).
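To make the two message formats concrete, here is a minimal Python sketch of both sides. It assumes the `AT+SEND=<address>,<length>,<data>` and `+RCV=<address>,<length>,<data>,<RSSI>,<SNR>` layouts described above. The names `LoRaMessage`, `build_at_send`, and `parse_rcv` are hypothetical helpers for illustration and are not part of the project's code.

```python
from typing import List, NamedTuple


class LoRaMessage(NamedTuple):
    address: int       # LoRa transmitter address ID
    length: int        # payload length in bytes, as reported by the module
    fields: List[str]  # pipe-delimited sensor ID and sensor values
    rssi: int          # received signal strength indicator
    snr: int           # signal-to-noise ratio


def build_at_send(address: int, fields: List[str], delimiter: str = "|") -> str:
    """Build the AT command that transmits the delimited readings."""
    readings = delimiter.join(fields)
    return "AT+SEND={},{},{}\r\n".format(address, len(readings), readings)


def parse_rcv(line: str, delimiter: str = "|") -> LoRaMessage:
    """Parse a '+RCV=' line received by the gateway's transceiver."""
    if not line.startswith("+RCV="):
        raise ValueError("not a +RCV line: {!r}".format(line))
    body = line[len("+RCV="):].strip()
    # address and length come first; RSSI and SNR come last
    address, length, rest = body.split(",", 2)
    data, rssi, snr = rest.rsplit(",", 2)
    return LoRaMessage(int(address), int(length), data.split(delimiter),
                       int(rssi), int(snr))


if __name__ == "__main__":
    values = ["0447383033363932003C0034", "23.94", "37.71", "99.89",
              "16", "38", "53", "80"]
    print(build_at_send(116, values))
    print(parse_rcv("+RCV=116,54," + "|".join(values) + ",-61,56"))
```

Because the sensor values are pipe-delimited, the only commas in the line are the ones the module adds, which is what makes the split unambiguous.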

Receiving Data on IoT Gateway

The Raspberry Pi will act as a LoRa IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino, and sending the data to AWS. The Raspberry Pi runs a Python script, which will receive the data from the Arduino Sense, decrypt the data, parse the sensor values, serialize the data to a JSON payload, and finally, transmit the payload in an MQTT-protocol message to AWS. The script uses pyserial, the Python Serial Port Extension, which encapsulates access to the serial port for communication with the RYLR896 module. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS.

import json
import logging
import sys
import threading
import time
from argparse import ArgumentParser

import serial
from awscrt import io, mqtt, auth, http, exceptions
from awsiot import mqtt_connection_builder

# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# Usage:
# sh ./ \

# constants (must match the values set in the Arduino sketch)
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"

# global variables
count = 0  # from args
received_count = 0
received_all_event = threading.Event()


def main():
    # configure logging to a file
    logging.basicConfig(filename='output.log',
                        filemode='w', level=logging.DEBUG)
    args = get_args()  # get args
    payload = ""
    lora_payload = {}

    # set log level
    io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')

    # spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    # set MQTT connection
    mqtt_connection = set_mqtt_connection(args, client_bootstrap)
    logging.debug("Connecting to {} with client ID '{}'…".format(
        args.endpoint, args.client_id))
    connect_future = mqtt_connection.connect()

    # future.result() waits until a result is available
    connect_future.result()

    logging.debug("Connecting to REYAX RYLR896 transceiver module…")
    serial_conn = serial.Serial(
        port=args.tty,
        baudrate=int(args.baud_rate),
        timeout=5)  # read timeout in seconds (assumed value)

    if serial_conn.isOpen():
        set_lora_config(serial_conn)
        check_lora_config(serial_conn)

        while True:
            # read data from serial port
            serial_payload = serial_conn.readline()
            logging.debug(serial_payload)

            if len(serial_payload) >= 1:
                payload = serial_payload.decode(encoding="utf-8")
                payload = payload[:-2]  # trim off CR/LF at end of line
                try:
                    data = parse_payload(payload)
                    lora_payload = {
                        "ts": time.time(),
                        "data": {
                            "device_id": str(data[0]),
                            "gateway_id": str(args.gateway_id),
                            "temperature": float(data[1]),
                            "humidity": float(data[2]),
                            "pressure": float(data[3]),
                            "color": {
                                "red": float(data[4]),
                                "green": float(data[5]),
                                "blue": float(data[6]),
                                "ambient": float(data[7])
                            }
                        }
                    }
                    logging.debug(lora_payload)
                except IndexError:
                    logging.error("IndexError: {}".format(payload))
                except ValueError:
                    logging.error("ValueError: {}".format(payload))

                # publish mqtt message
                message_json = json.dumps(
                    lora_payload,
                    sort_keys=True,
                    indent=None,
                    separators=(',', ':'))

                try:
                    mqtt_connection.publish(
                        topic=args.topic,
                        payload=message_json,
                        qos=mqtt.QoS.AT_LEAST_ONCE)
                except mqtt.SubscribeError as err:
                    logging.error(".SubscribeError: {}".format(err))
                except exceptions.AwsCrtError as err:
                    logging.error("AwsCrtError: {}".format(err))
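

def set_mqtt_connection(args, client_bootstrap):
    if args.use_websocket:
        proxy_options = None
        if args.proxy_host:
            proxy_options = http.HttpProxyOptions(
                host_name=args.proxy_host, port=args.proxy_port)

        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
            client_bootstrap)
        mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
            endpoint=args.endpoint,
            client_bootstrap=client_bootstrap,
            region=args.signing_region,
            credentials_provider=credentials_provider,
            websocket_proxy_options=proxy_options,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)
    else:
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=args.endpoint,
            cert_filepath=args.cert,
            pri_key_filepath=args.key,
            client_bootstrap=client_bootstrap,
            ca_filepath=args.root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)

    return mqtt_connection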


def get_args():
    parser = ArgumentParser(
        description="Send and receive messages through an MQTT connection.")
    parser.add_argument("--tty", required=True,
                        help="serial tty", default="/dev/ttyAMA0")
    parser.add_argument("--baud-rate", required=True,
                        help="serial baud rate", default=115200)
    parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
                                                          "Ex: \"\"")
    parser.add_argument('--cert', help="File path to your client certificate, in PEM format.")
    parser.add_argument('--key', help="File path to your private key, in PEM format.")
    parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
                                          "Necessary if MQTT server uses a certificate that's not already in " +
                                          "your trust store.")
    parser.add_argument('--client-id', default='samples-client-id',
                        help="Client ID for MQTT connection.")
    parser.add_argument('--topic', default="samples/test",
                        help="Topic to subscribe to, and publish messages to.")
    parser.add_argument('--message', default="Hello World!", help="Message to publish. " +
                                                                  "Specify empty string to publish nothing.")
    parser.add_argument('--count', default=0, type=int, help="Number of messages to publish/receive before exiting. " +
                                                             "Specify 0 to run forever.")
    parser.add_argument('--use-websocket', default=False, action='store_true',
                        help="To use a websocket instead of raw mqtt. If you specify this option you must "
                             "specify a region for signing, you can also enable proxy mode.")
    parser.add_argument('--signing-region', default='us-east-1',
                        help="If you specify --use-websocket, this is the region that will be used for computing "
                             "the Sigv4 signature")
    parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
                                             "you will likely need to set --root-ca to the ca for your proxy.")
    parser.add_argument('--proxy-port', type=int, default=8080,
                        help="Port for proxy to connect to.")
    parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
                        help='Logging level')
    parser.add_argument("--gateway-id", help="IoT Gateway serial number")
    args = parser.parse_args()
    return args


def parse_payload(payload):
    # input: +RCV=116,29,0447383033363932003C0034|23.94|37.71|99.89|16|38|53|80,-61,56
    # output: ['0447383033363932003C0034', '23.94', '37.71', '99.89', '16', '38', '53', '80']
    payload = payload.split(",")
    payload = payload[2].split("|")
    return payload


def set_lora_config(serial_conn):
    # configures the REYAX RYLR896 transceiver module
    serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address set? {}".format(serial_payload.decode(encoding="utf-8")))

    serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network Id set? {}".format(serial_payload.decode(encoding="utf-8")))

    serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES-128 password set? {}".format(serial_payload.decode(encoding="utf-8")))


def check_lora_config(serial_conn):
    # queries the REYAX RYLR896 transceiver module and logs each setting
    queries = [
        ("AT", "Module responding?"),
        ("AT+ADDRESS?", "Address:"),
        ("AT+NETWORKID?", "Network id:"),
        ("AT+IPR?", "UART baud rate:"),
        ("AT+BAND?", "RF frequency:"),
        ("AT+CRFOP?", "RF output power:"),
        ("AT+MODE?", "Work mode:"),
        ("AT+PARAMETER?", "RF parameters:"),
        ("AT+CPIN?", "AES128 password of the network:"),
    ]
    for command, label in queries:
        serial_conn.write(str.encode(command + "\r\n"))
        serial_payload = (serial_conn.readline())[:-2]  # trim off CR/LF
        logging.debug("{} {}".format(label, serial_payload.decode(encoding="utf-8")))


# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    logging.error("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    logging.warning("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        logging.warning("Session did not persist. Resubscribing to existing topics…")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    logging.warning("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    logging.debug("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1
    if received_count == count:
        received_all_event.set()


if __name__ == "__main__":
    main()

Running the IoT Gateway Python Script

To run the Python script on the Raspberry Pi, we will use a helper shell script. The shell script helps construct the arguments required to execute the Python script.

# LoRaWAN / AWS IoT Demo
# Author: Gary A. Stafford
# Revised: 2021-03-24
# Start IoT data collector script and tails output
# Usage:
# sh ./
if [ "$#" -ne 1 ]; then
  echo "Script requires 1 parameter!"
  exit 1
fi
# input parameters
ENDPOINT=$1 # e.g.
DEVICE="lora-iot-gateway-01" # matches CloudFormation thing name
CERTIFICATE="${DEVICE}-cert.pem" # e.g. lora-iot-gateway-01-cert.pem
KEY="${DEVICE}-private.key" # e.g. lora-iot-gateway-01-private.key
GATEWAY_ID=$(< /proc/cpuinfo grep Serial | grep -oh "[a-z0-9]*$") # e.g. 00000000f62051ce
# output for debugging
echo "DEVICE: ${DEVICE}"
echo "KEY: ${KEY}"
# call the python script
nohup python3 \
--endpoint "${ENDPOINT}" \
--cert "${DEVICE}-creds/${CERTIFICATE}" \
--key "${DEVICE}-creds/${KEY}" \
--root-ca "${DEVICE}-creds/AmazonRootCA1.pem" \
--client-id "${DEVICE}" \
--topic "lora-iot-demo" \
--gateway-id "${GATEWAY_ID}" \
--verbosity "Info" \
--tty "/dev/ttyAMA0" \
--baud-rate 115200 \
>collector.log 2>&1 </dev/null &
sleep 2
# tail the log (Control-C to exit)
tail -f collector.log

To run the helper script, we execute the following command, substituting the input parameter, the AWS IoT endpoint, with your endpoint.

sh ./ \

You should see the console output, similar to the following.

The script starts by configuring the RYLR896 module and outputting that configuration to a log file, output.log. If successful, we should see the following debug information logged.

DEBUG:root:Connecting to with client ID 'lora-iot-gateway-01'
DEBUG:root:Connecting to REYAX RYLR896 transceiver module
DEBUG:root:Address set? +OK
DEBUG:root:Network Id set? +OK
DEBUG:root:AES-128 password set? +OK
DEBUG:root:Module responding? +OK
DEBUG:root:Address: +ADDRESS=116
DEBUG:root:Network id: +NETWORKID=6
DEBUG:root:UART baud rate: +IPR=115200
DEBUG:root:RF frequency: +BAND=915000000
DEBUG:root:RF output power: +CRFOP=15
DEBUG:root:Work mode: +MODE=0
DEBUG:root:RF parameters: +PARAMETER=12,7,1,4
DEBUG:root:AES128 password of the network: +CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF

The sensor data is also written to the log file for debugging purposes. The first line in the log (shown below) is the raw decrypted data received from the IoT device via LoRaWAN. The second line is the JSON-serialized payload, sent securely to AWS, using the MQTT protocol.


DEBUG:root:{'ts': 1598305503.7041512, 'data': {'humidity': 41.89, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 230.0, 'blue': 833.0, 'ambient': 1116.0, 'green': 692.0}}}


DEBUG:root:{'ts': 1598305513.7918658, 'data': {'humidity': 41.63, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 236.0, 'blue': 837.0, 'ambient': 1127.0, 'green': 696.0}}}


DEBUG:root:{'ts': 1598305523.8556132, 'data': {'humidity': 41.57, 'temperature': 23.44, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 232.0, 'blue': 830.0, 'ambient': 1113.0, 'green': 686.0}}}


DEBUG:root:{'ts': 1598305528.8890748, 'data': {'humidity': 41.44, 'temperature': 23.51, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 205.0, 'blue': 802.0, 'ambient': 1040.0, 'green': 658.0}}}

AWS IoT Core

The Raspberry Pi-based IoT gateway will be registered with AWS IoT Core. IoT Core allows users to connect devices quickly and securely to AWS.


According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

Below, we see an example of the Thing created by CloudFormation. The Thing, lora-iot-gateway-01, represents the physical IoT gateway. We have assigned the IoT gateway a Thing Type, LoRaIoTGateway, a Thing Group, LoRaIoTGateways, and a Thing Billing Group, IoTGateways.

In a real IoT environment, containing hundreds, thousands, even millions of IoT devices, gateways, and sensors, these classification mechanisms, Thing Type, Thing Group, and Thing Billing Group, will help to organize IoT assets.

Device Gateway and Message Broker

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture containing multiple Topics, Rules, and Actions.

AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring all data exchanged between AWS and the devices is secure by default. In the demonstration, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Below, we see an example of an X.509 certificate assigned to the Thing, lora-iot-gateway-01, which represents the physical IoT gateway. The X.509 certificate and the private key, generated previously using the AWS CLI, are installed on the IoT gateway.

Authorization of the device to access any resource on AWS is controlled by AWS IoT Core Policies. These policies are similar to AWS IAM Policies. Below, we see an example of an AWS IoT Core Policy, LoRaDevicePolicy, which is assigned to the IoT gateway.
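As an illustration of what such a policy document can look like, the Python sketch below builds a minimal AWS IoT Core policy that lets a gateway connect and publish to a single topic. It is not the contents of LoRaDevicePolicy: the function name `build_gateway_policy`, the region, and the account ID are placeholders, and the choice to restrict the client ID to the Thing name is an assumption.

```python
import json


def build_gateway_policy(region: str, account_id: str, topic: str) -> dict:
    """Build a minimal AWS IoT Core policy document (illustrative only)."""
    arn_prefix = "arn:aws:iot:{}:{}".format(region, account_id)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # allow the device to connect only with its Thing name as client ID
                "Effect": "Allow",
                "Action": "iot:Connect",
                "Resource": arn_prefix + ":client/${iot:Connection.Thing.ThingName}",
            },
            {
                # allow the device to publish only to the demo topic
                "Effect": "Allow",
                "Action": "iot:Publish",
                "Resource": arn_prefix + ":topic/" + topic,
            },
        ],
    }


if __name__ == "__main__":
    # placeholder region and account ID
    policy = build_gateway_policy("us-east-1", "123456789012", "lora-iot-demo")
    print(json.dumps(policy, indent=2))
```

Like IAM policies, the document is a list of statements, each pairing an effect with actions and resources. The gateway in this post only publishes, so no subscribe or receive permissions are included in the sketch.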

AWS IoT Core Rules

Once an MQTT message is received from the IoT gateway (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to an IoT Analytics Channel.

Rule query statements are written in standard Structured Query Language (SQL). The datasource for the Rule query is an IoT Topic.

clientid() AS device,
parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp(), "UTC") AS msg_received
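To get a feel for the msg_received value the rule adds, the Python sketch below formats an epoch timestamp in milliseconds with the same pattern. It is a local approximation, not AWS code: the function name `format_msg_received` is hypothetical, and it assumes the pattern's trailing Z renders the UTC offset as +0000.

```python
from datetime import datetime, timezone


def format_msg_received(epoch_ms: int) -> str:
    """Format epoch milliseconds as yyyy-MM-dd'T'HH:mm:ss.SSSZ in UTC."""
    # split into whole seconds and the millisecond remainder
    dt = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    millis = epoch_ms % 1000
    # assumption: 'Z' in the pattern renders the UTC offset as +0000
    return "{}.{:03d}+0000".format(dt.strftime("%Y-%m-%dT%H:%M:%S"), millis)


if __name__ == "__main__":
    # the 'ts' value from the gateway log, converted to milliseconds
    print(format_msg_received(1598305503704))
```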

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

Below, we see a typical AWS IoT Analytics architecture. IoT messages are received from AWS IoT Core, through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 21 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an ordered list of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.

In our demonstration’s Pipeline, iot_analytics_pipeline, we have specified three additional activities, including DeviceRegistryEnrich, Filter, and Lambda. Other activity types include Math, SelectAttributes, RemoveAttributes, and AddAttributes.
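The ordered list of activities can be pictured as a chain in which each activity names the next one. The Python sketch below builds such a pipelineActivities list and walks the chain. It is a rough illustration only: the activity names, the filter expression, the channel name, the Lambda name, the `thingName` attribute, and the role ARN are all hypothetical, and the order of the three middle activities is an assumption rather than the demonstration's actual configuration.

```python
def build_pipeline_activities(channel, datastore, lambda_name, role_arn):
    """Build an illustrative pipelineActivities list (hypothetical values)."""
    return [
        {"channel": {"name": "channel_activity",
                     "channelName": channel,
                     "next": "filter_activity"}},
        {"filter": {"name": "filter_activity",
                    "filter": "temperature > 0 AND humidity > 0",  # hypothetical
                    "next": "lambda_activity"}},
        {"lambda": {"name": "lambda_activity",
                    "lambdaName": lambda_name,
                    "batchSize": 10,
                    "next": "enrich_activity"}},
        {"deviceRegistryEnrich": {"name": "enrich_activity",
                                  "attribute": "metadata",
                                  "thingName": "device",  # message attribute holding the Thing name
                                  "roleArn": role_arn,
                                  "next": "datastore_activity"}},
        {"datastore": {"name": "datastore_activity",
                       "datastoreName": datastore}},
    ]


def activity_order(activities):
    """Follow the 'next' pointers from the channel activity to the data store."""
    by_name = {}
    for activity in activities:
        kind, body = next(iter(activity.items()))
        by_name[body["name"]] = (kind, body)
    current = next(name for name, (kind, _) in by_name.items() if kind == "channel")
    order = []
    while current is not None:
        kind, body = by_name[current]
        order.append(kind)
        current = body.get("next")  # the datastore activity has no 'next'
    return order


if __name__ == "__main__":
    activities = build_pipeline_activities(
        channel="iot_analytics_channel",          # hypothetical channel name
        datastore="iot_analytics_data_store",
        lambda_name="lora-iot-demo-transform",    # hypothetical Lambda name
        role_arn="arn:aws:iam::123456789012:role/placeholder-role")
    print(activity_order(activities))
```

The Channel activity must come first and the Datastore activity last; everything in between is optional and runs in the order given by the `next` pointers.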

The Filter activity checks that the sensor values are not Null or otherwise erroneous; if they are, the message is dropped. The Lambda Pipeline activity executes an AWS Lambda function to transform the messages in the pipeline. Messages are sent in an event object to the Lambda function. The function modifies the messages and returns the event object to the activity.

The Python-based Lambda function handles typical IoT data transformation tasks, including converting the temperature from Celsius to Fahrenheit, pressure from kilopascals (kPa) to inches of mercury (inHg), and 12-bit RGBA values to 8-bit color values (0–255). The Lambda function also rounds all the values to between 0 and 2 decimal places of precision.

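def lambda_handler(event, context):
    # The event is a batch (list) of messages; each one is modified in place
    for e in event:
        # Celsius to Fahrenheit
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        # Kilopascals to inches of mercury (1 inHg is about 3.3864 kPa)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        # Scale the 12-bit color readings to 8-bit values (0-255)
        e['red'] = int(round(e['red'] / (4097 / 255), 0))
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))
    return event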
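A handler like this can be exercised locally before it is wired into the Pipeline. The sketch below repeats the handler so that it runs on its own, and feeds it a one-message batch built from the sample sensor readings shown later in this post. Passing None for the context is an assumption that holds only because the handler never uses it.

```python
def lambda_handler(event, context):
    # Same transformation as above, repeated so this snippet is self-contained
    for e in event:
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        for color in ('red', 'green', 'blue', 'ambient'):
            e[color] = int(round(e[color] / (4097 / 255), 0))
    return event


# One-message batch using the sample readings from this post
sample_event = [{
    'temperature': 23.6,   # degrees Celsius
    'humidity': 45.73,
    'pressure': 98.65,     # kPa
    'red': 281,
    'green': 667,
    'blue': 650,
    'ambient': 1057,
}]

transformed = lambda_handler(sample_event, None)[0]
print(transformed)
```

The transformed values line up with the final message structure shown at the end of this post: 74.48 °F, 29.13 inHg, and color values of 17, 42, 40, and 66.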

The demonstration’s Pipeline also enriches the IoT data with metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the IoT data, including custom attributes such as the LoRa transceiver manufacturer and model, and the IoT gateway manufacturer.

A notable feature of Pipelines is the ability to reprocess messages. If you make changes to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all of the IoT data in the associated Channel and overwrite the IoT data in the Data set.

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline in a fully-managed database. Both Channels and Data stores support storing IoT data in your own Amazon S3 bucket or an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store the IoT data in our Data store, iot_analytics_data_store.

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Periodic updates are implemented using a cron expression. For the demonstration, we are updating our Data set, iot_analytics_data_set, at a 15-minute interval. The time interval can be increased or reduced, depending on the desired ‘near real-time’ nature of the IoT data being analyzed.
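To illustrate how the SQL query and the cron schedule fit together, here is a small sketch that assembles the arguments for a Data set refreshed every 15 minutes. The action name and the SELECT statement are assumptions for illustration, not the demonstration's actual query, and the schedule uses the six-field cron(...) format that AWS schedule expressions follow.

```python
def build_dataset_request(dataset_name, datastore_name, minutes=15):
    """Build keyword arguments for an IoT Analytics create_dataset call.

    The returned dict is only data; nothing is sent to AWS here.
    """
    if not 1 <= minutes <= 59:
        raise ValueError("minutes must be between 1 and 59")
    return {
        "datasetName": dataset_name,
        "actions": [{
            "actionName": "SqlAction",  # hypothetical action name
            # Assumed query: select everything from the Data store
            "queryAction": {"sqlQuery": f"SELECT * FROM {datastore_name}"},
        }],
        # Fields: minutes, hours, day-of-month, month, day-of-week, year
        "triggers": [{"schedule": {"expression": f"cron(0/{minutes} * * * ? *)"}}],
    }


dataset_request = build_dataset_request(
    "iot_analytics_data_set", "iot_analytics_data_store", minutes=15
)
```

Changing `minutes` is all it takes to trade freshness for cost; a smaller value refreshes the Data set more often.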

Below, we see messages in the Result preview pane of the Data set. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

The CloudFormation stack created an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the cron expression runs the scheduled update.

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebook Instance is cloned from our project’s GitHub repository.

The repository contains a sample Jupyter Notebook, LoRa_IoT_Analytics_Demo.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3.

The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT data stored in the IoT Analytics Data set.

The Notebook can be modified, and the changes pushed back to GitHub. You could easily fork the demonstration’s GitHub repository and modify the CloudFormation template to point to your source code repository.

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization.

Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Although there is an Amazon QuickSight API, Amazon QuickSight is not automatically enabled or configured with CloudFormation or using the AWS CLI in this demonstration.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets; they are two different service features.

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set, iot_analytics_data_set.

Amazon QuickSight gives you the ability to view and modify QuickSight Data sets before visualizing. QuickSight even provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. For this demonstration, we will leave the data unchanged since all transformations were already completed in the IoT Analytics Pipeline.

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis console is shown below. An Analysis is primarily a collection of Visuals (aka Visual types). QuickSight provides several Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or each visual within the Analysis can be filtered. For the demo, I have created a simple QuickSight Analysis, including a few typical QuickSight visuals.

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. Although viewers of the Dashboard cannot edit the visuals, they can apply filtering and interactively drill-down into data in the Visuals.

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS’s machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight’s ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight’s ML-powered Forecasting can be used to predict your business metrics accurately, and perform interactive what-if analysis with point-and-click simplicity. QuickSight’s built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab (left) in the demonstration’s QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, like Visuals, and configured to tune the detection parameters. Observe the temperature, humidity, and barometric pressure anomalies, identified by ML Insights, based on their Anomaly Score, which is higher or lower, given a minimum delta of five percent. These anomalies accurately reflected an actual failure of the IoT device, caused by overheating during testing, which resulted in abnormal sensor readings.

Receiving the Messages on AWS

To confirm the IoT gateway is sending messages, we can use a packet analyzer, like tcpdump, on the IoT gateway. Running tcpdump on the IoT gateway, below, we see outbound encrypted MQTT messages being sent to AWS on port 443.

To confirm those messages are being received from the IoT gateway on AWS, we can use the AWS IoT Core Test feature and subscribe to the lora-iot-demo topic. We should see messages flowing in from the IoT gateway at approximately 5-second intervals.

The JSON payload structure of the incoming MQTT messages will look similar to the example below. The device_id is the unique id of the IoT device that transmitted the message using LoRaWAN. The gateway_id is the unique id of the IoT gateway that received the message using LoRaWAN and sent it to AWS. A single IoT gateway would usually manage messages from multiple IoT devices, each with a unique id.

{
    "data": {
        "color": {
            "ambient": 1057,
            "blue": 650,
            "green": 667,
            "red": 281
        },
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6
    },
    "ts": 1598543131.9861386
}

The SQL query used by the AWS IoT Rule described earlier transforms and flattens the nested JSON payload structure before passing it to the AWS IoT Analytics Channel, as shown below.

{
    "ambient": 1057,
    "blue": 650,
    "green": 667,
    "red": 281,
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "humidity": 45.73,
    "pressure": 98.65,
    "temperature": 23.6,
    "ts": 1598543131.9861386,
    "msg_received": "2020-08-27T11:45:32.074+0000",
    "device": "lora-iot-gateway-01"
}

We can measure the near real-time nature of the IoT data using the ts and msg_received data fields. The ts data field is the date and time when the sensor reading occurred on the IoT device, while the msg_received data field is the date and time when the message was received on AWS. The delta between the two values is a measure of how near real-time the sensor readings are being streamed to the AWS IoT Analytics Channel. In the example above, the difference between ts (2020-08-27T11:45:31.986) and msg_received (2020-08-27T11:45:32.074) is 88 ms.
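The flattening step and the delay measurement described above can both be sketched in a few lines of Python. This is a stand-in for illustration only: the real flattening is done by the IoT Rule's SQL query, which is not reproduced here, and both function names are hypothetical. The msg_received and device values are passed in as plain arguments, since how the Rule derives them is outside this sketch.

```python
from datetime import datetime, timedelta


def flatten_message(payload, msg_received, device):
    """Lift the nested 'data' and 'color' fields to the top level of one dict."""
    data = payload["data"]
    flat = dict(data["color"])
    flat.update({k: v for k, v in data.items() if k != "color"})
    flat["ts"] = payload["ts"]
    flat["msg_received"] = msg_received
    flat["device"] = device
    return flat


def latency_ms(ts_iso, received_iso):
    """Whole milliseconds between two ISO 8601 timestamps."""
    delta = datetime.fromisoformat(received_iso) - datetime.fromisoformat(ts_iso)
    return delta // timedelta(milliseconds=1)


nested = {
    "data": {
        "color": {"ambient": 1057, "blue": 650, "green": 667, "red": 281},
        "device_id": "0447383033363932003C0034",
        "gateway_id": "00000000f62051ce",
        "humidity": 45.73,
        "pressure": 98.65,
        "temperature": 23.6,
    },
    "ts": 1598543131.9861386,
}

flat = flatten_message(nested, "2020-08-27T11:45:32.074+0000", "lora-iot-gateway-01")

# The two timestamps quoted in the paragraph above
delay = latency_ms("2020-08-27T11:45:31.986", "2020-08-27T11:45:32.074")
print(delay)
```

The delay comes out to the 88 ms quoted above. Integer division by a one-millisecond timedelta avoids floating-point rounding when converting the difference to milliseconds.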

Final IoT Data Message Structure

Once the message payload passes through the AWS IoT Analytics Pipeline and lands in the AWS IoT Analytics Data set, its final data structure looks as follows. Note that the device’s attribute metadata has been added from the AWS IoT Core device registry. Regrettably, the metadata is not well-formatted JSON and will require additional transformation to be usable.

{
    "device_id": "0447383033363932003C0034",
    "gateway_id": "00000000f62051ce",
    "temperature": 74.48,
    "humidity": 45.73,
    "pressure": 29.13,
    "red": 17,
    "green": 42,
    "blue": 40,
    "ambient": 66,
    "ts": 1598543131.9861386,
    "device": "lora-iot-gateway-01",
    "msg_received": "2020-08-27T15:45:32.024+0000",
    "metadata": {
        "defaultclientid": "lora-iot-gateway-01",
        "thingname": "lora-iot-gateway-01",
        "thingid": "017db4b8-7fca-4617-aa58-7125dd94ab36",
        "thingarn": "arn:aws:iot:us-east-1:123456789012:thing/lora-iot-gateway-01",
        "thingtypename": "LoRaIoTGateway",
        "attributes": {
            "loramfr": "REYAX",
            "gatewaymfr": "RaspberryPiFoundation",
            "loramodel": "RYLR896"
        },
        "version": "2",
        "billinggroupname": "LoRaIoTGateways"
    },
    "__dt": "2020-08-27 00:00:00.000"
}

A set of sample messages is included in the GitHub project’s sample_messages directory.


In this post, we explored the use of the LoRa and LoRaWAN protocols to transmit environmental sensor data from an IoT device to an IoT gateway. Given its low energy consumption, long-distance transmission capabilities, and well-developed protocols, LoRaWAN is an ideal long-range wireless protocol for IoT devices. We then demonstrated how to use AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics to securely collect, analyze, and visualize streaming messages from the IoT device, in near real-time.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Architecting a Successful SaaS: Understanding Cloud-based SaaS Models

Originally published on the AWS APN Blog.


You’re a startup with an idea for a revolutionary new software product. You quickly build a beta version and deploy it to the cloud. After a successful social-marketing campaign and concerted sales effort, dozens of customers subscribe to your SaaS-based product. You’re ecstatic…until you realize you never architected your product for this level of success. You were so busy coding, raising capital, marketing, and selling, you never planned how you would scale your SaaS product. How you would ensure your customers’ security, as well as your own. How you would meet the product reliability, compliance, and performance you promised. And, how you would monitor and meter your customers’ usage, no matter how fast you or they grew.

I’ve often heard budding entrepreneurs jest, if only success was their biggest problem. Certainly, success won’t be their biggest problem. For many, the problems come afterward, when they disappoint their customers by failing to deliver the quality product they promised. Or worse, when they damage their customers’ reputation (and their own) by losing or exposing sensitive data. As the old saying goes, ‘you never get a second chance to make a first impression.’ Customer trust is hard-earned and easily lost. Properly architecting a scalable and secure SaaS-based product is just as important as feature development and sales. No one wants to fail on Day 1; you worked too hard to get there.

Architecting a Successful SaaS

In this series of posts, Architecting a Successful SaaS, we will explore how to properly plan and architect a SaaS product offering, designed for hosting on the cloud. We will start by answering basic questions, like, what is SaaS, what are the alternatives to SaaS for software distribution, and what are the most common SaaS product models. We will then examine different high-level SaaS architectures, review tenant isolation strategies, and explore how SaaS vendors securely interact with their customers’ cloud accounts. Finally, we will discuss how SaaS providers can meet established best practices, like those from AWS SaaS Factory and the AWS Well-Architected Framework.

For this post, I have chosen many examples of cloud services from AWS and vendors from AWS Marketplace. However, the principles discussed may be applied to other leading cloud providers, SaaS products, and cloud-based software marketplaces. All information in this post is publicly available.

What is SaaS?

According to AWS Marketplace, ‘SaaS [Software as a Service] is a delivery model for software applications whereby the vendor hosts and operates the application over the Internet. Customers pay for using the software without owning the underlying infrastructure.’ Another definition from AWS, ‘SaaS is a licensing and delivery model whereby software is centrally managed and hosted by a provider and available to customers on a subscription basis.’

A SaaS product, like other forms of software, is produced by what is commonly referred to as an Independent Software Vendor (ISV). According to Wikipedia, an Independent Software Vendor ‘is an organization specializing in making and selling software, as opposed to hardware, designed for mass or niche markets. This is in contrast to in-house software, which is developed by the organization that will use it, or custom software, which is designed or adapted for a single, specific third party. Although ISV-provided software is consumed by end-users, it remains the property of the vendor.’

Although estimates vary greatly, according to The Software as a Service (SaaS) Global Market Report 2020, the global SaaS market was valued at about $134.44B in 2018 and is expected to grow to $220.21B at a compound annual growth rate (CAGR) of 13.1% through 2022. Statista predicts SaaS revenues will grow even faster, forecasting revenues of $266B by 2022, with continued strong positive growth to $346B by 2027.
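As a quick sanity check on the first set of figures, the compound annual growth rate can be recomputed from the 2018 and 2022 market values. This is a back-of-the-envelope sketch; treating 2018 to 2022 as four compounding periods is my assumption about how the report counts years.

```python
def cagr(start_value, end_value, years):
    """Compound annual growth rate as a fraction (0.10 means 10%)."""
    return (end_value / start_value) ** (1 / years) - 1


def project(start_value, rate, years):
    """Value after compounding start_value at the given annual rate."""
    return start_value * (1 + rate) ** years


# Market values in billions of US dollars, taken from the report cited above
implied_rate = cagr(134.44, 220.21, 4)
projected_2022 = project(134.44, 0.131, 4)

print(f"Implied CAGR: {implied_rate:.1%}")
print(f"2022 value at 13.1%: ${projected_2022:.2f}B")
```

The implied rate rounds to the 13.1% quoted in the report, and compounding $134.44B at 13.1% for four years lands within a fraction of a billion dollars of $220.21B, so the figures are internally consistent.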

Cloud-based Usage Models

Let’s start by reviewing the three most common ways that individuals, businesses, academic institutions, the public sector, and government consume services from cloud providers such as Amazon Web Services (AWS), Microsoft Azure, Google Cloud, and IBM Cloud (now includes Red Hat).

Indirect Consumer

Indirect consumers are customers who consume cloud-based SaaS products. Indirect consumers are often unlikely to know which cloud provider hosts the SaaS products to which they subscribe. Many SaaS products can import and export data, as well as integrate with other SaaS products. Many successful companies run their entire business in the cloud using a combination of SaaS products from multiple vendors.



  • An advertising firm that uses Google G Suite for day-to-day communications and collaboration between its employees and clients.
  • A large automotive parts manufacturer that runs its business using the Workday cloud-based Enterprise Resource Management (ERP) suite.
  • A software security company that uses Zendesk for customer support. They also use the Slack integration for Zendesk to view, create, and take action on support tickets, using Slack channels.
  • A recruiting firm that uses Zoom Meetings & Chat to interview remote candidates. They also use the Zoom integration with Lever recruiting software, to schedule interviews.

Direct Consumer

Direct consumers are customers who use cloud-based Infrastructure as a Service (IaaS) and Platform as a Service (PaaS) services to build and run their software; the DIY (do it yourself) model. The software deployed in the customer’s account may be created by the customer or purchased from a third-party software vendor and deployed within the customer’s cloud account. Direct users may purchase IaaS and PaaS services from multiple cloud providers.



Hybrid Consumers

Hybrid consumers are customers who use a combination of IaaS, PaaS, and SaaS services. Customers often connect multiple IaaS, PaaS, and SaaS services as part of larger enterprise software application platforms.



  • A payroll company that hosts its proprietary payroll software product, using IaaS products like Amazon EC2 and Elastic Load Balancing. In addition, they use an integrated SaaS-based fraud detection product, like Cequence Security CQ botDefense, to ensure the safety and security of payroll customers.
  • An online gaming company that operates its applications using the fully-managed container-based PaaS service, Amazon ECS. To promote their gaming products, they use a SaaS-based marketing product, like Mailchimp Marketing CRM.

Cloud-based Software

Most cloud-based software is sold in one of two ways, Customer-deployed or SaaS. Below, we see a breakdown by the method of product delivery on AWS Marketplace. All items in the chart, except SaaS, represent Customer-deployed products. Serverless applications are available elsewhere on AWS and are not represented in the AWS Marketplace statistics.

AWS Marketplace: All Products – Delivery Methods (February 2020)


Customer-deployed

Customer-deployed software products are sold by an ISV to consumers of cloud-based IaaS and PaaS services. Products are installed by the customer, Systems Integrator (SI), or the ISV into the customer’s cloud account. Customer-deployed products are reminiscent of traditional ‘boxed’ software.

Customers typically pay a recurring hourly, monthly, or annual subscription fee for the software product, commonly referred to as pay-as-you-go (PAYG). The subscription fee paid to the vendor is in addition to the fees charged to the customer by the cloud service provider for the underlying compute resources on which the customer-deployed product runs in the customer’s cloud account.

Some customer-deployed products may also require a software license. Software licenses are often purchased separately through other channels. Applying a license you already own to a newly purchased product is commonly referred to as bring your own license (BYOL). BYOL is common in larger enterprise customers, who may have entered into an Enterprise License Agreement (ELA) with the ISV.

AWS Marketplace: Customer-deployed Product Subscription Types (February 2020)

Customer-deployed cloud-based software products can take a variety of forms. The most common deliverables include some combination of virtual machines (VMs) such as Amazon Machine Images (AMIs), Docker images, Amazon SageMaker models, or Infrastructure as Code such as AWS CloudFormation, HashiCorp Terraform, or Helm Charts. Customers usually pull these deliverables from a vendor’s AWS account or other public or private source code or binary repositories. Below, we see the breakdown of customer-deployed products, by the method of delivery, on AWS Marketplace.

AWS Marketplace: Customer-deployed Product Delivery Methods (February 2020)

Although AMIs have historically been the predominant method of customer-deployed software delivery, newer technologies, such as Docker images, serverless, SageMaker models, and AWS Data Exchange datasets, will continue to grow in this segment. The AWS Serverless Application Repository (SAR) currently contains over 500 serverless applications, not reflected in this chart. AWS appears to be moving toward making it easier to sell serverless software applications in AWS Marketplace, according to one recent post.

Customer-deployed cloud-based software products may require a connection between the installed product and the ISV for product support, license verification, product upgrades, or security notifications.




SaaS

SaaS software products are sold by an ISV directly to customers. The SaaS product is deployed, managed, and sold by the ISV and hosted by a cloud provider, such as AWS. A SaaS product may or may not interact with a customer’s cloud account. SaaS products are similar to customer-deployed products with respect to their subscription-based fee structure. Subscriptions may be based on a unit of measure, often a period of time. Subscriptions may also be based on the number of users, requests, hosts, or the volume of data.

AWS Marketplace: SaaS Products – Delivery Methods (February 2020)
AWS Marketplace: SaaS Products – Pricing Plans (February 2020)

A significant difference between SaaS products and customer-deployed products is the lack of direct customer costs from the underlying cloud provider. The underlying costs are bundled into the subscription fee for the SaaS product.

Similar to Customer-deployed products, SaaS products target both consumers and businesses. SaaS products span a wide variety of consumer, business, industry-specific, and technical categories. AWS Marketplace offers products from vendors covering eight major categories and over 70 sub-categories.

AWS Marketplace: SaaS Product Categories (February 2020)

SaaS Product Variants

I regularly work with a wide variety of cloud-based software vendors. In my experience, most cloud-based SaaS products fit into one of four categories, based on the primary way a customer interacts with the SaaS product:

  • Stand-alone: A SaaS product that has no interaction with the customer’s cloud account;
  • Data Access: A SaaS product that connects to the customer’s cloud account to only obtain data;
  • Augmentation: A SaaS product that connects to the customer’s cloud account, interacting with and augmenting the customer’s software;
  • Discrete Service: A variation of augmentation, a SaaS product that provides a discrete service or function as opposed to a more complete software product.


Stand-alone

A stand-alone SaaS product has no interaction with a customer’s cloud account. Customers of stand-alone SaaS products interact with the product through an interface provided by the SaaS vendor. Many stand-alone SaaS products can import and export customer data, as well as integrate with other cloud-based SaaS products. Stand-alone SaaS products may target consumers, known as Business-to-Consumer (B2C SaaS). They may also target businesses, known as Business-to-Business (B2B SaaS).



Data Access

A data access SaaS product connects to a customer’s data sources in their cloud account or on-prem. These SaaS products often fall into the categories of Big Data and Data Analytics, Machine Learning and Artificial Intelligence, and IoT (Internet of Things). Products in these categories work with large quantities of data. Given the sheer quantity of data or real-time nature of the data, importing or manually inputting data directly into the SaaS product through the SaaS vendor’s user interface is unrealistic. Often, these SaaS products will cache some portion of the customer’s data to reduce the customer’s data transfer costs.

Similar to the previous stand-alone SaaS products, customers of these SaaS products interact with the product through a user interface provided by the SaaS vendor.



  • Zepl provides an enterprise data science analytics platform, which enables data exploration, analysis, and collaboration. Zepl sells its Zepl Science and Analytics Platform SaaS product on AWS Marketplace. The Zepl product provides integration to many types of customer data sources including Snowflake, Amazon S3, Amazon Redshift, Amazon Athena, Google BigQuery, Apache Cassandra (Amazon MCS), and other SQL databases.
  • Sisense provides an enterprise-grade, cloud-native business intelligence and analytics platform, powered by AI. Sisense offers its Sisense Business Intelligence SaaS product on AWS Marketplace. This product lets customers prepare and analyze disparate big datasets using Sisense’s Data Connectors. The wide array of connectors provides connectivity to dozens of different cloud-based and on-prem data sources.
  • Databricks provides a unified data analytics platform, designed for massive-scale data engineering and collaborative data science. Databricks offers its Databricks Unified Analytics Platform SaaS product on AWS Marketplace. Databricks allows customers to interact with data across many different data sources, data storage types, and data types, including batch and streaming.
  • DataRobot provides an enterprise AI platform, which enables global enterprises to collaboratively harness the power of AI. DataRobot sells its DataRobot Automated Machine Learning for AWS SaaS product on AWS Marketplace. Using connectors, like Skyvia’s OData connector, customers can connect their data sources to the DataRobot product.


Augmentation

An augmentation SaaS product interacts with, or augments, a customer’s application, which is managed by the customer in their own cloud account. These SaaS products often maintain secure, loosely-coupled, unidirectional or bidirectional connections between the vendor’s SaaS product and the customer’s account. Vendors on AWS often use services like Amazon EventBridge, AWS PrivateLink, VPC Peering, Amazon S3, Amazon Kinesis, Amazon SQS, and Amazon SNS to interact with customers’ accounts and exchange data. Often, these SaaS products fall within the categories of Security, Logging and Monitoring, and DevOps.

Customers of these types of SaaS products generally interact with their own software, as well as with the SaaS product through an interface provided by the SaaS vendor.



  • CloudCheckr provides solutions that enable clients to optimize costs, security, and compliance on leading cloud providers. CloudCheckr sells its Cloud Management Platform SaaS product on AWS Marketplace. CloudCheckr uses an AWS IAM cross-account role and Amazon S3 to exchange data between the customer’s account and their SaaS product.
  • Splunk provides the leading software platform for real-time Operational Intelligence. Splunk sells its Splunk Cloud SaaS product on AWS Marketplace. Splunk Cloud enables rapid application troubleshooting, ensures security and compliance, and provides monitoring of business-critical services in real-time. According to their documentation, Splunk uses a combination of Amazon S3, Amazon SQS, and Amazon SNS services to transfer AWS CloudTrail logs from the customer’s accounts to Splunk Cloud.
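To give a sense of what the cross-account role pattern mentioned above involves, here is a generic sketch of the trust policy a customer might attach to an IAM role that a SaaS vendor assumes. It is not taken from any vendor's documentation: the account ID and external ID are placeholders, and the external ID condition is my assumption, based on common practice for third-party access.

```python
import json


def cross_account_trust_policy(vendor_account_id, external_id):
    """Trust policy allowing a vendor's AWS account to assume a role in the customer's account."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            # The vendor's AWS account is the trusted principal
            "Principal": {"AWS": f"arn:aws:iam::{vendor_account_id}:root"},
            "Action": "sts:AssumeRole",
            # Assumed safeguard: the vendor must present an agreed external ID
            "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
        }],
    }


# Placeholder values for illustration only
policy = cross_account_trust_policy("123456789012", "example-external-id")
policy_json = json.dumps(policy, indent=2)
print(policy_json)
```

The customer keeps control in this arrangement: the permissions attached to the role decide what the vendor can read or write, and deleting the role revokes access.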

Discrete Service

Discrete SaaS products are a variation of SaaS augmentation products. Discrete SaaS products provide specific, distinct functionality to a customer’s software application. These products may be an API, data source, or machine learning model, which is often accessed completely through a vendor’s API. The products have a limited or no visual user interface. These SaaS products are sometimes referred to as a ‘Service as a Service’. Discrete SaaS products often fall into the categories of Artificial Intelligence and Machine Learning, Financial Services, Reference Data, and Authentication and Authorization.



AWS Data Exchange

There is a new category of products on AWS Marketplace. Released in November 2019, AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. According to AWS, Data Exchange vendors can publish new data, as well as automatically publish revisions to existing data and notify subscribers. Once subscribed to a data product, customers can use the AWS Data Exchange API to load data into Amazon S3 and then analyze it with a wide variety of AWS analytics and machine learning services.


Data Exchange seems to best fit the description of a customer-deployed product. However, given the nature of the vendor-subscriber relationship, where data may be regularly exchanged—revised and published by the vendor and pulled by the subscriber—I would consider Data Exchange a cloud-based hybrid product.

AWS Data Exchange products are available on AWS Marketplace. The list of qualified data providers is growing and includes Reuters, Foursquare, TransUnion, Pitney Bowes, IMDb, Epsilon, ADP, Dun & Bradstreet, and others. As illustrated below, data sets are available in the categories of financial services, public sector, healthcare, media, telecommunications, and more.

AWS Marketplace: Data Exchange Product Categories (February 2020)



In this first post, we’ve become familiar with the common ways in which customers consume cloud-based IaaS, PaaS, and SaaS products and services. We also explored the different ways in which ISVs sell their software products to customers. In future posts, we will examine different high-level SaaS architectures, review tenant isolation strategies, and explore how SaaS vendors securely interact with their customers’ cloud accounts. Finally, we will discuss how SaaS providers can meet best practices, like those from AWS SaaS Factory and the AWS Well-Architected Framework.


Here are some great references to learn more about building and managing SaaS products on AWS.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Using Amazon Polly Text-to-Speech Service to Expand your Blog’s Audience


Writing a blog about your latest product’s features? Case studies on your latest customer integrations? Opinions and insights into your industry, or your customers’ industries? Well-written blog posts not only inform, they can also showcase your or your organization’s experience and expertise. However, if you blog on a regular basis, you know there is a lot of content out there to steal a reader’s interest. Your audience’s interest in your post can be short-lived.

A great way to extend the reach of your posts and maintain interest longer is to produce an audio version. Audio versions can be included along with the written post. Audio versions may also be shared separately on popular services such as YouTube and SoundCloud. Many multi-tasking readers may prefer to listen to a post as opposed to reading. I often listen while commuting, working out, or running.



Amazon Polly’s text-to-speech (TTS) capabilities make it easy to convert a written post to a lifelike, professional-sounding audio version. In this brief post, we will learn how to use Amazon Polly to convert blog posts to audio.


Preparing Post for Audio

Most posts only require minor modifications to optimize them for conversion to audio.

  • Adding an opening statement to your post, like ‘Audio version of the post’, followed by the post’s title, provides a good way to start the audio. For example, ‘Audio Introduction to: Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker’.
  • If your post includes code, you probably want to exclude those sections. If the post contains a large amount of code, you might consider only creating an audio introduction to the post.
  • If your post contains graphs or charts, which are referenced in the post, I suggest adding text, such as ‘See the Chart’, along with the caption of the graph or chart. For example, ‘See the Chart – AWS Marketplace: Product Delivery Methods (February 2020)’.
  • Create a simple URL for your post and add it to the audio, either at the beginning or end of the post. For example, ‘To read the full version of this post, including code samples, please go to tiny url dot com forward slash streaming warehouse’.
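The preparation steps above could also be scripted. Below is a minimal sketch in Python, not the process I actually used. It assumes the post is available as plain text with paragraphs separated by blank lines, and that code paragraphs are indented by four spaces. The function name prepare_for_audio and its parameters are hypothetical.

```python
def prepare_for_audio(title: str, body: str, spoken_url: str) -> str:
    """Return post text adjusted for text-to-speech conversion."""
    paragraphs = [p for p in body.split("\n\n") if p.strip()]
    # Assumption: a paragraph whose lines are all indented is code, so skip it.
    spoken = [
        p.strip()
        for p in paragraphs
        if not all(line.startswith("    ") for line in p.splitlines())
    ]
    intro = f"Audio version of the post: {title}."
    outro = (
        "To read the full version of this post, including code samples, "
        f"please go to {spoken_url}."
    )
    return "\n\n".join([intro, *spoken, outro])


if __name__ == "__main__":
    sample_body = "Hello there.\n\n    print('skipped')\n\nGoodbye."
    print(prepare_for_audio("My Post", sample_body, "tiny url dot com forward slash my post"))
```

The spoken URL is passed in already spelled out, since Polly would otherwise read the punctuation literally.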

Custom Lexicons

Amazon Polly offers the ability to use custom lexicons, or vocabularies. According to AWS, you can modify the pronunciation of particular words, such as company names, acronyms, foreign words, and neologisms. If you write industry-specific or highly technical blogs, you will find creating a lexicon is probably necessary to ensure your accompanying audio sounds accurate. In my own technical posts, I most often use a custom lexicon file for acronyms and company names. While many acronyms are spelled out, others are not and have unique pronunciations. Likewise, many company names have a unique pronunciation.

Take for example the following acronyms, which I used in my last few posts: PaaS, BYOL, ELA, PAYG, IPv4, IPv6, IAM, ENI. Using the default lexicon of Amazon Polly, we end up with incorrect pronunciations for all these acronyms.

Now listen to the pronunciation of the same acronyms, after we apply a custom lexicon.

Lexicons must conform to the Pronunciation Lexicon Specification (PLS) W3C recommendation. The lexicon files are in XML format. Below is a snippet of a sample lexicon file.

<?xml version="1.0" encoding="UTF-8"?>
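Since only the first line of the lexicon is shown here, the following Python sketch illustrates roughly what a complete alias-based PLS lexicon could look like. The aliases are hypothetical examples, not the entries from my own lexicon files, and build_lexicon is an illustrative helper rather than part of any AWS SDK.

```python
from typing import Mapping
from xml.sax.saxutils import escape

PLS_NAMESPACE = "http://www.w3.org/2005/01/pronunciation-lexicon"


def build_lexicon(aliases: Mapping[str, str], lang: str = "en-US") -> str:
    """Build a PLS lexicon that maps each written form to a spoken alias."""
    lexemes = "\n".join(
        "  <lexeme>\n"
        f"    <grapheme>{escape(grapheme)}</grapheme>\n"
        f"    <alias>{escape(alias)}</alias>\n"
        "  </lexeme>"
        for grapheme, alias in aliases.items()
    )
    return (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        f'<lexicon version="1.0" xmlns="{PLS_NAMESPACE}" '
        f'alphabet="ipa" xml:lang="{lang}">\n'
        f"{lexemes}\n"
        "</lexicon>\n"
    )


if __name__ == "__main__":
    # Hypothetical aliases, for illustration only
    print(build_lexicon({"BYOL": "bring your own license", "PAYG": "pay as you go"}))
```

Each lexeme pairs a grapheme (the text as written) with an alias (the text Polly should speak instead). Remember the 4,000 character limit per lexicon when generating larger files.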


Amazon Polly Console

Amazon Polly supports synthesizing speech from either plain text or SSML input. From the Amazon Polly Management Console, copy and paste your post’s prepared text into the ‘Plain text’ tab.

Next, choose the Voice Engine. If you are using English, I suggest ‘Neural’. According to AWS, Amazon Polly has a Neural TTS (NTTS) system that can produce even higher quality voices than its standard voices. The NTTS system produces the most natural and human-like text-to-speech voices possible.

Choose your Language and Region. Then, select your Voice; I prefer ‘Joanna’. In my opinion, her voice has a natural, lifelike sound. If you prefer a male voice, ‘Matthew’ is quite natural sounding. Lastly, upload your lexicon file(s).


To start the process, choose ‘Synthesize to S3’. Specify the S3 bucket where the MP3 audio file should be written. You can also add a prefix to the MP3 file names. For most average-length posts, the text synthesis process takes less than one minute. To be notified when the task completes, you can include an Amazon SNS topic ARN. Select ‘Synthesize’.


Polly creates a synthesis task.


The synthesis tasks may be viewed from the ‘S3 synthesis tasks’ tab.


Once the synthesis task is complete, the resulting mp3 audio file may be viewed and downloaded from the S3 Management Console. If you are using a Mac, QuickTime Player works great to review the audio file.



Amazon Polly may also be used from the AWS CLI or using the AWS SDK. In the example below, we have replicated the same operations performed in the Console, this time using the AWS CLI. First, upload your lexicon file(s) using the polly put-lexicon command. Each lexicon can only be up to 4,000 characters in size. Then call the polly start-speech-synthesis-task command to create a synthesis task.

# Read the prepared post text into a variable
TEXT_FILE_CONTENTS=$(cat path/to/my/blog_text_file.txt)
# Upload the custom lexicons
aws polly put-lexicon \
  --name blogvocab \
  --content file://path/to/my/blogvocab.pls
aws polly put-lexicon \
  --name techterms \
  --content file://path/to/my/techterms.pls
# Create the synthesis task (OUTPUT_BUCKET and TOPIC must be set beforehand)
aws polly start-speech-synthesis-task \
  --engine neural \
  --language-code en-US \
  --lexicon-names blogvocab techterms \
  --output-format mp3 \
  --output-s3-bucket-name "${OUTPUT_BUCKET}" \
  --output-s3-key-prefix "${TOPIC}" \
  --text "${TEXT_FILE_CONTENTS}" \
  --text-type text \
  --voice-id Joanna


The output should look similar to the screengrab, below. The results will be identical to using the Console.


You can check the task’s results using the polly list-speech-synthesis-tasks command.
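For the AWS SDK route mentioned earlier, here is a rough Python sketch of the same synthesis request using boto3. It is an illustration under assumptions, not code from this post: the helper names are hypothetical, and the bucket and prefix values are placeholders. The boto3 import sits inside the function that needs it so the request builder can be tried without AWS credentials.

```python
def build_synthesis_request(text, bucket, key_prefix, lexicon_names):
    """Assemble keyword arguments mirroring the CLI options used above."""
    return {
        "Engine": "neural",
        "LanguageCode": "en-US",
        "LexiconNames": list(lexicon_names),
        "OutputFormat": "mp3",
        "OutputS3BucketName": bucket,
        "OutputS3KeyPrefix": key_prefix,
        "Text": text,
        "TextType": "text",
        "VoiceId": "Joanna",
    }


def start_synthesis(request):
    """Start the task and return its ID; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder above works without boto3

    polly = boto3.client("polly")
    response = polly.start_speech_synthesis_task(**request)
    return response["SynthesisTask"]["TaskId"]


if __name__ == "__main__":
    # Placeholder bucket and prefix; replace with your own values
    request = build_synthesis_request(
        "Audio version of the post.", "my-output-bucket", "my-topic", ["blogvocab", "techterms"]
    )
    print(request)
```

The returned task ID could then be passed to the client’s get_speech_synthesis_task method to poll for completion.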


In this brief post, we saw a great use case for Amazon Polly, converting your written blog posts into audio. Creating audio versions of our blogs is a great way to extend the reach of the post to a potentially new audience and maintain your current audience’s interest a little longer. Amazon Polly has several other features and capabilities to explore.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Streaming Analytics with Data Warehouses, using Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight


Databases are ideal for storing and organizing data that requires a high volume of transaction-oriented query processing while maintaining data integrity. In contrast, data warehouses are designed for performing data analytics on vast amounts of data from one or more disparate sources. In our fast-paced, hyper-connected world, those sources often take the form of continuous streams of web application logs, e-commerce transactions, social media feeds, online gaming activities, financial trading transactions, and IoT sensor readings. Streaming data must be analyzed in near real-time, while often first requiring cleansing, transformation, and enrichment.

In the following post, we will demonstrate the use of Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight to analyze streaming data. We will simulate time-series data, streaming from a set of IoT sensors to Kinesis Data Firehose. Kinesis Data Firehose will write the IoT data to an Amazon S3 Data Lake, where it will then be copied to Redshift in near real-time. In Amazon Redshift, we will enhance the streaming sensor data with data contained in the Redshift data warehouse, which has been gathered and denormalized into a star schema.


In Redshift, we can analyze the data, asking questions such as: what are the min, max, mean, and median temperatures over a given time period at each sensor location? Finally, we will use Amazon QuickSight to visualize the Redshift data using rich interactive charts and graphs, including displaying geospatial sensor data.


Featured Technologies

The following AWS services are discussed in this post.

Amazon Kinesis Data Firehose

According to Amazon, Amazon Kinesis Data Firehose can capture, transform, and load streaming data into data lakes, data stores, and analytics tools. Direct Kinesis Data Firehose integrations include Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. Kinesis Data Firehose enables near real-time analytics with existing business intelligence (BI) tools and dashboards.

Amazon Redshift

According to Amazon, Amazon Redshift is the most popular and fastest cloud data warehouse. With Redshift, users can query petabytes of structured and semi-structured data across their data warehouse and data lake using standard SQL. Redshift allows users to query and export data to and from data lakes. Redshift can federate queries of live data from Redshift, as well as across one or more relational databases.

Amazon Redshift Spectrum

According to Amazon, Amazon Redshift Spectrum can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum tables are created by defining the structure for data files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue or an Apache Hive metastore. While Redshift Spectrum is an alternative to copying the data into Redshift for analysis, we will not be using Redshift Spectrum in this post.

Amazon QuickSight

According to Amazon, Amazon QuickSight is a fully managed business intelligence service that makes it easy to deliver insights to everyone in an organization. QuickSight lets users easily create and publish rich, interactive dashboards that include Amazon QuickSight ML Insights. Dashboards can then be accessed from any device and embedded into applications, portals, and websites.

What is a Data Warehouse?

According to Amazon, a data warehouse is a central repository of information that can be analyzed to make better-informed decisions. Data flows into a data warehouse from transactional systems, relational databases, and other sources, typically on a regular cadence. Business analysts, data scientists, and decision-makers access the data through business intelligence tools, SQL clients, and other analytics applications.


Source Code

All the source code for this post can be found on GitHub. Use the following command to git clone a local copy of the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \


Use the two AWS CloudFormation templates, included in the project, to build two CloudFormation stacks. Please review the two templates and understand the costs of the resources before continuing.

The first CloudFormation template, redshift.yml, provisions a new Amazon VPC with associated network and security resources, a single-node Redshift cluster, and two S3 buckets.

The second CloudFormation template, kinesis-firehose.yml, provisions an Amazon Kinesis Data Firehose delivery stream, associated IAM Policy and Role, and an Amazon CloudWatch log group and two log streams.

Change the REDSHIFT_PASSWORD value to ensure your security. Optionally, change the REDSHIFT_USERNAME value. Make sure that the first stack completes successfully, before creating the second stack.

export AWS_DEFAULT_REGION=us-east-1
# Create resources
aws cloudformation create-stack \
  --stack-name redshift-stack \
  --template-body file://cloudformation/redshift.yml \
  --parameters ParameterKey=MasterUsername,ParameterValue=${REDSHIFT_USERNAME} \
  ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
  ParameterKey=InboundTraffic,ParameterValue=$(curl -s)/32 \
# Wait for first stack to complete
aws cloudformation create-stack \
  --stack-name kinesis-firehose-stack \
  --template-body file://cloudformation/kinesis-firehose.yml \
  --parameters ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \

Review AWS Resources

To confirm all the AWS resources were created correctly, use the AWS Management Console.

Kinesis Data Firehose

In the Amazon Kinesis Dashboard, you should see the new Amazon Kinesis Data Firehose delivery stream, redshift-delivery-stream.


The Details tab of the new Amazon Kinesis Firehose delivery stream should look similar to the following. Note the IAM Role, FirehoseDeliveryRole, which was created and associated with the delivery stream by CloudFormation.


We are not performing any transformations of the incoming messages. Note the new S3 bucket that was created and associated with the stream by CloudFormation. The bucket name was randomly generated. This bucket is where the incoming messages will be written.


Note the buffer conditions of 1 MB and 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written in JSON format, using GZIP compression, to S3. These are the minimal buffer conditions, and as close to real-time streaming to Redshift as we can get.
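To make the size-or-time buffer conditions concrete, here is a toy Python model of the flush rule. It is only a sketch of the idea, not how Kinesis Data Firehose is implemented. In particular, this model only checks the conditions when a record arrives, whereas the real service also flushes on its own timer. The class name BufferSketch is hypothetical.

```python
class BufferSketch:
    """Toy model of size-or-time buffering (1 MB or 60 seconds by default)."""

    def __init__(self, max_bytes=1024 * 1024, max_seconds=60):
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.records = []
        self.size = 0
        self.opened_at = None

    def add(self, record: bytes, now: float):
        """Add a record at time `now` (seconds); return a flushed batch or None."""
        if self.opened_at is None:
            self.opened_at = now  # the buffer interval starts with its first record
        self.records.append(record)
        self.size += len(record)
        too_big = self.size >= self.max_bytes
        too_old = now - self.opened_at >= self.max_seconds
        if too_big or too_old:
            batch = self.records
            self.records, self.size, self.opened_at = [], 0, None
            return batch
        return None


if __name__ == "__main__":
    buffer = BufferSketch()
    print(buffer.add(b'{"temp": 71.5}', now=0))   # buffered, nothing flushed yet
    print(buffer.add(b'{"temp": 72.1}', now=61))  # interval elapsed, batch flushed
```

With six sensors each sending one small message per minute, the 1 MB size limit is never reached in this demonstration, so the 60-second interval is what triggers each write to S3.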


Note the COPY command, which is used to copy the messages from S3 to the message table in Amazon Redshift. Kinesis uses the IAM Role, ClusterPermissionsRole, created by CloudFormation, for credentials. We are using a Manifest to copy the data to Redshift from S3. According to Amazon, a Manifest ensures that the COPY command loads all of the required files, and only the required files, for a data load. The Manifests are automatically generated and managed by the Kinesis Firehose delivery stream.


Redshift Cluster

In the Amazon Redshift Console, you should see a new single-node Redshift cluster consisting of one Redshift dc2.large Dense Compute node type.


Note the new VPC, Subnet, and VPC Security Group created by CloudFormation. Also, observe that the Redshift cluster is publicly accessible from outside the new VPC.


Redshift Ingress Rules

The single-node Redshift cluster is assigned to an AWS Availability Zone in the US East (N. Virginia) us-east-1 AWS Region. The cluster is associated with a VPC Security Group. The Security Group contains three inbound rules, all for Redshift port 5439. The IP addresses associated with the three inbound rules provide access to the following: 1) Amazon QuickSight in us-east-1, through a /27 CIDR block; 2) Amazon Kinesis Firehose in us-east-1, through a /27 CIDR block; and 3) you, through a /32 CIDR block containing your current IP address. If your IP address changes or you do not use the us-east-1 Region, you will need to change one or all of these IP addresses. The list of Kinesis Firehose IP addresses is here. The list of QuickSight IP addresses is here.
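If the CIDR notation is unfamiliar, the short Python sketch below shows how a /27 block (32 addresses) and a /32 block (a single address) behave as inbound rules. The addresses are hypothetical, taken from ranges reserved for documentation, and are not the actual QuickSight or Firehose ranges. The helper name is_allowed is also an assumption.

```python
import ipaddress
from typing import Iterable


def is_allowed(source_ip: str, allowed_cidrs: Iterable[str]) -> bool:
    """Return True if source_ip falls inside any of the allowed CIDR blocks."""
    address = ipaddress.ip_address(source_ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in allowed_cidrs)


# Hypothetical rules: two /27 service ranges and one /32 for a single client
rules = ["192.0.2.0/27", "198.51.100.32/27", "203.0.113.25/32"]

if __name__ == "__main__":
    print(ipaddress.ip_network("192.0.2.0/27").num_addresses)  # size of a /27 block
    print(is_allowed("203.0.113.25", rules))  # the single /32 address
    print(is_allowed("203.0.113.26", rules))  # a neighboring address
```

This is why a changed home or office IP address breaks the connection: the /32 rule matches exactly one address.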


If you cannot connect to Redshift from your local SQL client, most often, your IP address has changed and is incorrect in the Security Group’s inbound rule.

Redshift SQL Client

You can choose to use the Redshift Query Editor to interact with Redshift or use a third-party SQL client for greater flexibility. To access the Redshift Query Editor, use the user credentials specified in the redshift.yml CloudFormation template.


There is a lot of useful functionality in the Redshift Console and within the Redshift Query Editor. However, a notable limitation of the Redshift Query Editor, in my opinion, is the inability to execute multiple SQL statements at the same time, whereas most SQL clients allow multiple SQL queries to be executed at once.


I prefer to use JetBrains PyCharm IDE. PyCharm has out-of-the-box integration with Redshift. Using PyCharm, I can edit the project’s Python, SQL, AWS CLI shell, and CloudFormation code, all from within PyCharm.


If you use any of the common SQL clients, you will need to set up a JDBC (Java Database Connectivity) or ODBC (Open Database Connectivity) connection to Redshift. The ODBC and JDBC connection strings can be found in the Redshift cluster’s Properties tab or in the Outputs tab from the CloudFormation stack, redshift-stack.


You will also need the Redshift database username and password you included in the aws cloudformation create-stack AWS CLI command you executed previously. Below, we see PyCharm’s Project Data Sources window containing a new data source for the Redshift dev database.


Database Schema and Tables

When CloudFormation created the Redshift cluster, it also created a new database, dev. Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL commands to create a new database schema, sensor, and six tables in the sensor schema.

-- Create new schema in Redshift DB
CREATE SCHEMA sensor;
SET search_path = sensor;
-- Create (6) tables in Redshift DB
-- Columns marked "type assumed" are referenced elsewhere in this post,
-- but their definitions are missing here, so their types are a best guess.
CREATE TABLE message -- streaming data table
(
    id BIGINT IDENTITY (1, 1), -- message id
    guid VARCHAR(36) NOT NULL, -- device guid
    ts BIGINT NOT NULL, -- epoch in seconds (type assumed)
    temp NUMERIC(5, 2) NOT NULL, -- temperature reading
    created TIMESTAMP DEFAULT ('now'::text)::timestamp with time zone -- row created at
);
CREATE TABLE location -- dimension table
(
    id INTEGER NOT NULL, -- location id (type assumed)
    long NUMERIC(10, 7) NOT NULL, -- longitude
    lat NUMERIC(10, 7) NOT NULL, -- latitude
    description VARCHAR(256) -- location description
);
CREATE TABLE history -- dimension table
(
    id BIGINT NOT NULL, -- history id (type assumed)
    serviced BIGINT NOT NULL, -- service date
    action VARCHAR(20) NOT NULL, -- service action, e.g. INSTALLED (type assumed)
    technician_id INTEGER NOT NULL, -- technician id
    notes VARCHAR(256) -- notes
);
CREATE TABLE sensor -- dimension table
(
    id INTEGER NOT NULL, -- sensor id (type assumed)
    guid VARCHAR(36) NOT NULL, -- device guid
    mac VARCHAR(18) NOT NULL, -- mac address
    sku VARCHAR(18) NOT NULL, -- product sku
    upc VARCHAR(12) NOT NULL, -- product upc
    active BOOLEAN DEFAULT TRUE, -- active status
    notes VARCHAR(256) -- notes
);
CREATE TABLE manufacturer -- dimension table
(
    id INTEGER NOT NULL, -- manufacturer id (type assumed)
    name VARCHAR(100) NOT NULL, -- company name
    website VARCHAR(100) NOT NULL, -- company website
    notes VARCHAR(256) -- notes
);
CREATE TABLE sensors -- fact table
(
    sensor_id INTEGER NOT NULL, -- sensor id
    manufacturer_id INTEGER NOT NULL, -- manufacturer id
    location_id INTEGER NOT NULL, -- location id
    history_id BIGINT NOT NULL, -- history id
    message_guid VARCHAR(36) NOT NULL -- sensor guid
);

Star Schema

The tables represent denormalized data, taken from one or more relational database sources. The tables form a star schema. The star schema is widely used to develop data warehouses. The star schema consists of one or more fact tables referencing any number of dimension tables. The location, manufacturer, sensor, and history tables are dimension tables. The sensors table is a fact table.

In the diagram below, the foreign key relationships are virtual, not physical. The diagram was created using PyCharm’s schema visualization tool. Note the schema’s star shape. The message table is where the streaming IoT data will eventually be written. The message table is related to the sensors fact table through the common guid field.


Sample Data to S3

Next, copy the sample data, included in the project, to the S3 data bucket created with CloudFormation. Each CSV-formatted data file corresponds to one of the tables we previously created. Since the bucket name is semi-random, we can use the AWS CLI and jq to get the bucket name, then use it to perform the copy commands.

# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
  --stack-name redshift-stack \
  | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
# Copy data
aws s3 cp data/history.csv s3://${DATA_BUCKET}/history/history.csv
aws s3 cp data/location.csv s3://${DATA_BUCKET}/location/location.csv
aws s3 cp data/manufacturer.csv s3://${DATA_BUCKET}/manufacturer/manufacturer.csv
aws s3 cp data/sensor.csv s3://${DATA_BUCKET}/sensor/sensor.csv
aws s3 cp data/sensors.csv s3://${DATA_BUCKET}/sensors/sensors.csv
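If jq is not available, the same lookup of the bucket name can be done in a few lines of Python. This is a sketch of the equivalent logic only. The sample JSON is a heavily abbreviated, made-up describe-stacks response, and the bucket name in it is hypothetical.

```python
import json
from typing import Optional


def get_stack_output(describe_stacks_json: str, output_key: str) -> Optional[str]:
    """Return the OutputValue for output_key, like the jq filter above."""
    stacks = json.loads(describe_stacks_json)["Stacks"]
    for stack in stacks:
        for output in stack.get("Outputs", []):
            if output["OutputKey"] == output_key:
                return output["OutputValue"]
    return None


# Hypothetical, abbreviated response from `aws cloudformation describe-stacks`
sample = json.dumps({
    "Stacks": [{
        "StackName": "redshift-stack",
        "Outputs": [
            {"OutputKey": "DataBucket", "OutputValue": "redshift-stack-databucket-example"}
        ],
    }]
})

if __name__ == "__main__":
    print(get_stack_output(sample, "DataBucket"))
```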

The output from the AWS CLI should look similar to the following.


Sample Data to Redshift

Whereas a relational database, such as Amazon RDS, is designed for online transaction processing (OLTP), Amazon Redshift is designed for online analytic processing (OLAP) and business intelligence applications. To write data to Redshift, we typically use the COPY command rather than the frequent, individual INSERT statements common with OLTP, which would be prohibitively slow. According to Amazon, the Redshift COPY command leverages the Amazon Redshift massively parallel processing (MPP) architecture to read and load data in parallel from files on Amazon S3, from a DynamoDB table, or from text output from one or more remote hosts.

In the following series of SQL statements, replace the placeholder, your_bucket_name, in five places with your S3 data bucket name. The bucket name will start with the prefix, redshift-stack-databucket. The bucket name can be found in the Outputs tab of the CloudFormation stack, redshift-stack. Next, replace the placeholder, cluster_permissions_role_arn, with the ARN (Amazon Resource Name) of the ClusterPermissionsRole. The ARN is formatted as follows, arn:aws:iam::your-account-id:role/ClusterPermissionsRole. The ARN can be found in the Outputs tab of the CloudFormation stack, redshift-stack.
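Replacing the placeholders by hand in several places is error-prone, so you may prefer to script it. Here is a minimal Python sketch. The function name fill_placeholders is hypothetical, and the bucket name and account ID shown are made-up example values.

```python
def fill_placeholders(sql: str, bucket_name: str, role_arn: str) -> str:
    """Substitute the two placeholders used in the COPY statements."""
    return (
        sql.replace("your_bucket_name", bucket_name)
           .replace("cluster_permissions_role_arn", role_arn)
    )


if __name__ == "__main__":
    template = (
        "COPY location (id, long, lat, description)\n"
        "FROM 's3://your_bucket_name/location/'\n"
        "CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'"
    )
    # Made-up example values; use the Outputs of your own redshift-stack
    print(fill_placeholders(
        template,
        "redshift-stack-databucket-example",
        "arn:aws:iam::123456789012:role/ClusterPermissionsRole",
    ))
```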

Using the Redshift Query Editor or your SQL client of choice, execute the SQL statements to copy the sample data from S3 to each of the corresponding tables in the Redshift dev database. The TRUNCATE commands guarantee there is no previous sample data present in the tables.

-- ** MUST FIRST CHANGE your_bucket_name and cluster_permissions_role_arn **
-- sensor schema
SET search_path = sensor;
-- Copy sample data to tables from S3
-- (CSV format assumed from the CSV-formatted sample files;
-- add IGNOREHEADER 1 if the files contain a header row)
TRUNCATE TABLE history;
COPY history (id, serviced, action, technician_id, notes)
    FROM 's3://your_bucket_name/history/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV;
TRUNCATE TABLE location;
COPY location (id, long, lat, description)
    FROM 's3://your_bucket_name/location/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV;
TRUNCATE TABLE sensor;
COPY sensor (id, guid, mac, sku, upc, active, notes)
    FROM 's3://your_bucket_name/sensor/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV;
TRUNCATE TABLE manufacturer;
COPY manufacturer (id, name, website, notes)
    FROM 's3://your_bucket_name/manufacturer/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV;
TRUNCATE TABLE sensors;
COPY sensors (sensor_id, manufacturer_id, location_id, history_id, message_guid)
    FROM 's3://your_bucket_name/sensors/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV;
-- Confirm the expected row counts
SELECT COUNT(*) FROM history; -- 30
SELECT COUNT(*) FROM location; -- 6
SELECT COUNT(*) FROM sensor; -- 6
SELECT COUNT(*) FROM manufacturer; -- 1
SELECT COUNT(*) FROM sensors; -- 30

Database Views

Next, create four Redshift database Views. These views may be used to analyze the data in Redshift, and later, in Amazon QuickSight.

  1. sensor_msg_detail: Returns aggregated sensor details, joining the sensors fact table to the four dimension tables and the message table.
  2. sensor_msg_count: Returns the number of messages received by Redshift, for each sensor.
  3. sensor_avg_temp: Returns the average temperature from each sensor, based on all the messages received from each sensor.
  4. sensor_avg_temp_current: Identical to the previous view, but limited to the last 30 minutes.

Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL statements.

-- sensor schema
SET search_path = sensor;
-- View 1: Sensor details
DROP VIEW IF EXISTS sensor_msg_detail;
CREATE OR REPLACE VIEW sensor_msg_detail AS
SELECT ('1970-01-01'::date + e.ts * interval '1 second') AS recorded,
       e.temp,
       s.guid,
       s.sku,
       s.mac,
       l.lat,
       l.long,
       l.description AS location,
       ('1970-01-01'::date + h.serviced * interval '1 second') AS installed,
       e.created AS redshift
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN manufacturer m ON (f.manufacturer_id = m.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE h.action = 'INSTALLED';
-- View 2: Message count per sensor
DROP VIEW IF EXISTS sensor_msg_count;
CREATE OR REPLACE VIEW sensor_msg_count AS
SELECT count(e.temp) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY msg_count, s.guid;
-- View 3: Average temperature per sensor (all data)
DROP VIEW IF EXISTS sensor_avg_temp;
CREATE OR REPLACE VIEW sensor_avg_temp AS
SELECT avg(e.temp) AS avg_temp,
       count(s.guid) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;
-- View 4: Average temperature per sensor (last 30 minutes)
DROP VIEW IF EXISTS sensor_avg_temp_current;
CREATE OR REPLACE VIEW sensor_avg_temp_current AS
SELECT avg(e.temp) AS avg_temp,
       count(s.guid) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN (SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
                            guid,
                            temp
                     FROM message
                     WHERE DATEDIFF(minute, recorded_time, GETDATE()) <= 30) e ON (f.message_guid = e.guid)
WHERE h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;

At this point, you should have a total of six tables and four views in the sensor schema of the dev database in Redshift.
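The views convert the epoch-based ts and serviced columns to timestamps with the expression '1970-01-01'::date + ts * interval '1 second'. The Python sketch below mirrors that arithmetic, which may help when checking values outside of Redshift. The function name is hypothetical, and the sample value is chosen to correspond to 2020-03-04 03:31:59 UTC.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def epoch_to_timestamp(ts: int) -> datetime:
    """Add ts seconds to 1970-01-01, as the views do (naive UTC result)."""
    return EPOCH + timedelta(seconds=ts)


if __name__ == "__main__":
    print(epoch_to_timestamp(1583292719))  # prints 2020-03-04 03:31:59
```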

Test the System

With all the necessary AWS resources and Redshift database objects created and sample data in the Redshift database, we can test the system. The included Python script will generate a single test message and send it to Kinesis Data Firehose. If everything is working, the message should be delivered from Kinesis Data Firehose to S3, then copied to Redshift, and appear in the message table.

Install the required Python packages and then execute the Python script.

# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt
# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1
# Execute script in foreground
python3 ./scripts/

Run the following SQL query to confirm the record is in the message table of the dev database. It will take at least one minute for the message to appear in Redshift.


Once the message is confirmed to be present in the message table, delete the record by truncating the table.


Streaming Data

Assuming the test message worked, we can proceed with simulating the streaming IoT sensor data. The included Python script creates six concurrent threads, representing six temperature sensors.

#!/usr/bin/env python3
# Simulated multiple streaming time-series iot sensor data
# Author: Gary A. Stafford
# Date: Revised October 2020
import json
import random
import threading
import time as tm
from datetime import datetime

import boto3
import numpy as np

STREAM_NAME = 'redshift-delivery-stream'
client = boto3.client('firehose')


class MyThread(threading.Thread):
    def __init__(self, thread_id, sensor_guid, temp_max):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.sensor_id = sensor_guid
        self.temp_max = temp_max

    def run(self):
        print("Starting Thread: " + str(self.thread_id))
        self.create_data()
        print("Exiting Thread: " + str(self.thread_id))

    def create_data(self):
        start = 0
        stop = 20
        step = 0.1  # step size (e.g 0 to 20, step .1 = 200 steps in cycle)
        repeat = 2  # how many times to repeat cycle
        freq = 60  # frequency of temperature reading in seconds
        max_range = int(stop * (1 / step))
        time = np.arange(start, stop, step)
        amplitude = np.sin(time)
        for x in range(0, repeat):
            for y in range(0, max_range):
                # sine-wave value scaled by temp_max, plus random noise of +/- 5 degrees
                temperature = round((((amplitude[y] + 1.0) * self.temp_max) + random.uniform(-5, 5)) + 60, 2)
                payload = {
                    'guid': self.sensor_id,
                    'ts': int(datetime.now().timestamp()),  # epoch in seconds
                    'temp': temperature
                }
                print(json.dumps(payload))
                send_to_kinesis(payload)
                tm.sleep(freq)


def send_to_kinesis(payload):
    _ = client.put_record(
        DeliveryStreamName=STREAM_NAME,
        Record={
            'Data': json.dumps(payload)
        }
    )


def main():
    # Sensor GUIDs; their order here is inferred from the average
    # temperatures reported later in this post
    sensor_guids = [
        "03e39872-e105-4be4-83c0-9ade818465dc",
        "fa565921-fddd-4bfb-a7fd-d617f816df4b",
        "d120422d-5789-435d-9dc6-73d8489b04c2",
        "93238559-4d55-4b2a-bdcb-6aa3be0f3908",
        "dbc05806-6872-4f0a-aca2-f794cc39bd9b",
        "f9ade639-f936-4954-aa5a-1f2ed86c9bcf",
    ]
    timeout = 300  # seconds; arbitrarily offsets the start of each thread
    # Create new threads
    thread1 = MyThread(1, sensor_guids[0], 25)
    thread2 = MyThread(2, sensor_guids[1], 10)
    thread3 = MyThread(3, sensor_guids[2], 7)
    thread4 = MyThread(4, sensor_guids[3], 30)
    thread5 = MyThread(5, sensor_guids[4], 5)
    thread6 = MyThread(6, sensor_guids[5], 12)
    # Start new threads
    thread1.start()
    tm.sleep(timeout * 1)
    thread2.start()
    tm.sleep(timeout * 2)
    thread3.start()
    tm.sleep(timeout * 1)
    thread4.start()
    tm.sleep(timeout * 3)
    thread5.start()
    tm.sleep(timeout * 2)
    thread6.start()
    # Wait for threads to terminate
    for thread in (thread1, thread2, thread3, thread4, thread5, thread6):
        thread.join()
    print("Exiting Main Thread")


if __name__ == '__main__':
    main()

The simulated data uses an algorithm that follows an oscillating sine wave or sinusoid, representing rising and falling temperatures. In the script, I have configured each thread to start with an arbitrary offset to add some randomness to the simulated data.
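To preview the shape of the simulated data without sending anything to AWS, the sine-wave calculation can be run on its own. The sketch below reimplements it with only the Python standard library, using math.sin in place of NumPy. The function name and the seed parameter are hypothetical, and the noise term is assumed to be uniform between -5 and +5 degrees.

```python
import math
import random


def simulate_temperatures(temp_max, start=0.0, stop=20.0, step=0.1, noise=5.0, seed=None):
    """Return one cycle of simulated readings following a sine wave."""
    rng = random.Random(seed)
    steps = int(round((stop - start) / step))
    readings = []
    for i in range(steps):
        amplitude = math.sin(start + i * step)  # oscillates between -1 and 1
        # Scale to 0..2*temp_max, add noise (assumed +/- noise), offset by 60 degrees
        value = (amplitude + 1.0) * temp_max + rng.uniform(-noise, noise) + 60
        readings.append(round(value, 2))
    return readings


if __name__ == "__main__":
    cycle = simulate_temperatures(temp_max=25, seed=42)
    print(len(cycle), min(cycle), max(cycle))
```

With temp_max set to 25, every reading falls between 55 and 115 degrees, and the readings average out to a little over 85 degrees.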


The variables within the script can be adjusted to shorten or lengthen the time it takes to stream the simulated data. By default, each of the six threads creates 400 messages per sensor, in one-minute increments. Including the offset start of each succeeding thread, the total runtime of the script is about 7.5 hours to generate 2,400 simulated IoT sensor temperature readings and push them to Kinesis Data Firehose. Make sure you can maintain a connection to the Internet for the entire runtime of the script. I normally run the script in the background, from a small EC2 instance.
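The runtime estimate follows from the script’s default values, and can be reproduced with a few lines of Python. This sketch assumes each thread pauses for the full reading frequency after every message and ignores the time spent on the API calls themselves. The function name is hypothetical.

```python
def total_runtime_seconds(repeat=2, steps=200, freq=60, timeout=300, offsets=(1, 2, 1, 3, 2)):
    """Estimate the script runtime from its default settings."""
    # The last thread starts after all the staggered sleeps in main()
    last_thread_start = timeout * sum(offsets)
    # Each thread sends repeat * steps messages, one every freq seconds
    per_thread = repeat * steps * freq
    return last_thread_start + per_thread


if __name__ == "__main__":
    print(6 * 2 * 200)  # total messages across six sensors: prints 2400
    print(round(total_runtime_seconds() / 3600, 2))  # hours: prints 7.42
```

Lowering freq or repeat is the simplest way to shorten a test run.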

To use the Python script, execute either of the two following commands. Using the first command will run the script in the foreground. Using the second command will run the script in the background.

# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt
# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1
# Option #1: Execute script in foreground
python3 ./scripts/
# Option #2: execute script in background
nohup python3 -u ./scripts/ > output.log 2>&1 </dev/null &
# Check that the process is running
ps -aux | grep 'python3 -u ./scripts/'
# Wait 1-2 minutes, then check output to confirm script is working
cat output.log

Viewing the output.log file, you should see messages being generated on each thread and sent to Kinesis Data Firehose. Each message contains the GUID of the sensor, a timestamp, and a temperature reading.


The messages are sent to Kinesis Data Firehose, which in turn writes the messages to S3. The messages are written in JSON format using GZIP compression. Below, we see an example of the GZIP compressed JSON files in S3. The JSON files are partitioned by year, month, day, and hour.


Confirm Data Streaming to Redshift

From the Amazon Kinesis Firehose Console Metrics tab, you should see incoming messages flowing to S3 and on to Redshift.


Executing the following SQL query should show an increasing number of messages.


How Near Real-time?

Earlier, we saw how the Amazon Kinesis Data Firehose delivery stream was configured to buffer data at the rate of 1 MB or 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written to S3. Each record in the message table has two timestamps. The first timestamp, ts, is when the temperature reading was recorded. The second timestamp, created, is when the message was written to Redshift, using the COPY command. We can calculate the delta in seconds between the two timestamps using the following SQL query in Redshift.

SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
created AS redshift_time,
DATEDIFF(seconds, recorded_time, redshift_time) AS diff_seconds
FROM message
ORDER BY diff_seconds;

Using the results of the Redshift query, we can visualize the results in Amazon QuickSight. In my own tests, for 2,400 messages over approximately 7.5 hours, the minimum delay was 1 second and the maximum delay was 64 seconds. Hence, near real-time, in this case, is about one minute or less, with an average latency of roughly 30 seconds.
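The same delta can be computed outside of Redshift. The Python sketch below uses four recorded/redshift timestamp pairs taken from the sensor_msg_detail results shown later in this post. It truncates to whole seconds, which matches DATEDIFF here because the recorded timestamps have no fractional part. The function name is hypothetical.

```python
from datetime import datetime

FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def delay_seconds(recorded: str, loaded: str) -> int:
    """Whole seconds between the reading and its arrival in Redshift."""
    delta = datetime.strptime(loaded, FORMAT) - datetime.strptime(recorded, FORMAT)
    return int(delta.total_seconds())


# (recorded, redshift) pairs from the sensor_msg_detail results in this post
pairs = [
    ("2020-03-04 03:31:59.000000", "2020-03-04 03:33:01.580147"),
    ("2020-03-04 03:25:58.000000", "2020-03-04 03:26:00.196113"),
    ("2020-03-04 03:15:57.000000", "2020-03-04 03:15:59.813656"),
    ("2020-03-04 03:14:57.000000", "2020-03-04 03:15:59.813656"),
]
delays = [delay_seconds(recorded, loaded) for recorded, loaded in pairs]

if __name__ == "__main__":
    print(delays)  # prints [62, 2, 2, 62]
```

The last two pairs share the same redshift timestamp, showing two readings taken a minute apart being loaded by a single COPY.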


Analyzing the Data with Redshift

I suggest waiting at least thirty minutes for a significant number of messages to be copied into Redshift. With the data streaming into Redshift, query each of the database views we created earlier. You should see the streaming message data, joined to the existing static data in Redshift. As data continues to stream into Redshift, the views will display different results based on the current message table contents.

Here, we see the first ten results of the sensor_msg_detail view.

recorded temp guid sku mac lat long location installed redshift
2020-03-04 03:31:59.000000 105.56 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:33:01.580147
2020-03-04 03:29:59.000000 95.93 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:31:01.388887
2020-03-04 03:26:58.000000 91.93 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:28:01.099796
2020-03-04 03:25:58.000000 88.70 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:26:00.196113
2020-03-04 03:22:58.000000 87.65 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:23:01.558514
2020-03-04 03:20:58.000000 77.35 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:21:00.691347
2020-03-04 03:16:57.000000 71.84 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:17:59.307510
2020-03-04 03:15:57.000000 72.35 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:15:59.813656
2020-03-04 03:14:57.000000 67.95 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:15:59.813656

Next, we see the results of the sensor_avg_temp view.

avg_temp guid lat long location
65.25 dbc05806-6872-4f0a-aca2-f794cc39bd9b 37.7066541 -122.4181399 Wafer Inspection Lab #0210A
67.23 d120422d-5789-435d-9dc6-73d8489b04c2 37.7072686 -122.4187016 Zone 4 Wafer Processing Area B3
70.23 fa565921-fddd-4bfb-a7fd-d617f816df4b 37.7071763 -122.4190397 Research Lab #2209
72.22 f9ade639-f936-4954-aa5a-1f2ed86c9bcf 37.7067618 -122.4186191 Wafer Inspection Lab #0211C
85.48 03e39872-e105-4be4-83c0-9ade818465dc 37.7068476 -122.4191599 Research Lab #2203
90.69 93238559-4d55-4b2a-bdcb-6aa3be0f3908 37.7070334 -122.4184393 Zone 2 Semiconductor Assembly Area A2

Amazon QuickSight

In a recent post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2, I detailed getting started with Amazon QuickSight. In this post, I will assume you are familiar with QuickSight.

Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. However, for this part of the demonstration, we will be working directly in the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation.

Redshift Data Sets

To visualize the data from Amazon Redshift, we start by creating Data Sets in QuickSight. QuickSight supports a large number of data sources for creating data sets. We will use the Redshift data source. If you recall, we added an inbound rule for QuickSight, allowing us to connect to our Redshift cluster in us-east-1.


We will select the sensor schema, which is where the tables and views for this demonstration are located.


We can choose any of the tables or views in the Redshift dev database that we want to use for visualization.


Below, we see examples of two new data sets, shown in the QuickSight Data Prep Console. Note how QuickSight automatically recognizes field types, including dates, latitude, and longitude.




Using the data sets, QuickSight allows us to create a wide variety of rich visualizations. Below, we see the simulated time-series data from the six temperature sensors.


Next, we see an example of QuickSight’s ability to show geospatial data. The Map shows the location of each sensor and the average temperature recorded by that sensor.


Cleaning Up

To remove the resources created for this post, use the following series of AWS CLI commands.

# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
--stack-name redshift-stack \
| jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
# Get log bucket name
LOG_BUCKET=$(aws cloudformation describe-stacks \
--stack-name redshift-stack \
| jq -r '.Stacks[].Outputs[] | select(.OutputKey == "LogBucket") | .OutputValue')
echo ${LOG_BUCKET}
# Delete demonstration resources
python3 ./scripts/
aws cloudformation delete-stack --stack-name kinesis-firehose-stack
# Wait for first stack to be deleted
aws cloudformation delete-stack --stack-name redshift-stack
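For readers who do not have jq installed, the bucket-name lookup can be approximated in Python. This is a minimal sketch, assuming the JSON printed by aws cloudformation describe-stacks has been captured as a string. The function name and the sample bucket names are hypothetical.

```python
import json


def get_stack_output(describe_stacks_json, output_key):
    """Return the OutputValue for output_key, or None if it is absent."""
    response = json.loads(describe_stacks_json)
    for stack in response.get("Stacks", []):
        for output in stack.get("Outputs", []):
            if output.get("OutputKey") == output_key:
                return output.get("OutputValue")
    return None


# Hypothetical, trimmed describe-stacks response, for illustration only
sample_response = json.dumps({
    "Stacks": [{
        "StackName": "redshift-stack",
        "Outputs": [
            {"OutputKey": "DataBucket", "OutputValue": "example-data-bucket"},
            {"OutputKey": "LogBucket", "OutputValue": "example-log-bucket"},
        ],
    }]
})
print(get_stack_output(sample_response, "LogBucket"))
```

Unlike the jq filter, which emits every match across all stacks, this sketch returns only the first matching output.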


In this brief post, we have learned how streaming data can be analyzed in near real-time, in Amazon Redshift, using Amazon Kinesis Data Firehose. Further, we explored how the results of those analyses can be visualized in Amazon QuickSight. For customers that depend on a data warehouse for data analytics, but who also have streaming data sources, the use of Amazon Kinesis Data Firehose or Amazon Redshift Spectrum is an excellent choice.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Event-driven, Serverless Architectures with AWS Lambda, SQS, DynamoDB, and API Gateway


In this post, we will explore modern application development using an event-driven, serverless architecture on AWS. To demonstrate this architecture, we will integrate several fully-managed services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. The result will be an application composed of small, easily deployable, loosely coupled, independently scalable, serverless components.

What is ‘Event-Driven’?

According to Otavio Ferreira, Manager, Amazon SNS, and James Hood, Senior Software Development Engineer, in their AWS Compute Blog post, Enriching Event-Driven Architectures with AWS Event Fork Pipelines, “Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.” This description of an event-driven architecture perfectly captures the essence of the following post. All interactions between application components in this post will be the direct result of an event being triggered.

What is ‘Serverless’?

Mistakenly, many of us think of serverless as just functions (aka Function-as-a-Service or FaaS). When it comes to functions on AWS, Lambda is just one of many fully-managed services that make up the AWS Serverless Computing platform. So, what is ‘serverless’? According to AWS, “Serverless applications don’t require provisioning, maintaining, and administering servers for backend components such as compute, databases, storage, stream processing, message queueing, and more.”

As a Developer, one of my favorite features of serverless is the cost, or lack thereof. With serverless on AWS, you pay for consistent throughput or execution duration rather than by server unit, and, at least on AWS, you don’t pay for idle resources. This is not always true of ‘serverless’ offerings on other leading Cloud platforms. Remember, if you’re paying for it but not using it, it’s not serverless.



To demonstrate an event-driven, serverless architecture, we will build, package, and deploy an application capable of extracting messages from CSV files placed in S3, transforming those messages, queueing them to SQS, and finally, writing the messages to DynamoDB, using Lambda functions throughout. We will also expose a RESTful API, via API Gateway, to perform CRUD-like operations on those messages in DynamoDB.

AWS Technologies

In this demonstration, we will use several AWS serverless services, including the following.

Each Lambda will use function-specific execution roles, part of AWS Identity and Access Management (IAM). We will log the event details and monitor services using Amazon CloudWatch.

To codify, build, package, deploy, and manage the Lambda functions and other AWS resources in a fully automated fashion, we will also use the following AWS services:


The high-level architecture for the platform provisioned and deployed in this post is illustrated in the diagram below. There are two separate workflows. In the first workflow (top), data is extracted from CSV files placed in S3, transformed, queued to SQS, and written to DynamoDB, using Python-based Lambda functions throughout. In the second workflow (bottom), data is manipulated in DynamoDB through interactions with a RESTful API, exposed via an API Gateway, and backed by Node.js-based Lambda functions.
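To make the first workflow more concrete, here is a minimal Python sketch of the transform step: turning CSV text into batches of SQS message entries. It is not the project's actual Lambda code. The function name and the sample columns are hypothetical, and each row is simply serialized to JSON. The one firm constraint is that SQS accepts at most ten entries per SendMessageBatch request.

```python
import csv
import io
import json

SQS_MAX_BATCH_SIZE = 10  # SendMessageBatch accepts at most 10 entries


def csv_to_sqs_batches(csv_text):
    """Turn CSV text into lists of SQS batch entries (Id + MessageBody)."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    # Each entry needs an Id that is unique within its batch
    entries = [
        {"Id": str(index), "MessageBody": json.dumps(row)}
        for index, row in enumerate(rows)
    ]
    # Split the entries into chunks no larger than the SQS batch limit
    return [
        entries[start:start + SQS_MAX_BATCH_SIZE]
        for start in range(0, len(entries), SQS_MAX_BATCH_SIZE)
    ]


# Hypothetical CSV content with 12 data rows, for illustration only
sample_csv = "id,text\n" + "\n".join(f"{i},msg{i}" for i in range(12))
batches = csv_to_sqs_batches(sample_csv)
print([len(batch) for batch in batches])
```

In a Lambda function, each batch could then be passed as the Entries argument to the boto3 SQS client's send_message_batch call.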


Using the vast array of current AWS services, there are several ways we could extract, transform, and load data from static files into DynamoDB. The demonstration’s event-driven, serverless architecture represents just one possible approach.

Source Code

All source code for this post is available on GitHub in a single public repository, serverless-sqs-dynamo-demo. To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \

The project files relevant to this demonstration are organized as follows.

├── lambda_apigtw_to_dynamodb
│   ├── app.js
│   ├── events
│   ├── node_modules
│   ├── package.json
│   └── tests
├── lambda_s3_to_sqs
│   ├──
│   ├──
│   ├── requirements.txt
│   └── tests
├── lambda_sqs_to_dynamodb
│   ├──
│   ├──
│   ├── requirements.txt
│   └── tests
├── requirements.txt
├── template.yaml
└── sample_data
    ├── data.csv
    ├── data_bad_msg.csv
    └── data_good_msg.csv

Some source code samples in this post are GitHub Gists, which may not display correctly on all social media browsers, such as LinkedIn.


The demonstration assumes you already have an AWS account. You will need the latest versions of the AWS CLI, SAM CLI, and Python 3 installed on your development machine.

Additionally, you will need two existing S3 buckets. One bucket will be used to store the packaged project files for deployment. The second bucket is where we will place CSV data files, which, in turn, will trigger events that invoke multiple Lambda functions.
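When a CSV file lands in the second bucket, S3 sends an event notification to the Lambda function it invokes. The following is a minimal Python sketch of reading the bucket name and object key from such an event. The function name and sample values are hypothetical, and this is not the project's actual handler. Note that object keys arrive URL-encoded in S3 event notifications, so they are decoded before use.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


# Hypothetical, trimmed S3 event, for illustration only
sample_event = {
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "your-data-bucket-name"},
            "object": {"key": "my+folder/data.csv"},
        },
    }]
}
print(extract_s3_objects(sample_event))
```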

Deploying the Project

Before diving into the code, we will deploy the project to AWS. Conveniently, the entire project’s resources are codified in an AWS SAM template. We are using the AWS Serverless Application Model (SAM). AWS SAM is a model used to define serverless applications on AWS. According to the official SAM GitHub project documentation, AWS SAM is based on AWS CloudFormation. A serverless application is defined in a CloudFormation template and deployed as a CloudFormation stack.

Template Parameter

CloudFormation will create and uniquely name the SQS queues and the DynamoDB table. However, to avoid circular references, a common issue when creating resources associated with S3 event notifications, it is easier to use a pre-existing bucket. To start, you will need to change the default value of the SAM template’s DataBucketName parameter to your own S3 bucket name. Again, this bucket is where we will eventually push the CSV data files. Alternatively, override the default value using the sam build command, next.

Parameters:
  DataBucketName:
    Type: String
    Description: S3 bucket where CSV files are processed
    Default: your-data-bucket-name

SAM CLI Commands

With the DataBucketName parameter set, proceed to validate, build, package, and deploy the project using the SAM CLI and the commands below. In addition to the sam validate command, I also like to use the aws cloudformation validate-template command to validate templates and catch any potential, additional errors.

Note that the S3_BUILD_BUCKET variable, below, refers to the name of the S3 bucket SAM will use to package and deploy the project, as opposed to the S3 bucket into which the CSV data files will be placed (gist).

# variables
# validate
sam validate --template template.yaml
aws cloudformation validate-template \
--template-body file://template.yaml
# build
sam build --template template.yaml
# package
sam package \
--output-template-file packaged.yaml \
--s3-bucket $S3_BUILD_BUCKET