Posts Tagged AWS

Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS

Introduction

According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grained control, you could deploy and manage Presto yourself on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern Enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.’

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources, without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources, a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
SELECT
(year(now()) - c_birth_year) AS age,
cd_credit_rating AS credit_rating,
cd_gender AS gender,
count(cd_gender) AS gender_count
FROM
postgresql.public.customer
LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
WHERE
c_birth_year IS NOT NULL
AND cd_credit_rating IS NOT NULL
AND lower(cd_credit_rating) != 'unknown'
AND cd_gender IS NOT NULL
GROUP BY
cd_credit_rating,
c_birth_year,
cd_gender
)
SELECT
age,
credit_rating,
gender,
gender_count
FROM
credit_demographics
WHERE
age BETWEEN 21 AND 65
ORDER BY
age,
credit_rating,
gender;

Ahana

The Linux Foundation’s Presto Foundation member, Ahana, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market and working to foster growth and evangelize the Presto community. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience in tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2 AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes with an Apache Hive Metastore backed by PostgreSQL bundled in. In addition, the following catalogs are bundled in to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H
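Once the Sandbox instance is running (covered in the next sections), a quick way to confirm which catalogs and schemas are available is with the presto-cli, which comes pre-installed on the instance. The two commands below are a minimal check; the exact lists returned will vary with the AMI version.

presto-cli --execute "SHOW CATALOGS;"
presto-cli --execute "SHOW SCHEMAS FROM tpcds;"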

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.

Getting Started

To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Architecture of the demonstration’s AWS environment and resources

Subscribe to Ahana’s PrestoDB Sandbox

To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The current AWS pricing for the default, Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. For the demonstration, since performance is not an issue, you could try a smaller EC2 instance, such as an r5.large, which costs USD 0.126 per hour.

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you choose to do so, I strongly recommend limiting the required ports 22 and 8080 in the instance’s Security Group to just your IP address (a /32 CIDR block).

Limiting access to ports 22 and 8080 from only my current IP address

Lastly, we need to assign an IAM Role to the EC2 instance, which has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Attaching the AmazonS3FullAccess AWS managed policy to the Role

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto and my key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.

You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.

AWS CloudFormation

We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.

AWSTemplateFormatVersion: "2010-09-09"
Description: "This template deploys a RDS PostgreSQL database and an Amazon S3 bucket"
Parameters:
  DBInstanceIdentifier:
    Type: String
    Default: "ahana-prestodb-demo"
  DBEngine:
    Type: String
    Default: "postgres"
  DBEngineVersion:
    Type: String
    Default: "12.3"
  DBAvailabilityZone:
    Type: String
    Default: "us-east-1f"
  DBInstanceClass:
    Type: String
    Default: "db.t3.medium"
  DBStorageType:
    Type: String
    Default: "gp2"
  DBAllocatedStorage:
    Type: Number
    Default: 20
  DBName:
    Type: String
    Default: "shipping"
  DBUser:
    Type: String
    Default: "presto"
  DBPassword:
    Type: String
    Default: "5up3r53cr3tPa55w0rd"
    # NoEcho: True
Resources:
  MasterDatabase:
    Type: AWS::RDS::DBInstance
    Properties:
      DBInstanceIdentifier:
        Ref: DBInstanceIdentifier
      DBName:
        Ref: DBName
      AllocatedStorage:
        Ref: DBAllocatedStorage
      DBInstanceClass:
        Ref: DBInstanceClass
      StorageType:
        Ref: DBStorageType
      Engine:
        Ref: DBEngine
      EngineVersion:
        Ref: DBEngineVersion
      MasterUsername:
        Ref: DBUser
      MasterUserPassword:
        Ref: DBPassword
      AvailabilityZone: !Ref DBAvailabilityZone
      PubliclyAccessible: true
      Tags:
        - Key: Project
          Value: "Demo of RDS PostgreSQL"
  DataBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
Outputs:
  Endpoint:
    Description: "Endpoint of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Address
  Port:
    Description: "Port of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Port
  JdbcConnString:
    Description: "JDBC connection string of RDS PostgreSQL database"
    Value: !Join
      - ""
      - - "jdbc:postgresql://"
        - !GetAtt MasterDatabase.Endpoint.Address
        - ":"
        - !GetAtt MasterDatabase.Endpoint.Port
        - "/"
        - !Ref DBName
        - "?user="
        - !Ref DBUser
        - "&password="
        - !Ref DBPassword
  Bucket:
    Description: "Name of Amazon S3 data bucket"
    Value: !Ref DataBucket

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
--branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/presto-aws-federated-queries.git

To create the AWS CloudFormation stack from the template, cloudformation/presto_ahana_demo.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created (in my case, us-east-1f).

aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f

To ensure that Presto, running on the Ahana PrestoDB Sandbox EC2, can access the RDS for PostgreSQL database instance, manually add an Inbound rule for port 5432 to the database instance’s VPC Security Group, referencing the PrestoDB Sandbox EC2’s Security Group. I have also added my own IP to port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString, and the Amazon S3 bucket’s name, Bucket. All these values will be required during the demonstration.
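If you prefer the command line to the console, the same Outputs can be retrieved with the AWS CLI. The command below is a simple sketch, assuming the stack name used above, ahana-prestodb-demo.

aws cloudformation describe-stacks \
--stack-name ahana-prestodb-demo \
--query "Stacks[0].Outputs"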

Preparing the PrestoDB Sandbox

There are a few steps we need to take to properly prepare the PrestoDB Sandbox EC2 for our demonstration. First, use your PrestoDB Sandbox EC2 SSH key to scp the properties and sql directories to the Presto EC2 instance. Before doing so, you will need to set the EC2_ENDPOINT value to your EC2 instance’s public IPv4 address or public IPv4 DNS value. You can hardcode the value or use the aws ec2 API command shown below to retrieve the value programmatically.

# on local workstation
EC2_ENDPOINT=$(aws ec2 describe-instances \
--filters "Name=product-code,Values=ejee5zzmv4tc5o3tr1uul6kg2" \
"Name=product-code.type,Values=marketplace" \
--query "Reservations[*].Instances[*].{Instance:PublicDnsName}" \
--output text)
scp -i "~/.ssh/ahana-presto.pem" \
-r properties/ sql/ \
ec2-user@${EC2_ENDPOINT}:~/
ssh -i "~/.ssh/ahana-presto.pem" ec2-user@${EC2_ENDPOINT}

Environment Variables

Next, we need to set several environment variables. First, replace the DATA_BUCKET and POSTGRES_HOST values below to match your environment. The PGPASSWORD value should be correct unless you changed it in the CloudFormation template. Then, execute the command to add the variables to your .bash_profile file.

echo """
export DATA_BUCKET=prestodb-demo-databucket-CHANGE_ME
export POSTGRES_HOST=presto-demo.CHANGE_ME.us-east-1.rds.amazonaws.com
export PGPASSWORD=5up3r53cr3tPa55w0rd
export JAVA_HOME=/usr
export HADOOP_HOME=/home/ec2-user/hadoop
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/tools/lib/*
export HIVE_HOME=/home/ec2-user/hive
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$PATH
""" >>~/.bash_profile

Optionally, I suggest applying any available updates to the EC2 instance and installing your favorite tools, like htop, to monitor the EC2’s performance.

yes | sudo yum update
yes | sudo yum install htop
View of htop running on an r5.xlarge EC2 instance

Before further configuration for the demonstration, let’s review a few aspects of the Ahana PrestoDB EC2 instance. There are several applications pre-installed on the instance, including Java, Presto, Hadoop, PostgreSQL, and Hive. Versions shown are current as of early September 2020.

java -version
# openjdk version "1.8.0_252"
# OpenJDK Runtime Environment (build 1.8.0_252-b09)
# OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
hadoop version
# Hadoop 2.9.2
postgres --version
# postgres (PostgreSQL) 9.2.24
psql --version
# psql (PostgreSQL) 9.2.24
hive --version
# Hive 2.3.7
presto-cli --version
# Presto CLI 0.235-cb21100

The Presto configuration files are in the /etc/presto/ directory. The Hive configuration files are in the ~/hive/conf/ directory. Here are a few commands you can use to gain a better understanding of their configurations.

ls /etc/presto/
cat /etc/presto/jvm.config
cat /etc/presto/config.properties
cat /etc/presto/node.properties
# installed and configured catalogs
ls /etc/presto/catalog/
cat ~/hive/conf/hive-site.xml

Configure Presto

To configure Presto, we need to create and copy a new Presto postgresql catalog properties file for the newly created RDS for PostgreSQL instance. Modify the properties/rds_postgresql.properties file, replacing the connection-url value with your own JDBC connection string, shown in the CloudFormation Outputs tab.

connector.name=postgresql
connection-url=jdbc:postgresql://presto-demo.abcdefg12345.us-east-1.rds.amazonaws.com:5432/shipping
connection-user=presto
connection-password=5up3r53cr3tPa55w0rd

Move the rds_postgresql.properties file to its correct location using sudo.

sudo mv properties/rds_postgresql.properties /etc/presto/catalog/

We also need to modify the existing Hive catalog properties file, which will allow us to write to non-managed Hive tables from Presto.

connector.name=hive-hadoop2
hive.metastore.uri=thrift://localhost:9083
hive.non-managed-table-writes-enabled=true

The following command will overwrite the existing hive.properties file with the modified version containing the new property.

sudo mv properties/hive.properties /etc/presto/catalog/hive.properties

To finalize the configuration of the catalog properties files, we need to restart Presto. The easiest way is to reboot the EC2 instance, then SSH back into the instance. Since our environment variables are in the .bash_profile file, they will survive a restart and logging back into the EC2 instance.

sudo reboot
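Once the instance is back up and you have reconnected over SSH, you can confirm that Presto restarted cleanly and picked up the new rds_postgresql catalog. The commands below are an optional sanity check, using Presto’s built-in system catalog.

presto-cli --execute "SELECT node_id, state FROM system.runtime.nodes;"
presto-cli --execute "SHOW CATALOGS;"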

Add Tables to Apache Hive Metastore

We will use RDS for PostgreSQL and Apache Hive Metastore/Amazon S3 as additional data sources for our federated queries. The Ahana PrestoDB Sandbox instance comes pre-configured with Apache Hive and an Apache Hive Metastore, backed by PostgreSQL (a separate PostgreSQL 9.x instance pre-installed on the EC2).

The Sandbox’s instance of Presto comes pre-configured with schemas for the TPC Benchmark DS (TPC-DS). We will create three external tables in our Apache Hive Metastore that correspond to three tables in the TPC-DS data source’s sf1 schema: tpcds.sf1.customer, tpcds.sf1.customer_address, and tpcds.sf1.customer_demographics. A Hive external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. As an example, here is the SQL statement that creates the external customer table in the Hive Metastore, whose data will be stored in the S3 bucket.

CREATE EXTERNAL TABLE IF NOT EXISTS `customer`(
`c_customer_sk` bigint,
`c_customer_id` char(16),
`c_current_cdemo_sk` bigint,
`c_current_hdemo_sk` bigint,
`c_current_addr_sk` bigint,
`c_first_shipto_date_sk` bigint,
`c_first_sales_date_sk` bigint,
`c_salutation` char(10),
`c_first_name` char(20),
`c_last_name` char(30),
`c_preferred_cust_flag` char(1),
`c_birth_day` integer,
`c_birth_month` integer,
`c_birth_year` integer,
`c_birth_country` char(20),
`c_login` char(13),
`c_email_address` char(50),
`c_last_review_date_sk` bigint)
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

The three CREATE EXTERNAL TABLE SQL statements are included in the sql/ directory: sql/hive_customer.sql, sql/hive_customer_address.sql, and sql/hive_customer_demographics.sql. The bucket name in the LOCATION clause needs to be manually updated to your own bucket name in all three files before continuing.

Next, run the following hive commands to create the external tables in the Hive Metastore within the existing default schema/database.

hive --database default -f sql/hive_customer.sql
hive --database default -f sql/hive_customer_address.sql
hive --database default -f sql/hive_customer_demographics.sql

To confirm the tables were created successfully, we could use a variety of hive commands.

hive --database default -e "SHOW TABLES;"
hive --database default -e "DESCRIBE FORMATTED customer;"
hive --database default -e "SELECT * FROM customer LIMIT 5;"
Using the ‘DESCRIBE FORMATTED customer_address;’ Hive command

Alternatively, you can also create the external table interactively from within Hive, using the hive command to access the CLI. Copy and paste the contents of the SQL files to the hive CLI. To exit hive use quit;.

Interactively querying within Apache Hive

Amazon S3 Data Source Setup

With the external tables created, we will now select all the data from each of the three tables in the TPC-DS data source and insert that data into the equivalent Hive tables. The physical data will be written to Amazon S3 as SNAPPY-compressed Apache Parquet files, a highly efficient, columnar storage format. Execute the following commands. I will explain why the customer_address table statements are a bit different, next.

# inserts 100,000 rows
presto-cli --execute """
INSERT INTO hive.default.customer
SELECT * FROM tpcds.sf1.customer;
"""
# inserts 50,000 rows across 52 partitions
presto-cli --execute """
INSERT INTO hive.default.customer_address
SELECT ca_address_sk, ca_address_id, ca_street_number,
ca_street_name, ca_street_type, ca_suite_number,
ca_city, ca_county, ca_zip, ca_country, ca_gmt_offset,
ca_location_type, ca_state
FROM tpcds.sf1.customer_address
ORDER BY ca_address_sk;
"""
# add new partitions in metastore
hive -e "MSCK REPAIR TABLE default.customer_address;"
# inserts 1,920,800 rows
presto-cli --execute """
INSERT INTO hive.default.customer_demographics
SELECT * FROM tpcds.sf1.customer_demographics;
"""

Confirm the data has been loaded into the correct S3 bucket locations and is in Parquet-format using the AWS Management Console or AWS CLI. Rest assured, the Parquet-format data is SNAPPY-compressed even though the S3 console incorrectly displays Compression as None. You can easily confirm the compression codec with a utility like parquet-tools.
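For example, you could list the new objects with the AWS CLI and then inspect a single file’s metadata with parquet-tools, assuming you have it installed on your workstation. The object key in the copy command below is a placeholder; substitute one of the keys returned by the ls command.

aws s3 ls s3://${DATA_BUCKET}/customer/ --recursive
aws s3 cp s3://${DATA_BUCKET}/customer/<object-key> ./customer.snappy.parquet
parquet-tools meta ./customer.snappy.parquet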

Data organized by key prefixes in Amazon S3
Using S3’s ‘Select from’ feature to preview the SNAPPY-compressed Parquet format data

Partitioned Tables

The customer_address table is unique in that it has been partitioned by the ca_state column. Partitioned tables are created using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE `customer_address`(
`ca_address_sk` bigint,
`ca_address_id` char(16),
`ca_street_number` char(10),
`ca_street_name` char(60),
`ca_street_type` char(15),
`ca_suite_number` char(10),
`ca_city` varchar(60),
`ca_county` varchar(30),
`ca_zip` char(10),
`ca_country` char(20),
`ca_gmt_offset` double precision,
`ca_location_type` char(20)
)
PARTITIONED BY (`ca_state` char(2))
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer_address'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

According to Apache Hive, a table can have one or more partition columns, and a separate data directory is created for each distinct value combination in the partition columns. Since the data for the Hive tables is stored in Amazon S3, when the data is written to the customer_address table, it is automatically separated into different S3 key prefixes based on the state. The data is physically “partitioned”.

customer_address data, partitioned by the state, in Amazon S3

Whenever we add new partitions in S3, we need to run the MSCK REPAIR TABLE command to add that table’s new partitions to the Hive Metastore.

hive -e "MSCK REPAIR TABLE default.customer_address;"

In SQL, a predicate is a condition expression that evaluates to a Boolean value, either true or false. Defining the partitions to align with the attributes that are frequently used in the conditions/filters (predicates) of the queries can significantly increase query efficiency. When we execute a query that uses an equality comparison condition, such as ca_state = 'TN', partitioning means the query only has to work with the slice of the data in the corresponding ca_state=TN key prefix. There are 50,000 rows of data in the customer_address table, but only 1,418 rows (2.8% of the total data) in the ca_state=TN partition. Combined with the advantage of the Parquet format with SNAPPY compression, partitioning can significantly reduce query execution time.
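A simple way to observe the benefit is to compare a count that filters on the partition column with one that does not; the first query only has to read the Parquet files under the ca_state=TN key prefix. The two queries below are a sketch of that comparison.

presto-cli --execute """
SELECT count(*) FROM hive.default.customer_address WHERE ca_state = 'TN';
"""
presto-cli --execute """
SELECT count(*) FROM hive.default.customer_address;
"""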

Adding Data to RDS for PostgreSQL Instance

For the demonstration, we will also replicate the schema and data of the tpcds.sf1.customer_address table to the new RDS for PostgreSQL instance’s shipping database.

CREATE TABLE customer_address (
ca_address_sk bigint,
ca_address_id char(16),
ca_street_number char(10),
ca_street_name char(60),
ca_street_type char(15),
ca_suite_number char(10),
ca_city varchar(60),
ca_county varchar(30),
ca_state char(2),
ca_zip char(10),
ca_country char(20),
ca_gmt_offset double precision,
ca_location_type char(20)
);

Like Hive and Presto, we can create the table programmatically from the command line or interactively; I prefer the programmatic approach. Using the following psql command, we can create the customer_address table in the public schema of the shipping database.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-f sql/postgres_customer_address.sql

Now, to insert the data into the new PostgreSQL table, run the following presto-cli command.

# inserts 50,000 rows
presto-cli --execute """
INSERT INTO rds_postgresql.public.customer_address
SELECT * FROM tpcds.sf1.customer_address;
"""

To confirm that the data was imported properly, we can use a variety of commands.

-- Should be 50000 rows in table
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT COUNT(*) FROM customer_address;"
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT * FROM customer_address LIMIT 5;"

Alternatively, you could use the PostgreSQL client interactively by copying and pasting the contents of the sql/postgres_customer_address.sql file to the psql command prompt. To interact with PostgreSQL from the psql command prompt, use the following command.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto

Use the \dt command to list the PostgreSQL tables and the \q command to exit the PostgreSQL client. We now have all the new data sources created and configured for Presto!

Interacting with Presto

Presto provides a web interface for monitoring and managing queries. The interface provides dashboard-like insights into the Presto Cluster and queries running on the cluster. The Presto UI is available on port 8080 using the public IPv4 address or the public IPv4 DNS.
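Presto also exposes a small REST API on the same port. If you want a quick check that the coordinator is reachable from your workstation before opening the UI, a simple curl against the info endpoint works; this assumes the EC2_ENDPOINT variable set earlier and that port 8080 is open to your IP.

curl -s http://${EC2_ENDPOINT}:8080/v1/info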

There are several ways to interact with Presto via the PrestoDB Sandbox. This post will demonstrate how to execute ad hoc queries against Presto from an IDE using a JDBC connection and from the Presto CLI. Other options include running queries against Presto from Java and Python applications, Tableau, or Apache Spark/PySpark.

Below, we see a query being run against Presto from JetBrains PyCharm, using a Java Database Connectivity (JDBC) connection. The advantage of using an IDE like JetBrains is having a single visual interface, including all the project files, multiple JDBC configurations, output results, and the ability to run multiple ad hoc queries.

Below, we see an example of configuring the Presto Data Source using the JDBC connection string, supplied in the CloudFormation stack Outputs tab.

Make sure to download and use the latest Presto JDBC driver JAR.

With JetBrains’ IDEs, we can even limit the databases/schemas displayed by the Data Source. This is helpful when we have multiple Presto catalogs configured, but we are only interested in certain data sources.

We can also run queries using the Presto CLI, three different ways. We can pass a SQL statement to the Presto CLI, pass a file containing a SQL statement to the Presto CLI, or work interactively from the Presto CLI. Below, we see a query being run, interactively from the Presto CLI.
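For reference, the three forms look like the following; the statement and file shown are placeholders for your own queries.

# pass a SQL statement to the Presto CLI
presto-cli --execute "SELECT * FROM hive.default.customer LIMIT 5;"
# pass a file containing a SQL statement to the Presto CLI
presto-cli --catalog tpcds --schema sf1 --file sql/presto_query2.sql
# work interactively from the Presto CLI
presto-cli --catalog tpcds --schema sf1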

As the query is running, we can observe the live Presto query statistics (not very user friendly in my terminal).

And finally, we can view the query results.

Federated Queries

The example queries used in the demonstration and included in the project were mainly extracted from the scholarly article, Why You Should Run TPC-DS: A Workload Analysis, available as a PDF on the tpc.org website. I have modified the SQL queries to work with Presto.

In the first example, we will run three versions of the same basic query statement. Version 1 of the query is not a federated query; it only queries a single data source. Version 2 of the query queries two different data sources. Finally, version 3 of the query queries three different data sources. Each of the three versions of the SQL statement should return the same results: 93 rows of data.

Version 1: Single Data Source

The first version of the query statement, sql/presto_query2.sql, is not a federated query. Each of the query’s four tables (catalog_returns, date_dim, customer, and customer_address) references the TPC-DS data source, which came pre-installed with the PrestoDB Sandbox. All the table references resolve to the tpcds.sf1 catalog and schema, which we will supply to the presto-cli.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
catalog_returns,
date_dim,
customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
customer_address,
customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

We will run each query non-interactively using the presto-cli. We will choose the sf1 (scale factor of 1) tpcds schema. According to Presto, every unit in the scale factor (sf1, sf10, sf100) corresponds to a gigabyte of data.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2.sql \
--output-format ALIGNED \
--client-tags "presto_query2"

Below, we see the query results in the presto-cli.

Below, we see the first query running in Presto’s web interface.

Below, we see the first query’s results detailed in Presto’s web interface.

Version 2: Two Data Sources

In the second version of the query statement, sql/presto_query2_federated_v1.sql, two of the tables (tpcds.sf1.catalog_returns and tpcds.sf1.date_dim) reference the TPC-DS data source. The other two tables (hive.default.customer and hive.default.customer_address) now reference the Apache Hive Metastore for their schema and underlying data in Amazon S3. Note how those table references are fully qualified with the hive catalog and default schema.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
hive.default.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
hive.default.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v1.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v1"

Below, we see the second query’s results detailed in Presto’s web interface.

Even though the data is in two separate and physically different data sources, we can easily query it as though it were all in the same place.

Version 3: Three Data Sources

In the third version of the query statement, sql/presto_query2_federated_v2.sql, two of the tables (tpcds.sf1.catalog_returns and tpcds.sf1.date_dim) reference the TPC-DS data source. One table (hive.default.customer) references the Apache Hive Metastore, whose underlying data resides in Amazon S3. The fourth table (rds_postgresql.public.customer_address) references the new RDS for PostgreSQL database instance. Note how each table reference is fully qualified with its own catalog and schema.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
rds_postgresql.public.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
rds_postgresql.public.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, we run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v2.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v2"

Below, we see the third query’s results detailed in Presto’s web interface.

Again, even though the data is in three separate and physically different data sources, we can easily query it as though it were all in the same place.

Additional Query Examples

The project contains several additional query statements, which I extracted from Why You Should Run TPC-DS: A Workload Analysis and modified to work with Presto and to federate across multiple data sources.

# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1.sql \
--output-format ALIGNED \
--client-tags "presto_query1"
# federated - two sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query1_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4.sql \
--output-format ALIGNED \
--client-tags "presto_query4"
# federated - three sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query4_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query5.sql \
--output-format ALIGNED \
--client-tags "presto_query5"

Conclusion

In this post, we gained a better understanding of Presto using Ahana’s PrestoDB Sandbox product from AWS Marketplace. We learned how Presto queries data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. We also learned about Apache Hive and the Apache Hive Metastore, the Apache Parquet file format, and how and why to partition Hive data in Amazon S3. Most importantly, we learned how to write federated queries that join multiple disparate data sources without having to move the data into a single monolithic data store.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Collecting and Analyzing IoT Data in Near Real-Time with AWS IoT, LoRa, and LoRaWAN

Introduction

In a recent post published on ITNEXT, LoRa and LoRaWAN for IoT: Getting Started with LoRa and LoRaWAN Protocols for Low Power, Wide Area Networking of IoT, we explored the use of the LoRa (Long Range) and LoRaWAN protocols to transmit and receive sensor data, over a substantial distance, between an IoT device, containing several embedded sensors, and an IoT gateway. In this post, we will extend that architecture to the Cloud, using AWS IoT, a broad and deep set of IoT services, from the edge to the Cloud. We will securely collect, transmit, and analyze IoT data using the AWS cloud platform.

LoRa and LoRaWAN

According to the LoRa Alliance, Low-Power, Wide-Area Networks (LPWAN) are projected to support a major portion of the billions of devices forecasted for the Internet of Things (IoT). LoRaWAN is designed from the bottom up to optimize LPWANs for battery lifetime, capacity, range, and cost. LoRa and LoRaWAN permit long-range connectivity for IoT devices in different types of industries. According to Wikipedia, LoRaWAN defines the communication protocol and system architecture for the network, while the LoRa physical layer enables the long-range communication link.

AWS IoT

AWS describes AWS IoT as a set of managed services that enable ‘internet-connected devices to connect to the AWS Cloud and lets applications in the cloud interact with internet-connected devices.’ AWS IoT services span three categories: Device Software, Connectivity and Control, and Analytics.

In this post, we will focus on three AWS IoT services, one from each category: AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics. According to AWS, the AWS IoT Device SDKs include open-source libraries and developer and porting guides with samples to help you build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Core is a managed cloud service that lets connected devices easily and securely interact with cloud applications and other devices. AWS IoT Core can process and route messages to AWS endpoints and other devices reliably and securely. Finally, AWS IoT Analytics is a fully-managed IoT analytics service, designed specifically for IoT, which collects, pre-processes, enriches, stores, and analyzes IoT device data at scale.

To learn more about AWS IoT, specifically the AWS IoT services we will be exploring within this post, I recommend reading my recent post published on Towards Data Science, Getting Started with IoT Analytics on AWS.

Hardware Selection

In this post, we will use the following hardware.

IoT Device with Embedded Sensors

An Arduino single-board microcontroller will serve as our IoT device. The 3.3V AI-enabled Arduino Nano 33 BLE Sense board (Amazon: USD 36.00), released in August 2019, comes with the powerful nRF52840 processor from Nordic Semiconductors, a 32-bit ARM Cortex-M4 CPU running at 64 MHz, 1MB of CPU Flash Memory, 256KB of SRAM, and a NINA-B306 stand-alone Bluetooth 5 low energy (BLE) module.

The Sense contains an impressive array of embedded sensors:

  • 9-axis Inertial Sensor (LSM9DS1): 3D digital linear acceleration sensor, a 3D digital
    angular rate sensor, and a 3D digital magnetic sensor
  • Humidity and Temperature Sensor (HTS221): Capacitive digital sensor for relative humidity and temperature
  • Barometric Sensor (LPS22HB): MEMS nano pressure sensor: 260–1260 hectopascal (hPa) absolute digital output barometer
  • Microphone (MP34DT05): MEMS audio sensor omnidirectional digital microphone
  • Gesture, Proximity, Light Color, and Light Intensity Sensor (APDS9960): Advanced Gesture detection, Proximity detection, Digital Ambient Light Sense (ALS), and Color Sense (RGBC).

The Arduino Sense is an excellent, low-cost single-board microcontroller for learning about the collection and transmission of IoT sensor data.

IoT Gateway

An IoT Gateway, according to TechTarget, is a physical device or software program that serves as the connection point between the Cloud and controllers, sensors, and intelligent devices. All data moving to the Cloud, or vice versa, goes through the gateway, which can be either a dedicated hardware appliance or software program.

LoRa Gateways, to paraphrase The Things Network, form the bridge between devices and the Cloud. Devices use low power networks like LoRaWAN to connect to the Gateway, while the Gateway uses high bandwidth networks like WiFi, Ethernet, or Cellular to connect to the Cloud.

A third-generation Raspberry Pi 3 Model B+ single-board computer (SBC) will serve as our LoRa IoT Gateway. This Raspberry Pi model features a 1.4GHz Cortex-A53 (ARMv8) 64-bit quad-core processor System on a Chip (SoC), 1GB LPDDR2 SDRAM, dual-band wireless LAN, Bluetooth 4.2 BLE, and Gigabit Ethernet (Amazon: USD 42.99).

LoRa Transceiver Modules

To transmit the IoT sensor data between the IoT device, containing the embedded sensors, and the IoT gateway, I have used the REYAX RYLR896 LoRa transceiver module (Amazon: USD 19.50 x 2). The transceiver modules are commonly referred to as a universal asynchronous receiver-transmitter (UART). A UART is a computer hardware device for asynchronous serial communication in which the data format and transmission speeds are configurable.

According to the manufacturer, REYAX, the RYLR896 contains the Semtech SX1276 long-range, low power transceiver. The RYLR896 module provides ultra-long range spread spectrum communication and high interference immunity while minimizing current consumption. Each RYLR896 module contains a small, PCB integrated, helical antenna. This transceiver operates at both the 868 and 915 MHz frequency ranges. In this demonstration, we will be transmitting at 915 MHz for North America.

The Arduino Sense (IoT device) transmits data, using one of the RYLR896 modules (shown below front). The Raspberry Pi (IoT Gateway), connected to the other RYLR896 module (shown below rear), receives the data.

LoRaWAN Security

The RYLR896 is capable of AES 128-bit data encryption. Using the Advanced Encryption Standard (AES), we will encrypt the data sent from the IoT device to the IoT gateway, using a 32 hex digit password (128 bits / 4 bits/hex digit).
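If you prefer to generate your own random 128-bit key rather than reuse the one in the demo code, openssl can produce a 32-hex-digit value; this is optional, and whatever value you choose simply needs to match on both the IoT device and the IoT gateway.

openssl rand -hex 16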

Provisioning AWS Resources

To start, we will create the necessary AWS IoT and associated resources on the AWS cloud platform. Once these resources are in place, we can then proceed to configure the IoT device and IoT gateway to securely transmit the sensor data to the Cloud.

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/aws-iot-analytics-demo.git

AWS CloudFormation

The CloudFormation template, iot-analytics.yaml, will create an AWS IoT CloudFormation stack containing the following resources.

  • AWS IoT Thing
  • AWS IoT Thing Policy
  • AWS IoT Core Topic Rule
  • AWS IoT Analytics Channel, Pipeline, Data store, and Data set
  • AWS Lambda and Lambda Permission
  • Amazon S3 Bucket
  • Amazon SageMaker Notebook Instance
  • AWS IAM Roles

Please be aware of the costs involved with the AWS resources used in the CloudFormation template before continuing. To create the AWS CloudFormation stack from the included CloudFormation template, execute the following AWS CLI command.

aws cloudformation create-stack \
--stack-name lora-iot-demo \
--template-body file://cloudformation/iot-analytics.yaml \
--parameters ParameterKey=ProjectName,ParameterValue=lora-iot-demo \
ParameterKey=IoTTopicName,ParameterValue=lora-iot-demo \
--capabilities CAPABILITY_NAMED_IAM

The resulting CloudFormation stack should contain 16 AWS resources.
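You can wait for the stack to finish and review the created resources from the AWS CLI as well; the commands below are optional checks using the stack name from the previous command.

aws cloudformation wait stack-create-complete --stack-name lora-iot-demo
aws cloudformation describe-stack-resources \
--stack-name lora-iot-demo \
--query "StackResources[*].[ResourceType,ResourceStatus]" \
--output table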

Additional Resources

Unfortunately, AWS CloudFormation cannot create all the AWS IoT resources we require for this demonstration. To complete the AWS provisioning process, execute the following series of AWS CLI commands, aws_cli_commands.md. These commands will create the remaining resources, including an AWS IoT Thing Type, Thing Group, Thing Billing Group, and an X.509 Certificate.

# LoRaWAN / AWS IoT Demo
# Author: Gary Stafford
# Run AWS CLI commands after CloudFormation stack completes successfully
# variables
thingName=lora-iot-gateway-01
thingPolicy=LoRaDevicePolicy
thingType=LoRaIoTGateway
thingGroup=LoRaIoTGateways
thingBillingGroup=LoRaIoTGateways
mkdir ${thingName}
aws iot create-keys-and-certificate \
--certificate-pem-outfile "${thingName}/${thingName}.cert.pem" \
--public-key-outfile "${thingName}/${thingName}.public.key" \
--private-key-outfile "${thingName}/${thingName}.private.key" \
--set-as-active
# assuming you only have one certificate registered
certificate=$(aws iot list-certificates | jq '.[][] | .certificateArn')
## alternately, for a specific certificate if you have more than one
# aws iot list-certificates
## then change the value below
# certificate=arn:aws:iot:us-east-1:123456789012:cert/<certificate>
aws iot attach-policy \
--policy-name $thingPolicy \
--target $certificate
aws iot attach-thing-principal \
--thing-name $thingName \
--principal $certificate
aws iot create-thing-type \
--thing-type-name $thingType \
--thing-type-properties "thingTypeDescription=LoRaWAN IoT Gateway"
aws iot create-thing-group \
--thing-group-name $thingGroup \
--thing-group-properties "thingGroupDescription=\"LoRaWAN IoT Gateway Thing Group\", attributePayload={attributes={Manufacturer=RaspberryPiFoundation}}"
aws iot add-thing-to-thing-group \
--thing-name $thingName \
--thing-group-name $thingGroup
aws iot create-billing-group \
--billing-group-name $thingBillingGroup \
--billing-group-properties "billingGroupDescription=\"Gateway Billing Group\""
aws iot add-thing-to-billing-group \
--thing-name $thingName \
--billing-group-name $thingBillingGroup
aws iot update-thing \
--thing-name $thingName \
--thing-type-name $thingType \
--attribute-payload "{\"attributes\": {\"GatewayMfr\":\"RaspberryPiFoundation\", \"LoRaMfr\":\"REYAX\", \"LoRaModel\":\"RYLR896\"}}"
aws iot describe-thing \
--thing-name $thingName

IoT Device Configuration

With the AWS resources deployed, we can configure the IoT device and IoT Gateway.

Arduino Sketch

For those not familiar with Arduino, a sketch is the name that Arduino uses for a program. It is the unit of code that is uploaded into non-volatile flash memory and runs on an Arduino board. The Arduino language is a set of C and C++ functions. All standard C and C++ constructs supported by the avr-g++ compiler should work in Arduino.

For this post, the sketch, lora_iot_demo_aws.ino, contains the code necessary to collect and securely transmit the environmental sensor data, including temperature, relative humidity, barometric pressure, Red, Green, and Blue (RGB) color, and ambient light intensity, using the LoRaWAN protocol.

/*
Description: Transmits Arduino Nano 33 BLE Sense sensor telemetry over LoRaWAN,
including temperature, humidity, barometric pressure, and color,
using REYAX RYLR896 transceiver modules
http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
Author: Gary Stafford
*/
#include <Arduino_HTS221.h>
#include <Arduino_LPS22HB.h>
#include <Arduino_APDS9960.h>
const int UPDATE_FREQUENCY = 5000; // update frequency in ms
const float CALIBRATION_FACTOR = -4.0; // temperature calibration factor (Celsius)
const int ADDRESS = 116;
const int NETWORK_ID = 6;
const String PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF";
const String DELIMITER = "|";
String uid = "";
void setup()
{
Serial.begin(9600);
Serial1.begin(115200); // default baud rate of module is 115200
delay(1000); // wait for LoRa module to be ready
// get unique transceiver id to identify iot device on network
Serial1.print((String)"AT+UID?\r\n");
uid = Serial1.readString();
uid.replace("+UID=", ""); // trim off '+UID=' at start of line
uid.replace("\r\n", ""); // trim off CR/LF at end of line
// the following settings need to be the same for receiver and transmitter
Serial1.print((String)"AT+ADDRESS=" + ADDRESS + "\r\n");
delay(200);
Serial1.print((String)"AT+NETWORKID=" + NETWORK_ID + "\r\n");
delay(200);
Serial1.print("AT+CPIN=" + PASSWORD + "\r\n");
delay(200);
Serial1.print("AT+CPIN?\r\n"); // confirm password is set
if (!HTS.begin())
{ // initialize HTS221 sensor
Serial.println("Failed to initialize humidity temperature sensor!");
while (1);
}
if (!BARO.begin())
{ // initialize LPS22HB sensor
Serial.println("Failed to initialize pressure sensor!");
while (1);
}
// avoid bad readings to start bug
// https://forum.arduino.cc/index.php?topic=660360.0
BARO.readPressure();
delay(1000);
if (!APDS.begin())
{ // initialize APDS9960 sensor
Serial.println("Failed to initialize color sensor!");
while (1);
}
}
void loop()
{
updateReadings();
delay(UPDATE_FREQUENCY);
}
void updateReadings()
{
float temperature = getTemperature(CALIBRATION_FACTOR);
float humidity = getHumidity();
float pressure = getPressure();
int colors[4];
getColor(colors);
String payload = buildPayload(temperature, humidity, pressure, colors);
Serial.println("Payload: " + payload); // display the payload for debugging
Serial1.print(payload); // send the payload over LoRaWAN
displayResults(temperature, humidity, pressure, colors); // display the results for debugging
}
float getTemperature(float calibration)
{
return HTS.readTemperature() + calibration;
}
float getHumidity()
{
return HTS.readHumidity();
}
float getPressure()
{
return BARO.readPressure();
}
void getColor(int c[])
{
// check if a color reading is available
while (!APDS.colorAvailable())
{
delay(5);
}
int r, g, b, a;
APDS.readColor(r, g, b, a);
c[0] = r;
c[1] = g;
c[2] = b;
c[3] = a;
}
// display for debugging purposes
void displayResults(float t, float h, float p, int c[])
{
Serial.println((String)"UID: " + uid);
Serial.print("Temperature: ");
Serial.println(t);
Serial.print("Humidity: ");
Serial.println(h);
Serial.print("Pressure: ");
Serial.println(p);
Serial.print("Color (r, g, b, a): ");
Serial.print(c[0]);
Serial.print(", ");
Serial.print(c[1]);
Serial.print(", ");
Serial.print(c[2]);
Serial.print(", ");
Serial.println(c[3]);
Serial.println("----------");
}
String buildPayload(float t, float h, float p, int c[])
{
String readings = "";
readings += uid;
readings += DELIMITER;
readings += t;
readings += DELIMITER;
readings += h;
readings += DELIMITER;
readings += p;
readings += DELIMITER;
readings += c[0];
readings += DELIMITER;
readings += c[1];
readings += DELIMITER;
readings += c[2];
readings += DELIMITER;
readings += c[3];
String payload = "";
payload += "AT+SEND=";
payload += ADDRESS;
payload += ",";
payload += readings.length();
payload += ",";
payload += readings;
payload += "\r\n";
return payload;
}

AT Commands

Communication with the RYLR896’s long-range modem is done using AT commands. AT commands are instructions used to control a modem. AT is the abbreviation of ATtention; every command line starts with AT, which is why modem commands are called AT commands, according to Developer’s Home. A complete list of AT commands can be downloaded as a PDF from the RYLR896 product page.

To efficiently transmit the environmental sensor data from the IoT sensor to the IoT gateway, the sketch concatenates the sensor ID and the sensor values together in a single string. The string will be incorporated into an AT command, sent to the RYLR896 LoRa transceiver module. To make it easier to parse the sensor data on the IoT gateway, we will delimit the sensor values with a pipe (|), as opposed to a comma. According to REYAX, the maximum length of the LoRa payload is approximately 330 bytes.

Below, we see an example of an AT command used to send the sensor data from the IoT sensor and the corresponding unencrypted data received by the IoT gateway. Both contain the LoRa transmitter Address ID, payload length (62 bytes in the example), and the payload. The data received by the IoT gateway also has the Received signal strength indicator (RSSI), and Signal-to-noise ratio (SNR).

Receiving Data on IoT Gateway

The Raspberry Pi will act as a LoRa IoT gateway, receiving the environmental sensor data from the IoT device, the Arduino, and sending the data to AWS. The Raspberry Pi runs a Python script, rasppi_lora_receiver_aws.py, which will receive the data from the Arduino Sense, decrypt the data, parse the sensor values, serialize the data to a JSON payload, and finally, transmit the payload in an MQTT-protocol message to AWS. The script uses pyserial, the Python Serial Port Extension, which encapsulates access to the serial port for communication with the RYLR896 module. The script uses the AWS IoT Device SDK for Python v2 to communicate with AWS.
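Before running the script on the Raspberry Pi, the two Python dependencies need to be installed. A minimal installation, assuming Python 3 is already present on the Raspberry Pi, looks like the following; alternatively, use the requirements.txt file included in the project, as referenced in the script’s header comments.

python3 -m pip install --user pyserial awsiotsdk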

import json
import logging
import sys
import threading
import time
from argparse import ArgumentParser
import serial
from awscrt import io, mqtt, auth, http, exceptions
from awsiot import mqtt_connection_builder
# LoRaWAN IoT Sensor Demo
# Using REYAX RYLR896 transceiver modules
# http://reyax.com/wp-content/uploads/2020/01/Lora-AT-Command-RYLR40x_RYLR89x_EN.pdf
# Author: Gary Stafford
# Requirements: python3 -m pip install --user -r requirements.txt
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# a1d0wxnxn1hs7m-ats.iot.us-east-1.amazonaws.com
# constants
ADDRESS = 116
NETWORK_ID = 6
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF"
# global variables
count = 0 # from args
received_count = 0
received_all_event = threading.Event()
def main():
# get args
logging.basicConfig(filename='output.log',
filemode='w', level=logging.DEBUG)
args = get_args() # get args
payload = ""
lora_payload = {}
# set log level
io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')
# spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
# set MQTT connection
mqtt_connection = set_mqtt_connection(args, client_bootstrap)
logging.debug("Connecting to {} with client ID '{}'…".format(
args.endpoint, args.client_id))
connect_future = mqtt_connection.connect()
# future.result() waits until a result is available
connect_future.result()
logging.debug("Connecting to REYAX RYLR896 transceiver module…")
serial_conn = serial.Serial(
port=args.tty,
baudrate=int(args.baud_rate),
timeout=5,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
if serial_conn.isOpen():
logging.debug("Connected!")
set_lora_config(serial_conn)
check_lora_config(serial_conn)
while True:
# read data from serial port
serial_payload = serial_conn.readline()
logging.debug(serial_payload)
if len(serial_payload) >= 1:
payload = serial_payload.decode(encoding="utf-8")
payload = payload[:-2]
try:
data = parse_payload(payload)
lora_payload = {
"ts": time.time(),
"data": {
"device_id": str(data[0]),
"gateway_id": str(args.gateway_id),
"temperature": float(data[1]),
"humidity": float(data[2]),
"pressure": float(data[3]),
"color": {
"red": float(data[4]),
"green": float(data[5]),
"blue": float(data[6]),
"ambient": float(data[7])
}
}
}
logging.debug(lora_payload)
except IndexError:
logging.error("IndexError: {}".format(payload))
except ValueError:
logging.error("ValueError: {}".format(payload))
# publish mqtt message
message_json = json.dumps(
lora_payload,
sort_keys=True,
indent=None,
separators=(',', ':'))
try:
mqtt_connection.publish(
topic=args.topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE)
except mqtt.SubscribeError as err:
logging.error(".SubscribeError: {}".format(err))
except exceptions.AwsCrtError as err:
logging.error("AwsCrtError: {}".format(err))
def set_mqtt_connection(args, client_bootstrap):
if args.use_websocket:
proxy_options = None
if args.proxy_host:
proxy_options = http.HttpProxyOptions(
host_name=args.proxy_host, port=args.proxy_port)
credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
client_bootstrap)
mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=args.endpoint,
client_bootstrap=client_bootstrap,
region=args.signing_region,
credentials_provider=credentials_provider,
websocket_proxy_options=proxy_options,
ca_filepath=args.root_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=args.client_id,
clean_session=False,
keep_alive_secs=6)
else:
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=args.endpoint,
cert_filepath=args.cert,
pri_key_filepath=args.key,
client_bootstrap=client_bootstrap,
ca_filepath=args.root_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=args.client_id,
clean_session=False,
keep_alive_secs=6)
return mqtt_connection
def get_args():
parser = ArgumentParser(
description="Send and receive messages through and MQTT connection.")
parser.add_argument("--tty", required=True,
help="serial tty", default="/dev/ttyAMA0")
parser.add_argument("--baud-rate", required=True,
help="serial baud rate", default=1152000)
parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
"Ex: \"abcd123456wxyz-ats.iot.us-east-1.amazonaws.com\"")
parser.add_argument('--cert', help="File path to your client certificate, in PEM format.")
parser.add_argument('--key', help="File path to your private key, in PEM format.")
parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
"Necessary if MQTT server uses a certificate that's not already in " +
"your trust store.")
parser.add_argument('--client-id', default='samples-client-id',
help="Client ID for MQTT connection.")
parser.add_argument('--topic', default="samples/test",
help="Topic to subscribe to, and publish messages to.")
parser.add_argument('--message', default="Hello World!", help="Message to publish. " +
"Specify empty string to publish nothing.")
parser.add_argument('--count', default=0, type=int, help="Number of messages to publish/receive before exiting. " +
"Specify 0 to run forever.")
parser.add_argument('--use-websocket', default=False, action='store_true',
help="To use a websocket instead of raw mqtt. If you specify this option you must "
"specify a region for signing, you can also enable proxy mode.")
parser.add_argument('--signing-region', default='us-east-1',
help="If you specify --use-websocket, this is the region that will be used for computing "
"the Sigv4 signature")
parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
"you will likely need to set --root-ca to the ca for your proxy.")
parser.add_argument('--proxy-port', type=int, default=8080,
help="Port for proxy to connect to.")
parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
help='Logging level')
parser.add_argument("--gateway-id", help="IoT Gateway serial number")
args = parser.parse_args()
return args
def parse_payload(payload):
    # input: +RCV=116,29,0447383033363932003C0034|23.94|37.71|99.89|16|38|53|80,-61,56
    # output: [0447383033363932003C0034, 23.94, 37.71, 99.89, 16.0, 38.0, 53.0, 80.0]
    payload = payload.split(",")
    payload = payload[2].split("|")
    payload = [i for i in payload]
    return payload
def set_lora_config(serial_conn):
    # configures the REYAX RYLR896 transceiver module
    serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network Id set? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n"))
    time.sleep(1)
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES-128 password set? {}".format(serial_payload.decode(encoding="utf-8")))
def check_lora_config(serial_conn):
    serial_conn.write(str.encode("AT?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Module responding? {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+ADDRESS?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Address: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+NETWORKID?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Network id: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+IPR?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("UART baud rate: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+BAND?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF frequency: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CRFOP?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF output power: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+MODE?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("Work mode: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+PARAMETER?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("RF parameters: {}".format(serial_payload.decode(encoding="utf-8")))
    serial_conn.write(str.encode("AT+CPIN?\r\n"))
    serial_payload = (serial_conn.readline())[:-2]
    logging.debug("AES128 password of the network: {}".format(serial_payload.decode(encoding="utf-8")))
# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    logging.error("Connection interrupted. error: {}".format(error))
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    logging.warning("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))
    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        logging.warning("Session did not persist. Resubscribing to existing topics…")
        resubscribe_future, _ = connection.resubscribe_existing_topics()
        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    logging.warning("Resubscribe results: {}".format(resubscribe_results))
    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    logging.debug("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1
    if received_count == count:
        received_all_event.set()
if __name__ == "__main__":
    sys.exit(main())

Running the IoT Gateway Python Script

To run the Python script on the Raspberry Pi, we will use a helper shell script, rasppi_lora_receiver_aws.sh. The shell script helps construct the arguments required to execute the Python script.

#!/bin/bash
# Author: Gary A. Stafford
# Start IoT data collector script and tails output
# Usage:
# sh ./rasppi_lora_receiver_aws.sh \
# a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
if [[ $# -ne 1 ]]; then
echo "Script requires 1 parameter!"
exit 1
fi
# input parameters
ENDPOINT=$1 # e.g. a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com
DEVICE="lora-iot-gateway-01" # matches CloudFormation thing name
CERTIFICATE="${DEVICE}-certificate.pem.crt" # e.g. lora-iot-gateway-01-certificate.pem.crt
KEY="${DEVICE}-private.pem.key" # e.g. lora-iot-gateway-01-private.pem.key
GATEWAY_ID=$(< /proc/cpuinfo grep Serial | grep -oh "[a-z0-9]*$") # e.g. 00000000f62051ce
# output for debugging
echo "DEVICE: ${DEVICE}"
echo "ENDPOINT: ${ENDPOINT}"
echo "CERTIFICATE: ${CERTIFICATE}"
echo "KEY: ${KEY}"
echo "GATEWAY_ID: ${GATEWAY_ID}"
# call the python script
nohup python3 rasppi_lora_receiver_aws.py \
  --endpoint "${ENDPOINT}" \
  --cert "${DEVICE}-creds/${CERTIFICATE}" \
  --key "${DEVICE}-creds/${KEY}" \
  --root-ca "${DEVICE}-creds/AmazonRootCA1.pem" \
  --client-id "${DEVICE}" \
  --topic "lora-iot-demo" \
  --gateway-id "${GATEWAY_ID}" \
  --verbosity "Info" \
  --tty "/dev/ttyAMA0" \
  --baud-rate 115200 \
  >collector.log 2>&1 </dev/null &
sleep 2
# tail the log (Control-C to exit)
tail -f collector.log

To run the helper script, we execute the following command, substituting the input parameter, the AWS IoT endpoint, with your endpoint.

sh ./rasppi_lora_receiver_aws.sh \
  a1b2c3d4e5678f-ats.iot.us-east-1.amazonaws.com

You should see the console output, similar to the following.

The script starts by configuring the RYLR896 module and writing that configuration to the log file, collector.log. If successful, we should see the following debug information logged.

DEBUG:root:Connecting to a1b2c3d4e5f6-ats.iot.us-east-1.amazonaws.com with client ID 'lora-iot-gateway-01'
DEBUG:root:Connecting to REYAX RYLR896 transceiver module
DEBUG:root:Connected!
DEBUG:root:Address set? +OK
DEBUG:root:Network Id set? +OK
DEBUG:root:AES-128 password set? +OK
DEBUG:root:Module responding? +OK
DEBUG:root:Address: +ADDRESS=116
DEBUG:root:Network id: +NETWORKID=6
DEBUG:root:UART baud rate: +IPR=115200
DEBUG:root:RF frequency: +BAND=915000000
DEBUG:root:RF output power: +CRFOP=15
DEBUG:root:Work mode: +MODE=0
DEBUG:root:RF parameters: +PARAMETER=12,7,1,4
DEBUG:root:AES128 password of the network: +CPIN=92A0ECEC9000DA0DCF0CAAB0ABA2E0EF

The sensor data is also written to the log file for debugging purposes. The first line in each pair below is the raw, decrypted data received from the IoT device via LoRaWAN. The second line is the JSON-serialized payload, sent securely to AWS using the MQTT protocol.

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.89|99.38|230|692|833|1116,-48,39\r\n'

DEBUG:root:{'ts': 1598305503.7041512, 'data': {'humidity': 41.89, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 230.0, 'blue': 833.0, 'ambient': 1116.0, 'green': 692.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.46|41.63|99.38|236|696|837|1127,-49,35\r\n'

DEBUG:root:{'ts': 1598305513.7918658, 'data': {'humidity': 41.63, 'temperature': 23.46, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 236.0, 'blue': 837.0, 'ambient': 1127.0, 'green': 696.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.44|41.57|99.38|232|686|830|1113,-48,32\r\n'

DEBUG:root:{'ts': 1598305523.8556132, 'data': {'humidity': 41.57, 'temperature': 23.44, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 232.0, 'blue': 830.0, 'ambient': 1113.0, 'green': 686.0}}}

DEBUG:root:b'+RCV=116,59,0447383033363932003C0034|23.51|41.44|99.38|205|658|802|1040,-48,36\r\n'

DEBUG:root:{'ts': 1598305528.8890748, 'data': {'humidity': 41.44, 'temperature': 23.51, 'device_id': '0447383033363932003C0034', 'gateway_id': '00000000f62051ce', 'pressure': 99.38, 'color': {'red': 205.0, 'blue': 802.0, 'ambient': 1040.0, 'green': 658.0}}}

AWS IoT Core

The Raspberry Pi-based IoT gateway will be registered with AWS IoT Core. IoT Core allows users to connect devices quickly and securely to AWS.

Things

According to AWS, IoT Core can reliably scale to billions of devices and trillions of messages. Registered devices are referred to as things in AWS IoT Core. A thing is a representation of a specific device or logical entity. Information about a thing is stored in the registry as JSON data.

Below, we see an example of the Thing created by CloudFormation. The Thing, lora-iot-gateway-01, represents the physical IoT gateway. We have assigned the IoT gateway a Thing Type, LoRaIoTGateway, a Thing Group, LoRaIoTGateways, and a Thing Billing Group, IoTGateways.

In a real IoT environment containing hundreds, thousands, or even millions of IoT devices, gateways, and sensors, these classification mechanisms, Thing Type, Thing Group, and Thing Billing Group, help to organize IoT assets.

Device Gateway and Message Broker

IoT Core provides a Device Gateway, which manages all active device connections. The Gateway currently supports MQTT, WebSockets, and HTTP 1.1 protocols. Behind the Device Gateway is a high-throughput pub/sub Message Broker, which securely transmits messages to and from all IoT devices and applications with low latency. Below, we see a typical AWS IoT Core architecture containing multiple Topics, Rules, and Actions.

AWS IoT Security

AWS IoT Core provides mutual authentication and encryption, ensuring all data exchanged between AWS and the devices is secure by default. In the demonstration, all data is sent securely using Transport Layer Security (TLS) 1.2 with X.509 digital certificates on port 443. Below, we see an example of an X.509 certificate assigned to the Thing, lora-iot-gateway-01, which represents the physical IoT gateway. The X.509 certificate and the private key, generated previously using the AWS CLI, are installed on the IoT gateway.

Authorization of the device to access any resource on AWS is controlled by AWS IoT Core Policies. These policies are similar to AWS IAM Policies. Below, we see an example of an AWS IoT Core Policy, LoRaDevicePolicy, which is assigned to the IoT gateway.
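
Purely as a hypothetical sketch (not the demonstration's actual LoRaDevicePolicy), a minimal policy allowing the gateway to connect and publish to the demo topic could be created and attached to the gateway's certificate with boto3, as shown below. The statements, ARNs, and certificate ID are illustrative assumptions.

import json
import boto3  # AWS SDK for Python

iot = boto3.client("iot")
# hypothetical, minimal policy allowing the gateway to connect and publish to the demo topic
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "iot:Connect",
         "Resource": "arn:aws:iot:us-east-1:123456789012:client/lora-iot-gateway-01"},
        {"Effect": "Allow", "Action": "iot:Publish",
         "Resource": "arn:aws:iot:us-east-1:123456789012:topic/lora-iot-demo"}
    ]
}
iot.create_policy(
    policyName="LoRaDevicePolicy",
    policyDocument=json.dumps(policy_document)
)
# attach the policy to the X.509 certificate used by the gateway (certificate ARN is a placeholder)
iot.attach_policy(
    policyName="LoRaDevicePolicy",
    target="arn:aws:iot:us-east-1:123456789012:cert/your-certificate-id"
)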

AWS IoT Core Rules

Once an MQTT message is received from the IoT gateway (a thing), we use AWS IoT Rules to send message data to an AWS IoT Analytics Channel. Rules give your devices the ability to interact with AWS services. Rules are analyzed, and Actions are performed based on the MQTT topic stream. Below, we see an example rule that forwards our messages to an IoT Analytics Channel.

Rule query statements are written in standard Structured Query Language (SQL). The data source for the Rule query is an IoT Topic.

SELECT
    data.device_id,
    data.gateway_id,
    data.temperature,
    data.humidity,
    data.pressure,
    data.color.red,
    data.color.green,
    data.color.blue,
    data.color.ambient,
    ts,
    clientid() AS device,
    parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp(), "UTC") AS msg_received
FROM
    "${IoTTopicName}"
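
A rule like this can also be defined programmatically. Below is a minimal boto3 sketch; the rule, channel, and role names are illustrative assumptions, the SQL is abbreviated (the full query is shown above), and the demonstration's actual rule may be defined differently.

import boto3  # AWS SDK for Python

iot = boto3.client("iot")
# hypothetical rule, channel, and role names
iot.create_topic_rule(
    ruleName="lora_iot_demo_to_analytics",
    topicRulePayload={
        "sql": "SELECT data.device_id, data.gateway_id, data.temperature, "
               "data.humidity, data.pressure, ts FROM 'lora-iot-demo'",
        "awsIotSqlVersion": "2016-03-23",
        "actions": [{
            "iotAnalytics": {
                "channelName": "iot_analytics_channel",
                "roleArn": "arn:aws:iam::123456789012:role/iot-analytics-role"
            }
        }]
    }
)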

AWS IoT Analytics

AWS IoT Analytics is composed of five primary components: Channels, Pipelines, Data stores, Data sets, and Notebooks. These components enable you to collect, prepare, store, analyze, and visualize your IoT data.

Below, we see a typical AWS IoT Analytics architecture. IoT messages are received from AWS IoT Core through a Rule Action. Amazon QuickSight provides business intelligence and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting.

IoT Analytics Channel

An AWS IoT Analytics Channel pulls messages or data into IoT Analytics from other AWS sources, such as Amazon S3, Amazon Kinesis, or AWS IoT Core. Channels store data for IoT Analytics Pipelines. Both Channels and Data stores support storing data in your own Amazon S3 bucket or an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket.

When creating a Channel, you also decide how long to retain the data. For the demonstration, we have set the data retention period to 21 days. Channels are generally not used for long-term storage of data. Typically, you would only retain data in the Channel for the period you need to analyze. For long-term storage of IoT message data, I recommend using an AWS IoT Core Rule to send a copy of the raw IoT data to Amazon S3, using a service such as Amazon Kinesis Data Firehose.
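
For reference, a channel with a 21-day retention period could be created with boto3 as shown in this minimal sketch; the channel name here is an assumption, and the demonstration's channel may have been provisioned differently.

import boto3  # AWS SDK for Python

iot_analytics = boto3.client("iotanalytics")
# create a channel backed by a service-managed S3 bucket, retaining data for 21 days
iot_analytics.create_channel(
    channelName="iot_analytics_channel",  # assumed name
    retentionPeriod={"unlimited": False, "numberOfDays": 21}
)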

IoT Analytics Pipeline

An AWS IoT Analytics Pipeline consumes messages from one or more Channels. Pipelines transform, filter, and enrich the messages before storing them in IoT Analytics Data stores. A Pipeline is composed of an ordered list of activities. Logically, you must specify both a Channel (source) and a Datastore (destination) activity. Optionally, you may choose as many as 23 additional activities in the pipelineActivities array.

In our demonstration's Pipeline, iot_analytics_pipeline, we have specified three additional activities: DeviceRegistryEnrich, Filter, and Lambda. Other activity types include Math, SelectAttributes, RemoveAttributes, and AddAttributes.

The Filter activity checks that the sensor values are not null or otherwise erroneous; if they are, the message is dropped. The Lambda Pipeline activity executes an AWS Lambda function to transform the messages in the pipeline. Messages are sent in an event object to the Lambda. The message is modified, and the event object is returned to the activity.

The Python-based Lambda function easily handles typical IoT data transformation tasks, including converting the temperature from Celsius to Fahrenheit, pressure from kilopascals (kPa) to inches of Mercury (inHg), and 12-bit RGBA values to 8-bit color values (0–255). The Lambda function also rounds all the values to between 0 and 2 decimal places of precision.

def lambda_handler(event, context):
    for e in event:
        # Celsius to Fahrenheit
        e['temperature'] = round((e['temperature'] * 1.8) + 32, 2)
        e['humidity'] = round(e['humidity'], 2)
        # kilopascals (kPa) to inches of Mercury (inHg)
        e['pressure'] = round((e['pressure'] / 3.3864), 2)
        # scale 12-bit color values down to 8-bit (0-255)
        e['red'] = int(round(e['red'] / (4097 / 255), 0))
        e['green'] = int(round(e['green'] / (4097 / 255), 0))
        e['blue'] = int(round(e['blue'] / (4097 / 255), 0))
        e['ambient'] = int(round(e['ambient'] / (4097 / 255), 0))
    return event

The demonstration’s Pipeline also enriches the IoT data with metadata from the IoT device’s AWS IoT Core Registry. The metadata includes additional information about the device that generated the IoT data, including the custom attributes such as LoRa transceiver manufacturer and model, and the IoT gateway manufacturer.

A notable feature of Pipelines is the ability to reprocess messages. If you make changes to the Pipeline, which often happens during the data preparation stage, you can reprocess any or all the IoT data in the associated Channel, and overwrite the IoT data in the Data set.
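
To illustrate how the Channel, activities, and Data store fit together in a Pipeline definition, here is a minimal boto3 sketch of a comparable pipeline. The channel name, Lambda name, filter expression, and batch size are illustrative assumptions, and the DeviceRegistryEnrich activity is omitted for brevity; the demonstration's actual pipeline may be defined differently.

import boto3  # AWS SDK for Python

iot_analytics = boto3.client("iotanalytics")
iot_analytics.create_pipeline(
    pipelineName="iot_analytics_pipeline",
    pipelineActivities=[
        # source: the channel receiving messages from the IoT Core rule
        {"channel": {"name": "source", "channelName": "iot_analytics_channel",
                     "next": "filter_bad_readings"}},
        # drop messages with missing or erroneous sensor values (expression is illustrative)
        {"filter": {"name": "filter_bad_readings",
                    "filter": "temperature > -100 AND humidity >= 0 AND pressure > 0",
                    "next": "transform_readings"}},
        # transform messages with the Lambda function shown above (name is assumed)
        {"lambda": {"name": "transform_readings", "lambdaName": "iot-analytics-transform",
                    "batchSize": 10, "next": "store"}},
        # destination: the data store used by the data set
        {"datastore": {"name": "store", "datastoreName": "iot_analytics_data_store"}}
    ]
)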

IoT Analytics Data store

An AWS IoT Analytics Data store stores prepared data from an AWS IoT Analytics Pipeline in a fully-managed database. Both Channels and Data stores support storing IoT data in your own Amazon S3 bucket or an IoT Analytics service-managed S3 bucket. In the demonstration, we are using a service-managed S3 bucket to store the IoT data in our Data store, iot_analytics_data_store.

IoT Analytics Data set

An AWS IoT Analytics Data set automatically provides regular, up-to-date insights for data analysts by querying a Data store using standard SQL. Periodic updates are implemented using a cron expression. For the demonstration, we are updating our Data set, iot_analytics_data_set, at a 15-minute interval. The time interval can be increased or reduced, depending on the desired ‘near real-time’ nature of the IoT data being analyzed.
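
A data set on a 15-minute schedule could be defined with boto3 along these lines; a minimal sketch, assuming the data store name above and a simple SELECT query, since the demonstration's data set may be configured differently.

import boto3  # AWS SDK for Python

iot_analytics = boto3.client("iotanalytics")
iot_analytics.create_dataset(
    datasetName="iot_analytics_data_set",
    actions=[{
        "actionName": "sql_action",
        # query the data store containing the transformed messages
        "queryAction": {"sqlQuery": "SELECT * FROM iot_analytics_data_store"}
    }],
    triggers=[{
        # run the query every 15 minutes
        "schedule": {"expression": "cron(0/15 * * * ? *)"}
    }]
)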

Below, we see messages in the Result preview pane of the Data set. Note the SQL query used to obtain the messages, which queries the Data store. The Data store, as you will recall, contains the transformed messages from the Pipeline.

IoT Analytics Data sets also support sending content results, which are materialized views of your IoT Analytics data, to an Amazon S3 bucket.

The CloudFormation stack created an encrypted Amazon S3 Bucket. This bucket receives a copy of the messages from the IoT Analytics Data set whenever the cron expression runs the scheduled update.

IoT Analytics Notebook

An AWS IoT Analytics Notebook allows users to perform statistical analysis and machine learning on IoT Analytics Data sets using Jupyter Notebooks. The IoT Analytics Notebook service includes a set of notebook templates that contain AWS-authored machine learning models and visualizations. Notebook Instances can be linked to a GitHub or other source code repository. Notebooks created with IoT Analytics Notebook can also be accessed directly through Amazon SageMaker. For the demonstration, the Notebooks Instance is cloned from our project’s GitHub repository.

The repository contains a sample Jupyter Notebook, LoRa_IoT_Analytics_Demo.ipynb, based on the conda_python3 kernel. This preinstalled environment includes the default Anaconda installation and Python 3.

The Notebook uses pandas, matplotlib, and plotly to manipulate and visualize the sample IoT data stored in the IoT Analytics Data set.

The Notebook can be modified, and the changes pushed back to GitHub. You could easily fork the demonstration’s GitHub repository and modify the CloudFormation template to point to your source code repository.

Amazon QuickSight

Amazon QuickSight provides business intelligence (BI) and visualization. Amazon QuickSight ML Insights adds anomaly detection and forecasting. We can use Amazon QuickSight to visualize the IoT message data, stored in the IoT Analytics Data set.

Amazon QuickSight has both a Standard and an Enterprise Edition. AWS provides a detailed product comparison of each edition. For the post, I am demonstrating the Enterprise Edition, which includes additional features, such as ML Insights, hourly refreshes of SPICE (super-fast, parallel, in-memory, calculation engine), and theme customization.

Please be aware of the costs of Amazon QuickSight if you choose to follow along with this part of the demo. Although there is an Amazon QuickSight API, Amazon QuickSight is not automatically enabled or configured with CloudFormation or using the AWS CLI in this demonstration.

QuickSight Data Sets

Amazon QuickSight has a wide variety of data source options for creating Amazon QuickSight Data sets, including the ones shown below. Do not confuse Amazon QuickSight Data sets with IoT Analytics Data sets; they are two different service features.

For the demonstration, we will create an Amazon QuickSight Data set that will use our IoT Analytics Data set, iot_analytics_data_set.

Amazon QuickSight gives you the ability to view and modify QuickSight Data sets before visualizing. QuickSight even provides a wide variety of functions, enabling us to perform dynamic calculations on the field values. For this demonstration, we will leave the data unchanged since all transformations were already completed in the IoT Analytics Pipeline.

QuickSight Analysis

Using the QuickSight Data set, built from the IoT Analytics Data set as a data source, we create a QuickSight Analysis. The QuickSight Analysis console is shown below. An Analysis is primarily a collection of Visuals (aka Visual types). QuickSight provides several Visual types. Each visual is associated with a Data set. Data for the QuickSight Analysis or each visual within the Analysis can be filtered. For the demo, I have created a simple QuickSight Analysis, including a few typical QuickSight visuals.

QuickSight Dashboards

To share a QuickSight Analysis, we can create a QuickSight Dashboard. Below, we see a few views of the QuickSight Analysis, shown above, as a Dashboard. Although viewers of the Dashboard cannot edit the visuals, they can apply filtering and interactively drill-down into data in the Visuals.

Amazon QuickSight ML Insights

According to Amazon, ML Insights leverages AWS's machine learning (ML) and natural language capabilities to gain deeper insights from data. QuickSight's ML-powered Anomaly Detection continuously analyzes data to discover anomalies and variations inside of the aggregates, giving you the insights to act when business changes occur. QuickSight's ML-powered Forecasting can be used to predict your business metrics accurately, and perform interactive what-if analysis with point-and-click simplicity. QuickSight's built-in algorithms make it easy for anyone to use ML that learns from your data patterns to provide you with accurate predictions based on historical trends.

Below, we see the ML Insights tab (left) in the demonstration's QuickSight Analysis. Individually detected anomalies can be added to the QuickSight Analysis, like Visuals, and configured to tune the detection parameters. Observe the temperature, humidity, and barometric pressure anomalies identified by ML Insights, based on their Anomaly Score and a minimum delta of five percent. These anomalies accurately reflected an actual failure of the IoT device, caused by overheating during testing, which resulted in abnormal sensor readings.

Receiving the Messages on AWS

To confirm the IoT gateway is sending messages, we can use a packet analyzer, like tcpdump, on the IoT gateway. Running tcpdump on the IoT gateway, below, we see outbound encrypted MQTT messages being sent to AWS on port 443.

To confirm those messages are being received from the IoT gateway on AWS, we can use the AWS IoT Core Test feature and subscribe to the lora-iot-demo topic. We should see messages flowing in from the IoT gateway at approximately 5-second intervals.

The JSON payload structure of the incoming MQTT messages will look similar to the below example. The device_id is the unique id of the IoT device that transmitted the message using LoRaWAN. The gateway_id is the unique id of the IoT gateway that received the message using LoRaWAN and sent it to AWS. A single IoT gateway would usually manage messages from multiple IoT devices, each with a unique id.

{
"data": {
"color": {
"ambient": 1057,
"blue": 650,
"green": 667,
"red": 281
},
"device_id": "0447383033363932003C0034",
"gateway_id": "00000000f62051ce",
"humidity": 45.73,
"pressure": 98.65,
"temperature": 23.6
},
"ts": 1598543131.9861386
}

The SQL query used by the AWS IoT Rule, described earlier, transforms and flattens the nested JSON payload structure before passing it to the AWS IoT Analytics Channel, as shown below.

{
"ambient": 1057,
"blue": 650,
"green": 667,
"red": 281,
"device_id": "0447383033363932003C0034",
"gateway_id": "00000000f62051ce",
"humidity": 45.73,
"pressure": 98.65,
"temperature": 23.6,
"ts": 1598543131.9861386,
"msg_received": "2020-08-27T11:45:32.074+0000",
"device": "lora-iot-gateway-01"
}

We can measure the near real-time nature of the IoT data using the ts and msg_received data fields. The ts data field is the date and time when the sensor reading occurred on the IoT device, while the msg_received data field is the date and time when the message was received on AWS. The delta between the two values is a measure of how near real-time the sensor readings are being streamed to the AWS IoT Analytics Channel. In the below example, the difference between ts (2020-08-27T11:45:31.986) and msg_received (2020-08-27T11:45:32.074) is 88 ms.
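
As a quick sanity check, that latency can be computed with Python's standard library using the two values from the example above; a minimal sketch, not part of the demonstration's code.

from datetime import datetime, timezone
ts = 1598543131.9861386
msg_received = datetime.strptime("2020-08-27T11:45:32.074+0000", "%Y-%m-%dT%H:%M:%S.%f%z")
# subtract the device-side timestamp from the AWS-side receive time
delta_ms = (msg_received - datetime.fromtimestamp(ts, tz=timezone.utc)).total_seconds() * 1000
print(round(delta_ms))  # approximately 88 ms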

Final IoT Data Message Structure

Once the message payload passes through the AWS IoT Analytics Pipeline and lands in the AWS IoT Analytics Data set, its final data structure looks as follows. Note that the device’s attribute metadata has been added from the AWS IoT Core device registry. Regrettably, the metadata is not well-formatted JSON and will require additional transformation to be usable.

{
"device_id": "0447383033363932003C0034",
"gateway_id": "00000000f62051ce",
"temperature": 74.48,
"humidity": 45.73,
"pressure": 29.13,
"red": 17,
"green": 42,
"blue": 40,
"ambient": 66,
"ts": 1598543131.9861386,
"device": "lora-iot-gateway-01",
"msg_received": "2020-08-27T15:45:32.024+0000",
"metadata": {
"defaultclientid": "lora-iot-gateway-01",
"thingname": "lora-iot-gateway-01",
"thingid": "017db4b8-7fca-4617-aa58-7125dd94ab36",
"thingarn": "arn:aws:iot:us-east-1:123456789012:thing/lora-iot-gateway-01",
"thingtypename": "LoRaIoTGateway",
"attributes": {
"loramfr": "REYAX",
"gatewaymfr": "RaspberryPiFoundation",
"loramodel": "RYLR896"
},
"version": "2",
"billinggroupname": "LoRaIoTGateways"
},
"__dt": "2020-08-27 00:00:00.000"
}

A set of sample messages is included in the GitHub project’s sample_messages directory.

Conclusion

In this post, we explored the use of the LoRa and LoRaWAN protocols to transmit environmental sensor data from an IoT device to an IoT gateway. Given its low energy consumption, long-distance transmission capabilities, and well-developed protocols, LoRaWAN is an ideal long-range wireless protocol for IoT devices. We then demonstrated how to use AWS IoT Device SDKs, AWS IoT Core, and AWS IoT Analytics to securely collect, analyze, and visualize streaming messages from the IoT device, in near real-time.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Architecting a Successful SaaS: Understanding Cloud-based SaaS Models

Originally published on the AWS APN Blog.

Introduction

You’re a startup with an idea for a revolutionary new software product. You quickly build a beta version and deploy it to the cloud. After a successful social-marketing campaign and concerted sales effort, dozens of customers subscribe to your SaaS-based product. You’re ecstatic…until you realize you never architected your product for this level of success. You were so busy coding, raising capital, marketing, and selling, you never planned how you would scale your SaaS product. How you would ensure your customers’ security, as well as your own. How you would meet the product reliability, compliance, and performance you promised. And, how you would monitor and meter your customers’ usage, no matter how fast you or they grew.

I’ve often heard budding entrepreneurs jest, if only success was their biggest problem. Certainly, success won’t be their biggest problem. For many, the problems come afterward, when they disappoint their customers by failing to deliver the quality product they promised. Or worse, damaging their customers’ reputations (and their own) by losing or exposing sensitive data. As the old saying goes, ‘you never get a second chance to make a first impression.’ Customer trust is hard-earned and easily lost. Properly architecting a scalable and secure SaaS-based product is just as important as feature development and sales. No one wants to fail on Day 1—you worked too hard to get there.

Architecting a Successful SaaS

In this series of posts, Architecting a Successful SaaS, we will explore how to properly plan and architect a SaaS product offering designed for hosting on the cloud. We will start by answering basic questions: what is SaaS, what are the alternatives to SaaS for software distribution, and what are the most common SaaS product models? We will then examine different high-level SaaS architectures, review tenant isolation strategies, and explore how SaaS vendors securely interact with their customers’ cloud accounts. Finally, we will discuss how SaaS providers can meet established best practices, like those from AWS SaaS Factory and the AWS Well-Architected Framework.

For this post, I have chosen many examples of cloud services from AWS and vendors from AWS Marketplace. However, the principles discussed may be applied to other leading cloud providers, SaaS products, and cloud-based software marketplaces. All information in this post is publicly available.

What is SaaS?

According to AWS Marketplace, ‘SaaS [Software as a Service] is a delivery model for software applications whereby the vendor hosts and operates the application over the Internet. Customers pay for using the software without owning the underlying infrastructure.’ Another definition from AWS, ‘SaaS is a licensing and delivery model whereby software is centrally managed and hosted by a provider and available to customers on a subscription basis.’

A SaaS product, like other forms of software, is produced by what is commonly referred to as an Independent Software Vendor (ISV). According to Wikipedia, an Independent Software Vendor ‘is an organization specializing in making and selling software, as opposed to hardware, designed for mass or niche markets. This is in contrast to in-house software, which is developed by the organization that will use it, or custom software, which is designed or adapted for a single, specific third party. Although ISV-provided software is consumed by end-users, it remains the property of the vendor.’

Although estimates vary greatly, according to The Software as a Service (SaaS) Global Market Report 2020, the global SaaS market was valued at about $134.44B in 2018 and is expected to grow to $220.21B at a compound annual growth rate (CAGR) of 13.1% through 2022. Statista predicts SaaS revenues will grow even faster, forecasting revenues of $266B by 2022, with continued strong positive growth to $346B by 2027.

Cloud-based Usage Models

Let’s start by reviewing the three most common ways that individuals, businesses, academic institutions, the public sector, and government consume services from cloud providers such as Amazon Web Services (AWS), Microsoft Azure, Google Cloud, and IBM Cloud (now includes Red Hat).

Indirect Consumer

Indirect consumers are customers who consume cloud-based SaaS products. Indirect users are often unlikely to know which cloud provider hosts the SaaS products to which they subscribe. Many SaaS products can import and export data, as well as integrate with other SaaS products. Many successful companies run their entire business in the cloud using a combination of SaaS products from multiple vendors.


Examples

  • An advertising firm that uses Google G Suite for day-to-day communications and collaboration between its employees and clients.
  • A large automotive parts manufacturer that runs its business using the Workday cloud-based Enterprise Resource Management (ERP) suite.
  • A software security company that uses Zendesk for customer support. They also use the Slack integration for Zendesk to view, create, and take action on support tickets, using Slack channels.
  • A recruiting firm that uses Zoom Meetings & Chat to interview remote candidates. They also use the Zoom integration with Lever recruiting software, to schedule interviews.

Direct Consumer

Direct consumers are customers who use cloud-based Infrastructure as a Service (IaaS) and Platform as a Service (PaaS) services to build and run their software; the DIY (do it yourself) model. The software deployed in the customer’s account may be created by the customer or purchased from a third-party software vendor and deployed within the customer’s cloud account. Direct users may purchase IaaS and PaaS services from multiple cloud providers.


Examples

Hybrid Consumers

Hybrid consumers are customers who use a combination of IaaS, PaaS, and SaaS services. Customers often connect multiple IaaS, PaaS, and SaaS services as part of larger enterprise software application platforms.


Examples

  • A payroll company that hosts its proprietary payroll software product, using IaaS products like Amazon EC2 and Elastic Load Balancing. In addition, they use an integrated SaaS-based fraud detection product, like Cequence Security CQ botDefense, to ensure the safety and security of payroll customers.
  • An online gaming company that operates its applications using the fully-managed container-based PaaS service, Amazon ECS. To promote their gaming products, they use a SaaS-based marketing product, like Mailchimp Marketing CRM.

Cloud-based Software

Most cloud-based software is sold in one of two ways, Customer-deployed or SaaS. Below, we see a breakdown by the method of product delivery on AWS Marketplace. All items in the chart, except SaaS, represent Customer-deployed products. Serverless applications are available elsewhere on AWS and are not represented in the AWS Marketplace statistics.

AWS Marketplace: All Products – Delivery Methods (February 2020)

Customer-deployed

With the customer-deployed model, an ISV sells software products to consumers of cloud-based IaaS and PaaS services. Products are installed by the customer, a Systems Integrator (SI), or the ISV into the customer’s cloud account. Customer-deployed products are reminiscent of traditional ‘boxed’ software.

Customers typically pay a recurring hourly, monthly, or annual subscription fee for the software product, commonly referred to as pay-as-you-go (PAYG). The subscription fee paid to the vendor is in addition to the fees charged to the customer by the cloud service provider for the underlying compute resources on which the customer-deployed product runs in the customer’s cloud account.

Some customer-deployed products may also require a software license. Software licenses are often purchased separately through other channels. Applying a license you already own to a newly purchased product is commonly referred to as bring your own license (BYOL). BYOL is common in larger enterprise customers, who may have entered into an Enterprise License Agreement (ELA) with the ISV.

AWS Marketplace: Customer-deployed Product Subscription Types (February 2020)

Customer-deployed cloud-based software products can take a variety of forms. The most common deliverables include some combination of virtual machines (VMs) such as Amazon Machine Images (AMIs), Docker images, Amazon SageMaker models, or Infrastructure as Code such as AWS CloudFormation, HashiCorp Terraform, or Helm Charts. Customers usually pull these deliverables from a vendor’s AWS account or other public or private source code or binary repositories. Below, we see the breakdown of customer-deployed products, by the method of delivery, on AWS Marketplace.

AWS Marketplace: Customer-deployed Product Delivery Methods (February 2020)

Although historically, AMIs have been the predominant method of customer-deployed software delivery, newer technologies, such as Docker images, serverless, SageMaker models, and AWS Data Exchange datasets, will continue to grow in this segment. The AWS Serverless Application Repository (SAR) currently contains over 500 serverless applications, which are not reflected in this chart. AWS appears to be moving toward making it easier to sell serverless software applications in AWS Marketplace, according to one recent post.

Customer-deployed cloud-based software products may require a connection between the installed product and the ISV for product support, license verification, product upgrades, or security notifications.


Examples

SaaS

With the SaaS model, an ISV sells SaaS software products to customers. The SaaS product is deployed, managed, and sold by the ISV and hosted by a cloud provider, such as AWS. A SaaS product may or may not interact with a customer’s cloud account. SaaS products are similar to customer-deployed products with respect to their subscription-based fee structure. Subscriptions may be based on a unit of measure, often a period of time. Subscriptions may also be based on the number of users, requests, hosts, or the volume of data.

AWS Marketplace: SaaS Products - Delivery Methods (February 2020)
AWS Marketplace: SaaS Products – Pricing Plans (February 2020)

A significant difference between SaaS products and customer-deployed products is the lack of direct customer costs from the underlying cloud provider. The underlying costs are bundled into the subscription fee for the SaaS product.

Similar to Customer-deployed products, SaaS products target both consumers and businesses. SaaS products span a wide variety of consumer, business, industry-specific, and technical categories. AWS Marketplace offers products from vendors covering eight major categories and over 70 sub-categories.

AWS Marketplace: SaaS Product Categories (February 2020)

SaaS Product Variants

I regularly work with a wide variety of cloud-based software vendors. In my experience, most cloud-based SaaS products fit into one of four categories, based on the primary way a customer interacts with the SaaS product:

  • Stand-alone: A SaaS product that has no interaction with the customer’s cloud account;
  • Data Access: A SaaS product that connects to the customer’s cloud account to only obtain data;
  • Augmentation: A SaaS product that connects to the customer’s cloud account, interacting with and augmenting the customer’s software;
  • Discrete Service: A variation of augmentation, a SaaS product that provides a discrete service or function as opposed to a more complete software product;

Stand-alone

A stand-alone SaaS product has no interaction with a customer’s cloud account. Customers of stand-alone SaaS products interact with the product through an interface provided by the SaaS vendor. Many stand-alone SaaS products can import and export customer data, as well as integrate with other cloud-based SaaS products. Stand-alone SaaS products may target consumers, known as Business-to-Consumer (B2C SaaS). They may also target businesses, known as Business-to-Business (B2B SaaS).


Examples

Data Access

A SaaS product that connects to a customer’s data sources in their cloud account or on-prem. These SaaS products often fall into the categories of Big Data and Data Analytics, Machine Learning and Artificial Intelligence, and IoT (Internet of Things). Products in these categories work with large quantities of data. Given the sheer quantity or real-time nature of the data, importing or manually inputting data directly into the SaaS product through the SaaS vendor’s user interface is unrealistic. Often, these SaaS products will cache some portion of the customer’s data to reduce the customer’s data transfer costs.

Similar to the previous stand-alone SaaS products, customers of these SaaS products interact with the product through a user interface provided by the SaaS vendor.


Examples

  • Zepl provides an enterprise data science analytics platform, which enables data exploration, analysis, and collaboration. Zepl sells its Zepl Science and Analytics Platform SaaS product on AWS Marketplace. The Zepl product provides integration to many types of customer data sources including Snowflake, Amazon S3, Amazon Redshift, Amazon Athena, Google BigQuery, Apache Cassandra (Amazon MCS), and other SQL databases.
  • Sisense provides an enterprise-grade, cloud-native business intelligence and analytics platform, powered by AI. Sisense offers its Sisense Business Intelligence SaaS product on AWS Marketplace. This product lets customers prepare and analyze disparate big datasets using Sisense’s Data Connectors. The wide array of connectors provide connectivity to dozens of different cloud-based and on-prem data sources.
  • Databricks provides a unified data analytics platform, designed for massive-scale data engineering and collaborative data science. Databricks offers its Databricks Unified Analytics Platform SaaS product on AWS Marketplace. Databricks allows customers to interact with data across many different data sources, data storage types, and data types, including batch and streaming.
  • DataRobot provides an enterprise AI platform, which enables global enterprises to collaboratively harness the power of AI. DataRobot sells its DataRobot Automated Machine Learning for AWS SaaS product on AWS Marketplace. Using connectors, like Skyvia’s OData connector, customers can connect their data sources to the DataRobot product.

Augmentation

A SaaS product that interacts with, or augments, a customer’s application, which is managed by the customer in their own cloud account. These SaaS products often maintain secure, loosely-coupled, unidirectional or bidirectional connections between the vendor’s SaaS product and the customer’s account. Vendors on AWS often use services like Amazon EventBridge, AWS PrivateLink, VPC Peering, Amazon S3, Amazon Kinesis, Amazon SQS, and Amazon SNS to interact with customers’ accounts and exchange data. Often, these SaaS products fall within the categories of Security, Logging and Monitoring, and DevOps.

Customers of these types of SaaS products generally interact with their own software, as well as the SaaS product, through an interface provided by the SaaS vendor.


Examples

  • CloudCheckr provides solutions that enable clients to optimize costs, security, and compliance on leading cloud providers. CloudCheckr sells its Cloud Management Platform SaaS product on AWS Marketplace. CloudCheckr uses an AWS IAM cross-account role and Amazon S3 to exchange data between the customer’s account and their SaaS product.
  • Splunk provides the leading software platform for real-time Operational Intelligence. Splunk sells its Splunk Cloud SaaS product on AWS Marketplace. Splunk Cloud enables rapid application troubleshooting, ensures security and compliance, and provides monitoring of business-critical services in real-time. According to their documentation, Splunk uses a combination of AWS S3, Amazon SQS, and Amazon SNS services to transfer AWS CloudTrail logs from the customer’s accounts to Splunk Cloud.

Discrete Service

Discrete SaaS products are a variation of SaaS augmentation products. Discrete SaaS products provide specific, distinct functionality to a customer’s software application. These products may be an API, data source, or machine learning model, which is often accessed completely through a vendor’s API. The products have a limited or no visual user interface. These SaaS products are sometimes referred to as a ‘Service as a Service’. Discrete SaaS products often fall into the categories of Artificial Intelligence and Machine Learning, Financial Services, Reference Data, and Authentication and Authorization.


Examples

AWS Data Exchange

There is a new category of products on AWS Marketplace. Released in November 2019, AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. According to AWS, Data Exchange vendors can publish new data, as well as automatically publish revisions to existing data and notify subscribers. Once subscribed to a data product, customers can use the AWS Data Exchange API to load data into Amazon S3 and then analyze it with a wide variety of AWS analytics and machine learning services.


Data Exchange seems to best fit the description of a customer-deployed product. However, given the nature of the vendor-subscriber relationship, where data may be regularly exchanged—revised and published by the vendor and pulled by the subscriber—I would consider Data Exchange a cloud-based hybrid product.

AWS Data Exchange products are available on AWS Marketplace. The list of qualified data providers is growing and includes Reuters, Foursquare, TransUnion, Pitney Bowes, IMDb, Epsilon, ADP, Dun & Bradstreet, and others. As illustrated below, data sets are available in the categories of financial services, public sector, healthcare, media, telecommunications, and more.

AWS Marketplace: Data Exchange Product Categories (February 2020)

Examples

Conclusion

In this first post, we’ve become familiar with the common ways in which customers consume cloud-based IaaS, PaaS, and SaaS products and services. We also explored the different ways in which ISVs sell their software products to customers. In future posts, we will examine different high-level SaaS architectures, review tenant isolation strategies, and explore how SaaS vendors securely interact with their customers’ cloud accounts. Finally, we will discuss how SaaS providers can meet established best practices, like those from AWS SaaS Factory and the AWS Well-Architected Framework.

References

Here are some great references to learn more about building and managing SaaS products on AWS.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Using Amazon Polly Text-to-Speech Service to Expand your Blog’s Audience

Introduction

Writing a blog about your latest product’s features? Case studies on your latest customer integrations? Opinions and insights into your industry, or your customers’ industries? Well-written blog posts not only inform, they can also showcase your or your organization’s experience and expertise. However, if you blog on a regular basis, you know there is a lot of content out there to steal a reader’s interest. Your audience’s interest in your post can be short-lived.

A great way to extend the reach of your posts and maintain interest longer is to produce an audio version. Audio versions can be included along with the written post. Audio versions may also be shared separately on popular services such as YouTube and SoundCloud. Many multi-tasking readers may prefer to listen to a post as opposed to reading. I often listen while commuting, working out, or running.


Amazon Polly’s text-to-speech (TTS) capabilities make it easy to convert a written post to a lifelike, professionally-sounding audio version. In this brief post, we will learn how to use Amazon Polly to convert blog posts to audio.


Preparing Post for Audio

Most posts only require minor modifications to optimize them for conversion to audio.

  • Adding an opening statement to your post, like ‘Audio version of the post’, followed by the post’s title, provides a good way to start the audio. For example, ‘Audio Introduction to: Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker’.
  • If your post includes code, you probably want to exclude those sections. If the post contains a large amount of code, you might consider only creating an audio introduction to the post.
  • If your post contains graphs or charts, which are referenced in the post, I suggest adding text, such as ‘See the Chart’, along with the caption of the graph or chart. For example, ‘See the Chart – AWS Marketplace: Product Delivery Methods (February 2020)’.
  • Create a simple URL for your post and add it to the audio, either at the beginning or end of the post. For example, ‘To read the full version of this post, including code samples, please go to tiny url dot com forward slash streaming warehouse’.

Custom Lexicons

Amazon Polly offers the ability to use custom lexicons, or vocabularies. According to AWS, you can modify the pronunciation of particular words, such as company names, acronyms, foreign words, and neologisms. If you write industry-specific or highly technical blogs, you will find creating a lexicon is probably necessary to ensure your accompanying audio sounds accurate. In my own technical posts, I most often use a custom lexicon file for acronyms and company names. While many acronyms are spelled out, others are not and have unique pronunciations. Likewise, many company names have a unique pronunciation.

Take for example the following acronyms, which I used in my last few posts: PaaS, BYOL, ELA, PAYG, IPv4, IPv6, IAM, ENI. Using the default lexicon of Amazon Polly, we end up with incorrect pronunciations for all these acronyms.

Now listen to the pronunciation of the same acronyms, after we apply a custom lexicon.

Lexicons must conform to the Pronunciation Lexicon Specification (PLS) W3C recommendation. The lexicon files are in XML format. Below is a snippet of a sample lexicon file.

<?xml version="1.0" encoding="UTF-8"?>
<lexicon
    version="1.0"
    xmlns="http://www.w3.org/2005/01/pronunciation-lexicon"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.w3.org/2005/01/pronunciation-lexicon
        http://www.w3.org/TR/2007/CR-pronunciation-lexicon-20071212/pls.xsd"
    alphabet="ipa"
    xml:lang="en-US">
  <lexeme>
    <grapheme>PaaS</grapheme>
    <alias>pass</alias>
  </lexeme>
  <lexeme>
    <grapheme>BYOL</grapheme>
    <alias>b-y-o-l</alias>
  </lexeme>
  <lexeme>
    <grapheme>ELA</grapheme>
    <alias>e-l-a</alias>
  </lexeme>
  <lexeme>
    <grapheme>PAYG</grapheme>
    <alias>p-a-y-g</alias>
  </lexeme>
  <lexeme>
    <grapheme>IPv4</grapheme>
    <alias>i-p-v-4</alias>
  </lexeme>
  <lexeme>
    <grapheme>IPv6</grapheme>
    <alias>i-p-v-6</alias>
  </lexeme>
  <lexeme>
    <grapheme>IAM</grapheme>
    <alias>i-a-m</alias>
  </lexeme>
  <lexeme>
    <grapheme>ENI</grapheme>
    <alias>e-n-i</alias>
  </lexeme>
</lexicon>


Amazon Polly Console

Amazon Polly supports synthesizing speech from either plain text or SSML input. From the Amazon Polly Management Console, copy and paste your post’s prepared text into the ‘Plain text’ tab.

Next, choose the Voice Engine. If you are using English, I suggest ‘Neural’. According to AWS, Amazon Polly has a Neural TTS (NTTS) system that can produce even higher quality voices than its standard voices. The NTTS system produces the most natural and human-like text-to-speech voices possible.

Choose your Language and Region. Then, select your Voice; I prefer ‘Joanna’. In my opinion, her voice has a natural, lifelike sound. If you prefer a male voice, ‘Matthew’ is quite natural sounding. Lastly, upload your lexicon file(s).


To start the process, choose ‘Synthesize to S3’. Indicate the S3 bucket into which you would like the MP3-format audio file output. You can also add a prefix to the MP3 files. For most average-length posts, the text synthesis process takes less than one minute. To be notified when the task completes, you can include an Amazon SNS topic ARN. Select ‘Synthesize’.


Polly creates a synthesis task.


The synthesis tasks may be viewed from the ‘S3 synthesis tasks’ tab.


Once the synthesis task is complete, the resulting mp3 audio file may be viewed and downloaded from the S3 Management Console. If you are using a Mac, QuickTime Player works great to review the audio file.


AWS CLI and SDK

Amazon Polly may also be used from the AWS CLI or using the AWS SDK. In the example below, we have replicated the same operations performed in the Console, this time using the AWS CLI. First, upload your lexicon file(s) using the polly put-lexicon command. Each lexicon can only be up to 4,000 characters in size. Then call the polly start-speech-synthesis-task command to create a synthesis task.

TEXT_FILE_CONTENTS=$(cat path/to/my/blog_text_file.txt)
OUTPUT_BUCKET=my_bucket_name
TOPIC=blog
aws polly put-lexicon \
  --name blogvocab \
  --content file://path/to/my/blogvocab.pls
aws polly put-lexicon \
  --name techterms \
  --content file://path/to/my/techterms.pls
aws polly start-speech-synthesis-task \
  --engine neural \
  --language-code en-US \
  --lexicon-names blogvocab techterms \
  --output-format mp3 \
  --output-s3-bucket-name ${OUTPUT_BUCKET} \
  --output-s3-key-prefix ${TOPIC} \
  --text "${TEXT_FILE_CONTENTS}" \
  --text-type text \
  --voice-id Joanna


The output should look similar to the screengrab, below. The results will be identical to using the Console.


You can check the task’s results using the polly list-speech-synthesis-tasks command.
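
The same operations can also be scripted with the AWS SDK for Python (boto3). Below is a minimal sketch equivalent to the CLI example above; the file paths, bucket name, prefix, and lexicon name mirror that example and are assumptions.

import boto3  # AWS SDK for Python

polly = boto3.client("polly")
# upload a custom lexicon (repeat for each lexicon file)
with open("path/to/my/blogvocab.pls") as f:
    polly.put_lexicon(Name="blogvocab", Content=f.read())
# read the prepared post text
with open("path/to/my/blog_text_file.txt") as f:
    text = f.read()
# start an asynchronous synthesis task that writes the MP3 to S3
task = polly.start_speech_synthesis_task(
    Engine="neural",
    LanguageCode="en-US",
    LexiconNames=["blogvocab"],
    OutputFormat="mp3",
    OutputS3BucketName="my_bucket_name",
    OutputS3KeyPrefix="blog",
    Text=text,
    TextType="text",
    VoiceId="Joanna"
)
print(task["SynthesisTask"]["TaskId"])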

Conclusion

In this brief post, we saw a great use case for Amazon Polly, converting your written blog posts into audio. Creating audio versions of our blogs is a great way to extend the reach of the post to a potentially new audience and maintain your current audience’s interest a little longer. Amazon Polly has several other features and capabilities to explore.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.


Streaming Analytics with Data Warehouses, using Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight

Introduction

Databases are ideal for storing and organizing data that requires a high volume of transaction-oriented query processing while maintaining data integrity. In contrast, data warehouses are designed for performing data analytics on vast amounts of data from one or more disparate sources. In our fast-paced, hyper-connected world, those sources often take the form of continuous streams of web application logs, e-commerce transactions, social media feeds, online gaming activities, financial trading transactions, and IoT sensor readings. Streaming data must be analyzed in near real-time, while often first requiring cleansing, transformation, and enrichment.

In the following post, we will demonstrate the use of Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight to analyze streaming data. We will simulate time-series data, streaming from a set of IoT sensors to Kinesis Data Firehose. Kinesis Data Firehose will write the IoT data to an Amazon S3 Data Lake, where it will then be copied to Redshift in near real-time. In Amazon Redshift, we will enhance the streaming sensor data with data contained in the Redshift data warehouse, which has been gathered and denormalized into a star schema.


In Redshift, we can analyze the data, asking questions like: what are the min, max, mean, and median temperatures over a given time period at each sensor location? Finally, we will use Amazon QuickSight to visualize the Redshift data using rich interactive charts and graphs, including displaying geospatial sensor data.


Featured Technologies

The following AWS services are discussed in this post.

Amazon Kinesis Data Firehose

According to Amazon, Amazon Kinesis Data Firehose can capture, transform, and load streaming data into data lakes, data stores, and analytics tools. Direct Kinesis Data Firehose integrations include Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. Kinesis Data Firehose enables near real-time analytics with existing business intelligence (BI) tools and dashboards.

Amazon Redshift

According to Amazon, Amazon Redshift is the most popular and fastest cloud data warehouse. With Redshift, users can query petabytes of structured and semi-structured data across your data warehouse and data lake using standard SQL. Redshift allows users to query and export data to and from data lakes. Redshift can federate queries of live data from Redshift, as well as across one or more relational databases.

Amazon Redshift Spectrum

According to Amazon, Amazon Redshift Spectrum can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum tables are created by defining the structure for data files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue or an Apache Hive metastore. While Redshift Spectrum is an alternative to copying the data into Redshift for analysis, we will not be using Redshift Spectrum in this post.

Amazon QuickSight

According to Amazon, Amazon QuickSight is a fully managed business intelligence service that makes it easy to deliver insights to everyone in an organization. QuickSight lets users easily create and publish rich, interactive dashboards that include Amazon QuickSight ML Insights. Dashboards can then be accessed from any device and embedded into applications, portals, and websites.

What is a Data Warehouse?

According to Amazon, a data warehouse is a central repository of information that can be analyzed to make better-informed decisions. Data flows into a data warehouse from transactional systems, relational databases, and other sources, typically on a regular cadence. Business analysts, data scientists, and decision-makers access the data through business intelligence tools, SQL clients, and other analytics applications.

Demonstration

Source Code

All the source code for this post can be found on GitHub. Use the following command to git clone a local copy of the project.

git clone \
--branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/kinesis-redshift-streaming-demo.git

CloudFormation

Use the two AWS CloudFormation templates, included in the project, to build two CloudFormation stacks. Please review the two templates and understand the costs of the resources before continuing.

The first CloudFormation template, redshift.yml, provisions a new Amazon VPC with associated network and security resources, a single-node Redshift cluster, and two S3 buckets.

The second CloudFormation template, kinesis-firehose.yml, provisions an Amazon Kinesis Data Firehose delivery stream, associated IAM Policy and Role, and an Amazon CloudWatch log group and two log streams.

Change the REDSHIFT_PASSWORD value to ensure your security. Optionally, change the REDSHIFT_USERNAME value. Make sure the first stack completes successfully before creating the second stack.

export AWS_DEFAULT_REGION=us-east-1
REDSHIFT_USERNAME=awsuser
REDSHIFT_PASSWORD=5up3r53cr3tPa55w0rd
# Create resources
aws cloudformation create-stack \
--stack-name redshift-stack \
--template-body file://cloudformation/redshift.yml \
--parameters ParameterKey=MasterUsername,ParameterValue=${REDSHIFT_USERNAME} \
ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
ParameterKey=InboundTraffic,ParameterValue=$(curl ifconfig.me -s)/32 \
--capabilities CAPABILITY_NAMED_IAM
# Wait for first stack to complete
aws cloudformation create-stack \
--stack-name kinesis-firehose-stack \
--template-body file://cloudformation/kinesis-firehose.yml \
--parameters ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
--capabilities CAPABILITY_NAMED_IAM
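
Before creating the second stack, the first stack must finish successfully. Rather than polling the console, you could block on the stack’s completion programmatically. Below is a minimal sketch using boto3’s CloudFormation waiter; the aws cloudformation wait stack-create-complete CLI command accomplishes the same thing.

import boto3

# Block until redshift-stack reaches CREATE_COMPLETE before creating the second stack
cloudformation = boto3.client('cloudformation', region_name='us-east-1')
cloudformation.get_waiter('stack_create_complete').wait(StackName='redshift-stack')
print('redshift-stack created')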

Review AWS Resources

To confirm all the AWS resources were created correctly, use the AWS Management Console.

Kinesis Data Firehose

In the Amazon Kinesis Dashboard, you should see the new Amazon Kinesis Data Firehose delivery stream, redshift-delivery-stream.

screen_shot_2020-03-02_at_3.58.12_pm

The Details tab of the new Amazon Kinesis Firehose delivery stream should look similar to the following. Note the IAM Role, FirehoseDeliveryRole, which was created and associated with the delivery stream by CloudFormation.

screen_shot_2020-03-02_at_6.30.25_pm

We are not performing any transformations of the incoming messages. Note the new S3 bucket that was created and associated with the stream by CloudFormation. The bucket name was randomly generated. This bucket is where the incoming messages will be written.

screen_shot_2020-03-02_at_6.31.13_pm

Note the buffer conditions of 1 MB and 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written in JSON format, using GZIP compression, to S3. These are the minimal buffer conditions, and as close to real-time streaming to Redshift as we can get.

screen_shot_2020-03-02_at_6.31.28_pm

Note the COPY command, which is used to copy the messages from S3 to the message table in Amazon Redshift. Kinesis uses the IAM Role, ClusterPermissionsRole, created by CloudFormation, for credentials. We are using a Manifest to copy the data to Redshift from S3. According to Amazon, a Manifest ensures that the COPY command loads all of the required files, and only the required files, for a data load. The Manifests are automatically generated and managed by the Kinesis Firehose delivery stream.

screen_shot_2020-03-02_at_6.31.43_pm

Redshift Cluster

In the Amazon Redshift Console, you should see a new single-node Redshift cluster consisting of one Redshift dc2.large Dense Compute node type.

screen_shot_2020-03-02_at_7.09.35_pm

Note the new VPC, Subnet, and VPC Security Group created by CloudFormation. Also, observe that the Redshift cluster is publicly accessible from outside the new VPC.

screen_shot_2020-03-02_at_7.09.41_pm

Redshift Ingress Rules

The single-node Redshift cluster is assigned to an AWS Availability Zone in the US East (N. Virginia) us-east-1 AWS Region. The cluster is associated with a VPC Security Group. The Security Group contains three inbound rules, all for Redshift port 5439. The IP addresses associated with the three inbound rules provide access to the following: 1) a /27 CIDR block for Amazon QuickSight in us-east-1, 2) a /27 CIDR block for Amazon Kinesis Firehose in us-east-1, and 3) your current IP address as a /32 CIDR block. If your IP address changes or you do not use the us-east-1 Region, you will need to change one or all of these IP addresses. The list of Kinesis Firehose IP addresses is here. The list of QuickSight IP addresses is here.

screen_shot_2020-03-02_at_7.09.59_pm

If you cannot connect to Redshift from your local SQL client, the most common cause is that your IP address has changed and is incorrect in the Security Group’s inbound rule.
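
If your IP address does change, you can correct the /32 inbound rule without opening the console. Below is a minimal sketch using boto3; the security group ID and the stale CIDR value are placeholders you would substitute with values from the redshift-stack resources.

import urllib.request

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Placeholders: the VPC Security Group created by redshift-stack and the stale /32 rule
security_group_id = 'sg-0123456789abcdef0'
old_cidr = '203.0.113.10/32'

# Look up your current public IP (same idea as the curl ifconfig.me call used earlier)
current_ip = urllib.request.urlopen('https://checkip.amazonaws.com').read().decode().strip()

redshift_port = {'IpProtocol': 'tcp', 'FromPort': 5439, 'ToPort': 5439}

# Remove the stale rule, then authorize your current IP on Redshift port 5439
ec2.revoke_security_group_ingress(
    GroupId=security_group_id,
    IpPermissions=[{**redshift_port, 'IpRanges': [{'CidrIp': old_cidr}]}])
ec2.authorize_security_group_ingress(
    GroupId=security_group_id,
    IpPermissions=[{**redshift_port, 'IpRanges': [{'CidrIp': current_ip + '/32'}]}])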

Redshift SQL Client

You can choose to use the Redshift Query Editor to interact with Redshift or use a third-party SQL client for greater flexibility. To access the Redshift Query Editor, use the user credentials specified in the redshift.yml CloudFormation template.

screen_shot_2020-03-02_at_4.01.49_pm

There is a lot of useful functionality in the Redshift Console and within the Redshift Query Editor. However, a notable limitation of the Redshift Query Editor, in my opinion, is its inability to execute multiple SQL statements at the same time, whereas most SQL clients allow multiple SQL statements to be executed at once.

screen_shot_2020-03-04_at_8.48.39_am

I prefer to use the JetBrains PyCharm IDE. PyCharm has out-of-the-box integration with Redshift. Using PyCharm, I can edit the project’s Python, SQL, AWS CLI shell, and CloudFormation code, all in one place.

screen_shot_2020-03-02_at_7.12.11_pm

If you use any of the common SQL clients, you will need to set up a JDBC (Java Database Connectivity) or ODBC (Open Database Connectivity) connection to Redshift. The ODBC and JDBC connection strings can be found in the Redshift cluster’s Properties tab or in the Outputs tab from the CloudFormation stack, redshift-stack.

screen_shot_2020-03-04_at_9.07.14_am

You will also need the Redshift database username and password you included in the aws cloudformation create-stack AWS CLI command you executed previously. Below, we see PyCharm’s Project Data Sources window containing a new data source for the Redshift dev database.

screen_shot_2020-03-02_at_6.33.01_pm

Database Schema and Tables

When CloudFormation created the Redshift cluster, it also created a new database, dev. Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL commands to create a new database schema, sensor, and six tables in the sensor schema.

-- Create new schema in Redshift DB
DROP SCHEMA IF EXISTS sensor CASCADE;
CREATE SCHEMA sensor;
SET search_path = sensor;

-- Create (6) tables in Redshift DB
CREATE TABLE message -- streaming data table
(
  id      BIGINT IDENTITY (1, 1),                 -- message id
  guid    VARCHAR(36)   NOT NULL,                 -- device guid
  ts      BIGINT        NOT NULL DISTKEY SORTKEY, -- epoch in seconds
  temp    NUMERIC(5, 2) NOT NULL,                 -- temperature reading
  created TIMESTAMP DEFAULT ('now'::text)::timestamp with time zone -- row created at
);

CREATE TABLE location -- dimension table
(
  id          INTEGER NOT NULL DISTKEY SORTKEY, -- location id
  long        NUMERIC(10, 7) NOT NULL,          -- longitude
  lat         NUMERIC(10, 7) NOT NULL,          -- latitude
  description VARCHAR(256)                      -- location description
);

CREATE TABLE history -- dimension table
(
  id            INTEGER NOT NULL DISTKEY SORTKEY, -- history id
  serviced      BIGINT  NOT NULL,                 -- service date
  action        VARCHAR(20) NOT NULL,             -- INSTALLED, CALIBRATED, FIRMWARE UPGRADED, DECOMMISSIONED, OTHER
  technician_id INTEGER NOT NULL,                 -- technician id
  notes         VARCHAR(256)                      -- notes
);

CREATE TABLE sensor -- dimension table
(
  id     INTEGER NOT NULL DISTKEY SORTKEY, -- sensor id
  guid   VARCHAR(36) NOT NULL,             -- device guid
  mac    VARCHAR(18) NOT NULL,             -- mac address
  sku    VARCHAR(18) NOT NULL,             -- product sku
  upc    VARCHAR(12) NOT NULL,             -- product upc
  active BOOLEAN DEFAULT TRUE,             -- active status
  notes  VARCHAR(256)                      -- notes
);

CREATE TABLE manufacturer -- dimension table
(
  id      INTEGER NOT NULL DISTKEY SORTKEY, -- manufacturer id
  name    VARCHAR(100) NOT NULL,            -- company name
  website VARCHAR(100) NOT NULL,            -- company website
  notes   VARCHAR(256)                      -- notes
);

CREATE TABLE sensors -- fact table
(
  id              BIGINT IDENTITY (1, 1) DISTKEY SORTKEY, -- fact id
  sensor_id       INTEGER NOT NULL,                       -- sensor id
  manufacturer_id INTEGER NOT NULL,                       -- manufacturer id
  location_id     INTEGER NOT NULL,                       -- location id
  history_id      BIGINT  NOT NULL,                       -- history id
  message_guid    VARCHAR(36) NOT NULL                    -- sensor guid
);

Star Schema

The tables represent denormalized data, taken from one or more relational database sources. Together, the tables form a star schema, a design widely used to develop data warehouses. A star schema consists of one or more fact tables referencing any number of dimension tables. The location, manufacturer, sensor, and history tables are dimension tables. The sensors table is a fact table.

In the diagram below, the foreign key relationships are virtual, not physical. The diagram was created using PyCharm’s schema visualization tool. Note the schema’s star shape. The message table is where the streaming IoT data will eventually be written. The message table is related to the sensors fact table through the common guid field.

schema-light-2

Sample Data to S3

Next, copy the sample data, included in the project, to the S3 data bucket created with CloudFormation. Each CSV-formatted data file corresponds to one of the tables we previously created. Since the bucket name is semi-random, we can use the AWS CLI and jq to get the bucket name, then use it to perform the copy commands.

# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
--stack-name redshift-stack \
| jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
echo $DATA_BUCKET
# Copy data
aws s3 cp data/history.csv s3://${DATA_BUCKET}/history/history.csv
aws s3 cp data/location.csv s3://${DATA_BUCKET}/location/location.csv
aws s3 cp data/manufacturer.csv s3://${DATA_BUCKET}/manufacturer/manufacturer.csv
aws s3 cp data/sensor.csv s3://${DATA_BUCKET}/sensor/sensor.csv
aws s3 cp data/sensors.csv s3://${DATA_BUCKET}/sensors/sensors.csv

The output from the AWS CLI should look similar to the following.

screen_shot_2020-03-02_at_7.18.22_pm

Sample Data to Redshift

Whereas a relational database, such as Amazon RDS, is designed for online transaction processing (OLTP), Amazon Redshift is designed for online analytic processing (OLAP) and business intelligence applications. To write data to Redshift, we typically use the COPY command rather than the frequent, individual INSERT statements typical of OLTP workloads, which would be prohibitively slow. According to Amazon, the Redshift COPY command leverages the Amazon Redshift massively parallel processing (MPP) architecture to read and load data in parallel from files on Amazon S3, from a DynamoDB table, or from text output from one or more remote hosts.

In the following series of SQL statements, replace the placeholder, your_bucket_name, in five places with your S3 data bucket name. The bucket name will start with the prefix, redshift-stack-databucket. The bucket name can be found in the Outputs tab of the CloudFormation stack, redshift-stack. Next, replace the placeholder, cluster_permissions_role_arn, with the ARN (Amazon Resource Name) of the ClusterPermissionsRole. The ARN is formatted as follows, arn:aws:iam::your-account-id:role/ClusterPermissionsRole. The ARN can be found in the Outputs tab of the CloudFormation stack, redshift-stack.

Using the Redshift Query Editor or your SQL client of choice, execute the SQL statements to copy the sample data from S3 to each of the corresponding tables in the Redshift dev database. The TRUNCATE commands guarantee there is no previous sample data present in the tables.

-- ** MUST FIRST CHANGE: your_bucket_name and cluster_permissions_role_arn **
-- sensor schema
SET search_path = sensor;
-- Copy sample data to tables from S3
TRUNCATE TABLE history;
COPY history (id, serviced, action, technician_id, notes)
FROM 's3://your_bucket_name/history/'
CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
CSV IGNOREHEADER 1;
TRUNCATE TABLE location;
COPY location (id, long, lat, description)
FROM 's3://your_bucket_name/location/'
CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
CSV IGNOREHEADER 1;
TRUNCATE TABLE sensor;
COPY sensor (id, guid, mac, sku, upc, active, notes)
FROM 's3://your_bucket_name/sensor/'
CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
CSV IGNOREHEADER 1;
TRUNCATE TABLE manufacturer;
COPY manufacturer (id, name, website, notes)
FROM 's3://your_bucket_name/manufacturer/'
CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
CSV IGNOREHEADER 1;
TRUNCATE TABLE sensors;
COPY sensors (sensor_id, manufacturer_id, location_id, history_id, message_guid)
FROM 's3://your_bucket_name/sensors/'
CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
CSV IGNOREHEADER 1;
SELECT COUNT(*) FROM history;      -- 30
SELECT COUNT(*) FROM location;     -- 6
SELECT COUNT(*) FROM sensor;       -- 6
SELECT COUNT(*) FROM manufacturer; -- 1
SELECT COUNT(*) FROM sensors;      -- 30

Database Views

Next, create four Redshift database Views. These views may be used to analyze the data in Redshift, and later, in Amazon QuickSight.

  1. sensor_msg_detail: Returns aggregated sensor details, using the sensors fact table and all five dimension tables in a SQL Join.
  2. sensor_msg_count: Returns the number of messages received by Redshift, for each sensor.
  3. sensor_avg_temp: Returns the average temperature from each sensor, based on all the messages received from each sensor.
  4. sensor_avg_temp_current: Identical to the previous view, but limited to the last 30 minutes.

Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL statements.

-- sensor schema
SET search_path = sensor;
-- View 1: Sensor details
DROP VIEW IF EXISTS sensor_msg_detail;
CREATE OR REPLACE VIEW sensor_msg_detail AS
SELECT ('1970-01-01'::date + e.ts * interval '1 second') AS recorded,
e.temp,
s.guid,
s.sku,
s.mac,
l.lat,
l.long,
l.description AS location,
('1970-01-01'::date + h.serviced * interval '1 second') AS installed,
e.created AS redshift
FROM sensors f
INNER JOIN sensor s ON (f.sensor_id = s.id)
INNER JOIN history h ON (f.history_id = h.id)
INNER JOIN location l ON (f.location_id = l.id)
INNER JOIN manufacturer m ON (f.manufacturer_id = m.id)
INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
AND h.action = 'INSTALLED'
ORDER BY f.id;
-- View 2: Message count per sensor
DROP VIEW IF EXISTS sensor_msg_count;
CREATE OR REPLACE VIEW sensor_msg_count AS
SELECT count(e.temp) AS msg_count,
s.guid,
l.lat,
l.long,
l.description AS location
FROM sensors f
INNER JOIN sensor s ON (f.sensor_id = s.id)
INNER JOIN history h ON (f.history_id = h.id)
INNER JOIN location l ON (f.location_id = l.id)
INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY msg_count, s.guid;
-- View 3: Average temperature per sensor (all data)
DROP VIEW IF EXISTS sensor_avg_temp;
CREATE OR REPLACE VIEW sensor_avg_temp AS
SELECT avg(e.temp) AS avg_temp,
count(s.guid) AS msg_count,
s.guid,
l.lat,
l.long,
l.description AS location
FROM sensors f
INNER JOIN sensor s ON (f.sensor_id = s.id)
INNER JOIN history h ON (f.history_id = h.id)
INNER JOIN location l ON (f.location_id = l.id)
INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;
-- View 4: Average temperature per sensor (last 30 minutes)
DROP VIEW IF EXISTS sensor_avg_temp_current;
CREATE OR REPLACE VIEW sensor_avg_temp_current AS
SELECT avg(e.temp) AS avg_temp,
count(s.guid) AS msg_count,
s.guid,
l.lat,
l.long,
l.description AS location
FROM sensors f
INNER JOIN sensor s ON (f.sensor_id = s.id)
INNER JOIN history h ON (f.history_id = h.id)
INNER JOIN location l ON (f.location_id = l.id)
INNER JOIN (SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
guid,
temp
FROM message
WHERE DATEDIFF(minute, recorded_time, GETDATE()) <= 30) e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;

At this point, you should have a total of six tables and four views in the sensor schema of the dev database in Redshift.

Test the System

With all the necessary AWS resources and Redshift database objects created and sample data in the Redshift database, we can test the system. The included Python script, kinesis_put_test_msg.py, will generate a single test message and send it to Kinesis Data Firehose. If everything is working, the message should be delivered from Kinesis Data Firehose to S3, then copied to Redshift, and appear in the message table.
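
The test script itself is short; below is a minimal sketch of what kinesis_put_test_msg.py essentially does, using boto3 to put a single JSON record, shaped like the message table schema, onto the redshift-delivery-stream. The payload values are illustrative, not the script’s exact output.

import json
import random
from datetime import datetime

import boto3

client = boto3.client('firehose')

# Single test payload matching the message table schema (guid, ts, temp)
payload = {
    'guid': '03e39872-e105-4be4-83c0-9ade818465dc',
    'ts': int(datetime.now().strftime('%s')),
    'temp': round(random.uniform(60.0, 95.0), 2)
}

response = client.put_record(
    DeliveryStreamName='redshift-delivery-stream',
    Record={'Data': json.dumps(payload)}
)
print(response['RecordId'])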

Install the required Python packages and then execute the Python script.

# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt
# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1
# Execute script in foreground
python3 ./scripts/kinesis_put_test_msg.py

Run the following SQL query to confirm the record is in the message table of the dev database. It will take at least one minute for the message to appear in Redshift.

SELECT COUNT(*) FROM message;

Once the message is confirmed to be present in the message table, delete the record by truncating the table.

TRUNCATE TABLE message;

Streaming Data

Assuming the test message worked, we can proceed with simulating the streaming IoT sensor data. The included Python script, kinesis_put_streaming_data.py, creates six concurrent threads, representing six temperature sensors.

#!/usr/bin/env python3

# Simulated multiple streaming time-series iot sensor data
# Author: Gary A. Stafford
# Date: Revised October 2020

import json
import random
from datetime import datetime

import boto3
import time as tm
import numpy as np
import threading

STREAM_NAME = 'redshift-delivery-stream'

client = boto3.client('firehose')


class MyThread(threading.Thread):
    def __init__(self, thread_id, sensor_guid, temp_max):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.sensor_id = sensor_guid
        self.temp_max = temp_max

    def run(self):
        print("Starting Thread: " + str(self.thread_id))
        self.create_data()
        print("Exiting Thread: " + str(self.thread_id))

    def create_data(self):
        start = 0
        stop = 20
        step = 0.1  # step size (e.g 0 to 20, step .1 = 200 steps in cycle)
        repeat = 2  # how many times to repeat cycle
        freq = 60  # frequency of temperature reading in seconds
        max_range = int(stop * (1 / step))
        time = np.arange(start, stop, step)
        amplitude = np.sin(time)
        for x in range(0, repeat):
            for y in range(0, max_range):
                temperature = round((((amplitude[y] + 1.0) * self.temp_max) + random.uniform(-5, 5)) + 60, 2)
                payload = {
                    'guid': self.sensor_id,
                    'ts': int(datetime.now().strftime('%s')),
                    'temp': temperature
                }
                print(json.dumps(payload))
                self.send_to_kinesis(payload)
                tm.sleep(freq)

    @staticmethod
    def send_to_kinesis(payload):
        _ = client.put_record(
            DeliveryStreamName=STREAM_NAME,
            Record={
                'Data': json.dumps(payload)
            }
        )


def main():
    sensor_guids = [
        "03e39872-e105-4be4-83c0-9ade818465dc",
        "fa565921-fddd-4bfb-a7fd-d617f816df4b",
        "d120422d-5789-435d-9dc6-73d8489b04c2",
        "93238559-4d55-4b2a-bdcb-6aa3be0f3908",
        "dbc05806-6872-4f0a-aca2-f794cc39bd9b",
        "f9ade639-f936-4954-aa5a-1f2ed86c9bcf"
    ]
    timeout = 300  # arbitrarily offset the start of threads (60 / 5 = 12)

    # Create new threads
    thread1 = MyThread(1, sensor_guids[0], 25)
    thread2 = MyThread(2, sensor_guids[1], 10)
    thread3 = MyThread(3, sensor_guids[2], 7)
    thread4 = MyThread(4, sensor_guids[3], 30)
    thread5 = MyThread(5, sensor_guids[4], 5)
    thread6 = MyThread(6, sensor_guids[5], 12)

    # Start new threads
    thread1.start()
    tm.sleep(timeout * 1)
    thread2.start()
    tm.sleep(timeout * 2)
    thread3.start()
    tm.sleep(timeout * 1)
    thread4.start()
    tm.sleep(timeout * 3)
    thread5.start()
    tm.sleep(timeout * 2)
    thread6.start()

    # Wait for threads to terminate
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    print("Exiting Main Thread")


if __name__ == '__main__':
    main()

The simulated data uses an algorithm that follows an oscillating sine wave or sinusoid, representing rising and falling temperatures. In the script, I have configured each thread to start with an arbitrary offset to add some randomness to the simulated data.

screen_shot_2020-03-04_at_9.27.33_pm

The variables within the script can be adjusted to shorten or lengthen the time it takes to stream the simulated data. By default, each of the six threads creates 400 messages per sensor, in one-minute increments. Including the offset start of each succeeding thread, the total runtime of the script is about 7.5 hours to generate 2,400 simulated IoT sensor temperature readings and push them to Kinesis Data Firehose. Make sure you can maintain a connection to the Internet for the entire runtime of the script. I normally run the script in the background, from a small EC2 instance.

To use the Python script, execute either of the two following commands. Using the first command will run the script in the foreground. Using the second command will run the script in the background.

# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt
# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1
# Option #1: Execute script in foreground
python3 ./scripts/kinesis_put_streaming_data.py
# Option #2: execute script in background
nohup python3 -u ./scripts/kinesis_put_streaming_data.py > output.log 2>&1 </dev/null &
# Check that the process is running
ps -aux | grep 'python3 -u ./scripts/kinesis_put_streaming_data.py'
# Wait 1-2 minutes, then check output to confirm script is working
cat output.log

Viewing the output.log file, you should see messages being generated on each thread and sent to Kinesis Data Firehose. Each message contains the GUID of the sensor, a timestamp, and a temperature reading.

Screen Shot 2020-10-14 at 10.00.37 AM

The messages are sent to Kinesis Data Firehose, which in turn writes the messages to S3. The messages are written in JSON format using GZIP compression. Below, we see an example of the GZIP compressed JSON files in S3. The JSON files are partitioned by year, month, day, and hour.
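
To spot-check delivery from a script rather than the S3 console, the sketch below lists objects under one of the date-partitioned prefixes. The bucket name and prefix are placeholders; substitute the randomly named bucket created by the kinesis-firehose-stack and a date on which you streamed data.

import boto3

s3 = boto3.client('s3')

# Placeholder: the randomly named delivery bucket created by kinesis-firehose-stack
bucket_name = 'your-firehose-delivery-bucket'

# Kinesis Data Firehose partitions the delivered objects by year/month/day/hour
response = s3.list_objects_v2(Bucket=bucket_name, Prefix='2020/03/04/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])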

screen_shot_2020-03-04_at_11.04.36_pm

Confirm Data Streaming to Redshift

From the Amazon Kinesis Firehose Console Metrics tab, you should see incoming messages flowing to S3 and on to Redshift.

screen_shot_2020-03-03_at_6.17.14_pm

Executing the following SQL query should show an increasing number of messages.

SELECT COUNT(*) FROM message;

How Near Real-time?

Earlier, we saw how the Amazon Kinesis Data Firehose delivery stream was configured to buffer data at the rate of 1 MB or 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written to S3. Each record in the message table has two timestamps. The first timestamp, ts, is when the temperature reading was recorded. The second timestamp, created, is when the message was written to Redshift, using the COPY command. We can calculate the delta in seconds between the two timestamps using the following SQL query in Redshift.

SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
created AS redshift_time,
DATEDIFF(seconds, recorded_time, redshift_time) AS diff_seconds
FROM message
ORDER BY diff_seconds;

Using the results of the Redshift query, we can visualize the latency in Amazon QuickSight. In my own tests, for 2,400 messages over approximately 7.5 hours, the minimum delay was 1 second, and the maximum delay was 64 seconds. Hence, near real-time, in this case, is about one minute or less, with an average latency of roughly 30 seconds.

latency

Analyzing the Data with Redshift

I suggest waiting at least thirty minutes for a significant number of messages to be copied into Redshift. With the data streaming into Redshift, execute each of the database views we created earlier. You should see the streaming message data, joined to the existing static data in Redshift. As data continues to stream into Redshift, the views will display different results based on the current contents of the message table.

Here, we see the first ten results of the sensor_msg_detail view.


recorded temp guid sku mac lat long location installed redshift
2020-03-04 03:31:59.000000 105.56 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:33:01.580147
2020-03-04 03:29:59.000000 95.93 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:31:01.388887
2020-03-04 03:26:58.000000 91.93 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:28:01.099796
2020-03-04 03:25:58.000000 88.70 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:26:00.196113
2020-03-04 03:22:58.000000 87.65 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:23:01.558514
2020-03-04 03:20:58.000000 77.35 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:21:00.691347
2020-03-04 03:16:57.000000 71.84 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:17:59.307510
2020-03-04 03:15:57.000000 72.35 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:15:59.813656
2020-03-04 03:14:57.000000 67.95 03e39872-e105-4be4-83c0-9ade818465dc PR49-24A 8e:fa:46:09:14:b2 37.7068476 -122.4191599 Research Lab #2203 2018-01-31 12:00:00.000000 2020-03-04 03:15:59.813656

Next, we see the results of the sensor_avg_temp view.


avg_temp guid lat long location
65.25 dbc05806-6872-4f0a-aca2-f794cc39bd9b 37.7066541 -122.4181399 Wafer Inspection Lab #0210A
67.23 d120422d-5789-435d-9dc6-73d8489b04c2 37.7072686 -122.4187016 Zone 4 Wafer Processing Area B3
70.23 fa565921-fddd-4bfb-a7fd-d617f816df4b 37.7071763 -122.4190397 Research Lab #2209
72.22 f9ade639-f936-4954-aa5a-1f2ed86c9bcf 37.7067618 -122.4186191 Wafer Inspection Lab #0211C
85.48 03e39872-e105-4be4-83c0-9ade818465dc 37.7068476 -122.4191599 Research Lab #2203
90.69 93238559-4d55-4b2a-bdcb-6aa3be0f3908 37.7070334 -122.4184393 Zone 2 Semiconductor Assembly Area A2

Amazon QuickSight

In a recent post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2, I detailed getting started with Amazon QuickSight. In this post, I will assume you are familiar with QuickSight.

Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. However, for this part of the demonstration, we will work directly in the Amazon QuickSight Console, as opposed to using the AWS CLI, AWS CDK, or CloudFormation.

Redshift Data Sets

To visualize the data from Amazon Redshift, we start by creating Data Sets in QuickSight. QuickSight supports a large number of data sources for creating data sets. We will use the Redshift data source. If you recall, we added an inbound rule for QuickSight, allowing us to connect to our Redshift cluster in us-east-1.

screen_shot_2020-03-02_at_10.21.35_pm

We will select the sensor schema, which is where the tables and views for this demonstration are located.

screen_shot_2020-03-02_at_10.21.49_pm

We can choose any of the tables or views in the Redshift dev database that we want to use for visualization.

screen_shot_2020-03-02_at_10.22.11_pm

Below, we see examples of two new data sets, shown in the QuickSight Data Prep Console. Note how QuickSight automatically recognizes field types, including dates, latitude, and longitude.

screen_shot_2020-03-02_at_10.23.12_pm

screen_shot_2020-03-02_at_10.59.32_pm

Visualizations

Using the data sets, QuickSight allows us to create a wide variety of rich visualizations. Below, we see the simulated time-series data from the six temperature sensors.

screen_shot_2020-03-04_at_9.26.26_pm

Next, we see an example of QuickSight’s ability to show geospatial data. The Map shows the location of each sensor and the average temperature recorded by that sensor.

screen_shot_2020-03-04_at_9.26.51_pm

Cleaning Up

To remove the resources created for this post, use the following series of AWS CLI commands.

# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
--stack-name redshift-stack \
| jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
echo ${DATA_BUCKET}
# Get log bucket name
LOG_BUCKET=$(aws cloudformation describe-stacks \
--stack-name redshift-stack \
| jq -r '.Stacks[].Outputs[] | select(.OutputKey == "LogBucket") | .OutputValue')
echo ${LOG_BUCKET}
# Delete demonstration resources
python3 ./scripts/delete_buckets.py
aws cloudformation delete-stack --stack-name kinesis-firehose-stack
# Wait for first stack to be deleted
aws cloudformation delete-stack --stack-name redshift-stack
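
I have not reproduced the included delete_buckets.py script here, but a minimal sketch of the sort of logic it likely contains, emptying both demonstration buckets so the CloudFormation stacks can be deleted cleanly, is shown below. The bucket names are placeholders, and the sketch assumes the buckets are not versioned.

import boto3

s3 = boto3.resource('s3')

# Placeholders: substitute the bucket names echoed by the AWS CLI commands above
buckets_to_empty = ['redshift-stack-databucket-example', 'redshift-stack-logbucket-example']

for bucket_name in buckets_to_empty:
    # Delete every object so CloudFormation can remove the (non-versioned) bucket
    s3.Bucket(bucket_name).objects.all().delete()
    print('Emptied bucket: ' + bucket_name)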

Conclusion

In this brief post, we have learned how streaming data can be analyzed in near real-time, in Amazon Redshift, using Amazon Kinesis Data Firehose. Further, we explored how the results of those analyses can be visualized in Amazon QuickSight. For customers who depend on a data warehouse for data analytics but also have streaming data sources, Amazon Kinesis Data Firehose or Amazon Redshift Spectrum is an excellent choice.

This blog represents my own viewpoints and not those of my employer, Amazon Web Services.



Event-driven, Serverless Architectures with AWS Lambda, SQS, DynamoDB, and API Gateway

Introduction

In this post, we will explore modern application development using an event-driven, serverless architecture on AWS. To demonstrate this architecture, we will integrate several fully-managed services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. The result will be an application composed of small, easily deployable, loosely coupled, independently scalable, serverless components.

What is ‘Event-Driven’?

According to Otavio Ferreira, Manager, Amazon SNS, and James Hood, Senior Software Development Engineer, in their AWS Compute Blog, Enriching Event-Driven Architectures with AWS Event Fork Pipelines, “Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.” This description of an event-driven architecture perfectly captures the essence of the following post. All interactions between application components in this post will be as a direct result of triggering an event.

What is ‘Serverless’?

Mistakenly, many of us think of serverless as just functions (aka Function-as-a-Service or FaaS). When it comes to functions on AWS, Lambda is just one of many fully-managed services that make up the AWS Serverless Computing platform. So, what is ‘serverless’? According to AWS, “Serverless applications don’t require provisioning, maintaining, and administering servers for backend components such as compute, databases, storage, stream processing, message queueing, and more.”

As a Developer, one of my favorite features of serverless is the cost, or lack thereof. With serverless on AWS, you pay for consistent throughput or execution duration rather than by server unit, and, at least on AWS, you don’t pay for idle resources. This is not always true of ‘serverless’ offerings on other leading Cloud platforms. Remember, if you’re paying for it but not using it, it’s not serverless.

If you’re paying for it but not using it, it’s not serverless.

Demonstration

To demonstrate an event-driven, serverless architecture, we will build, package, and deploy an application capable of extracting messages from CSV files placed in S3, transforming those messages, queueing them to SQS, and finally, writing the messages to DynamoDB, using Lambda functions throughout. We will also expose a RESTful API, via API Gateway, to perform CRUD-like operations on those messages in DynamoDB.

AWS Technologies

In this demonstration, we will use several AWS serverless services, including AWS Lambda, Amazon API Gateway, Amazon SQS, Amazon S3, and Amazon DynamoDB.

Each Lambda will use function-specific execution roles, part of AWS Identity and Access Management (IAM). We will log the event details and monitor services using Amazon CloudWatch.

To codify, build, package, deploy, and manage the Lambda functions and other AWS resources in a fully automated fashion, we will also use the AWS Serverless Application Model (SAM), AWS CloudFormation, the AWS CLI, and the SAM CLI.

Architecture

The high-level architecture for the platform provisioned and deployed in this post is illustrated in the diagram below. There are two separate workflows. In the first workflow (top), data is extracted from CSV files placed in S3, transformed, queued to SQS, and written to DynamoDB, using Python-based Lambda functions throughout. In the second workflow (bottom), data is manipulated in DynamoDB through interactions with a RESTful API, exposed via an API Gateway, and backed by Node.js-based Lambda functions.

new-01-sqs-dynamodb

Using the vast array of current AWS services, there are several ways we could extract, transform, and load data from static files into DynamoDB. The demonstration’s event-driven, serverless architecture represents just one possible approach.

Source Code

All source code for this post is available on GitHub in a single public repository, serverless-sqs-dynamo-demo. To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/serverless-sqs-dynamo-demo.git

The project files relevant to this demonstration are organized as follows.

.
├── README.md
├── lambda_apigtw_to_dynamodb
│   ├── app.js
│   ├── events
│   ├── node_modules
│   ├── package.json
│   └── tests
├── lambda_s3_to_sqs
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── lambda_sqs_to_dynamodb
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── requirements.txt
├── template.yaml
└── sample_data
    ├── data.csv
    ├── data_bad_msg.csv
    └── data_good_msg.csv

Some source code samples in this post are GitHub Gists, which may not display correctly on all social media browsers, such as LinkedIn.

Prerequisites

The demonstration assumes you already have an AWS account. You will need the latest copy of the AWS CLI, SAM CLI, and Python 3 installed on your development machine.

Additionally, you will need two existing S3 buckets. One bucket will be used to store the packaged project files for deployment. The second bucket is where we will place CSV data files, which, in turn, will trigger events that invoke multiple Lambda functions.

Deploying the Project

Before diving into the code, we will deploy the project to AWS. Conveniently, the entire project’s resources are codified in an AWS SAM template. We are using the AWS Serverless Application Model (SAM). AWS SAM is a model used to define serverless applications on AWS. According to the official SAM GitHub project documentation, AWS SAM is based on AWS CloudFormation. A serverless application is defined in a CloudFormation template and deployed as a CloudFormation stack.

Template Parameter

CloudFormation will create and uniquely name the SQS queues and the DynamoDB table. However, to avoid circular references, a common issue when creating resources associated with S3 event notifications, it is easier to use a pre-existing bucket. To start, you will need to change the SAM template’s DataBucketName parameter’s default value to your own S3 bucket name. Again, this bucket is where we will eventually push the CSV data files. Alternatively, override the default values using the sam build command, next.

Parameters:
  DataBucketName:
    Type: String
    Description: S3 bucket where CSV files are processed
    Default: your-data-bucket-name

SAM CLI Commands

With the DataBucketName parameter set, proceed to validate, build, package, and deploy the project using the SAM CLI and the commands below. In addition to the sam validate command, I also like to use the aws cloudformation validate-template command to validate templates and catch any potential, additional errors.

Note that the S3_BUILD_BUCKET variable, below, refers to the name of the S3 bucket SAM will use to package and deploy the project from, as opposed to the S3 bucket into which the CSV data files will be placed (gist).

# variables
S3_BUILD_BUCKET=your_build_bucket_name
STACK_NAME=your_cloudformation_stack_name
# validate
sam validate --template template.yaml
aws cloudformation validate-template \
--template-body file://template.yaml
# build
sam build --template template.yaml
# package
sam package \
--output-template-file packaged.yaml \
--s3-bucket $S3_BUILD_BUCKET
# deploy
sam deploy --template-file packaged.yaml \
--stack-name $STACK_NAME \
--capabilities CAPABILITY_IAM \
--debug

After validating the template, SAM will build and package each individual Lambda function and its associated dependencies. Below, we see each individual Lambda function being packaged with a copy of its dependencies.

screen_shot_2019-09-30_at_8_42_41_pm

Once packaged, SAM will deploy the project and create the AWS resources as a CloudFormation stack.

screen_shot_2019-09-30_at_8_43_11_pm

Once the stack creation is complete, use the CloudFormation management console to review the AWS resources created by SAM. There are approximately 14 resources defined in the SAM template, which result in 33 individual resources deployed as part of the CloudFormation stack.

screen_shot_2019-09-30_at_8_44_46_pm

Note the stack’s output values. You will need these values to interact with the deployed platform, later in the demonstration.

screen_shot_2019-09-30_at_8_45_13_pm

Test the Deployed Application

Once the CloudFormation stack has deployed without error, copying a CSV file to the S3 bucket is the quickest way to confirm everything is working. The project includes test data files with 20 rows of test message data. Below is a sample of the CSV file, which is included in the project. The data was collected from IoT devices that measured response time from wired versus wireless devices on a LAN network; the message details are immaterial to this demonstration (gist).


timestamp location source local_dest local_avg remote_dest remote_avg
1559040909.3853335 location-03 wireless router-1 4.39 device-1 9.09
1559040919.5273902 location-03 wireless router-1 0.49 device-1 16.75
1559040929.6446512 location-03 wireless router-1 0.56 device-1 8.31
1559040939.7712135 location-03 wireless router-1 1.64 device-1 9.4
1559040949.891723 location-03 wireless router-1 1.18 device-1 9.07
1559040960.011338 location-03 wireless router-1 0.42 device-1 8.4
1559040970.1319716 location-03 wireless router-1 1.73 device-1 8.66
1559040980.2533505 location-03 wireless router-1 0.67 device-1 8.61
1559040990.3816211 location-03 wireless router-1 1.27 device-1 10.87
1559041000.5105414 location-03 wireless router-1 1.63 device-1 10.08


Run the following commands to copy the test data file to your S3 bucket.

S3_DATA_BUCKET=your_data_bucket_name
aws s3 cp sample_data/data.csv s3://$S3_DATA_BUCKET

Visit the DynamoDB management console. You should observe a new DynamoDB table.

screen_shot_2019-09-30_at_8_47_10_pm

Within the new DynamoDB table, you should observe twenty items, corresponding to each of the twenty rows in the CSV file, uploaded to S3.

screen_shot_2019-09-30_at_8_51_07_pm

Drill into an individual item within the table and review its attributes. They should match the rows in the CSV file.

screen_shot_2019-09-30_at_8_51_54_pm

Both the Python- and Node.js-based Lambda functions have their default logging levels set to debug. The debug-level output from each Lambda function is streamed to individual Amazon CloudWatch Log Groups. We can use the CloudWatch logs to troubleshoot any issues with the deployed application. Below we see an example of CloudWatch log entries for the request and response payloads generated from GetMessageFunction Lambda function, which is querying DynamoDB for a single Item.

screen_shot_2019-10-03_at_9_16_22_pm
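
If you prefer to search the logs from a script instead of the CloudWatch console, the sketch below filters recent events from a single function’s log group. The log group name is a placeholder; SAM appends a generated suffix to each deployed function’s name, so copy the exact name from the Lambda or CloudWatch console.

import time

import boto3

logs = boto3.client('logs')

# Placeholder: the deployed function's log group (SAM appends a unique suffix to the name)
log_group = '/aws/lambda/your-stack-GetMessageFunction-EXAMPLE'

# Print events from the last 15 minutes that contain the word ERROR
response = logs.filter_log_events(
    logGroupName=log_group,
    filterPattern='ERROR',
    startTime=int((time.time() - 900) * 1000)
)
for event in response['events']:
    print(event['message'].rstrip())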

Event-Driven Patterns

There are three distinct and discrete event-driven dataflows within the demonstration’s architecture:

  1. S3 Event Source for Lambda (S3 to SQS)
  2. SQS Event Source for Lambda (SQS to DynamoDB)
  3. API Gateway Event Source for Lambda (API Gateway to DynamoDB)

Let’s examine each event-driven dataflow and the Lambda code associated with that part of the architecture.

S3 Event Source for Lambda

Whenever a file is copied into the target S3 bucket, an S3 Event Notification triggers an asynchronous invocation of a Lambda. According to AWS, when you invoke a function asynchronously, Lambda sends the event to a queue. A separate process reads events from the queue and executes your Lambda function.

new-02-sqs-dynamodb

The Lambda’s function handler, written in Python, reads the CSV file, whose filename is contained in the event. The Lambda extracts the rows in the CSV file, transforms the data, and pushes each message to the SQS queue (gist).

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(
        event['Records'][0]['s3']['object']['key'],
        encoding='utf-8'
    )
    messages = read_csv_file(bucket, key)
    process_messages(messages)

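The read_csv_file and process_messages helpers are not shown in the fragment above. The hypothetical sketch below illustrates roughly how such a helper could push one transformed row to the queue, including the Method message attribute that the SQS-triggered Lambda (shown later) uses to select a DynamoDB operation; the helper name and environment variables are illustrative, not the project’s actual code.

import json
import os

import boto3

sqs = boto3.client('sqs')


def send_message_to_sqs(item):
    # Hypothetical helper: push one transformed CSV row to the SQS queue
    sqs.send_message(
        QueueUrl=os.environ['QUEUE_URL'],  # assumed to be supplied as an environment variable
        MessageBody=json.dumps({
            'TableName': os.environ['TABLE_NAME'],
            'Item': item  # item formatted with DynamoDB type descriptors, as shown below
        }),
        MessageAttributes={
            'Method': {'DataType': 'String', 'StringValue': 'POST'}
        }
    )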

Below is an example of a message body, part of an SQS message, extracted from a single row of the CSV file, and sent by the Lambda to the SQS queue. The timestamp has been converted to separate date and time fields by the Lambda. The DynamoDB table name is part of the SQS message body. The key/value pairs in the Item JSON object reflect the schema of the DynamoDB table (gist).

{
  "TableName": "your-dynamodb-table-name",
  "Item": {
    "date": {
      "S": "2001-01-01"
    },
    "time": {
      "S": "09:01:05"
    },
    "location": {
      "S": "location-03"
    },
    "source": {
      "S": "wireless"
    },
    "local_dest": {
      "S": "router-1"
    },
    "local_avg": {
      "N": "5.55"
    },
    "remote_dest": {
      "S": "device-1"
    },
    "remote_avg": {
      "N": "10.10"
    }
  }
}


SQS Event Source for Lambda

According to AWS, SQS offers two types of message queues, Standard and FIFO (First-In-First-Out). An SQS FIFO queue is designed to guarantee that messages are processed exactly once, in the exact order that they are sent. A Standard SQS queue offers maximum throughput, best-effort ordering, and at-least-once delivery.

Examining the SQS management console, you should observe that the CloudFormation stack creates two SQS Standard queues—a primary queue and a Dead Letter Queue (DLQ). According to AWS, Amazon SQS supports dead-letter queues, which other queues (source queues) can target for messages that cannot be processed (consumed) successfully.

screen_shot_2019-09-30_at_8_55_33_pm
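
If messages fail processing and land in the DLQ, you can peek at them without the console. Below is a minimal sketch using boto3; the queue name is a placeholder for the generated name of the Dead Letter Queue in your stack.

import boto3

sqs = boto3.client('sqs')

# Placeholder: the generated name of the Dead Letter Queue
queue_url = sqs.get_queue_url(QueueName='your-stack-dead-letter-queue-name')['QueueUrl']

response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=2,
    MessageAttributeNames=['All']
)
for message in response.get('Messages', []):
    print(message['Body'])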

Examining the SQS Lambda Triggers tab, you should observe the Lambda, which will be triggered by the SQS events.

screen_shot_2019-09-30_at_8_56_25_pm

When a message is pushed into the SQS queue by the previous process, an SQS event is fired, which synchronously triggers an invocation of the Lambda using the SQS Event Source for Lambda functionality. When a function is invoked synchronously, Lambda runs the function and waits for a response.

new-03-sqs-dynamodb

In the demonstration, the Lambda’s function handler, also written in Python, pulls the message off of the SQS queue and writes the message (DynamoDB put) to the DynamoDB table. Although writing is the primary use case in this demonstration, an event could also trigger a get, scan, update, or delete command to be executed on the DynamoDB table (gist).

def lambda_handler(event, context):
    operations = {
        'DELETE': lambda dynamo, x: dynamo.delete_item(**x),
        'POST': lambda dynamo, x: dynamo.put_item(**x),
        'PUT': lambda dynamo, x: dynamo.update_item(**x),
        'GET': lambda dynamo, x: dynamo.get_item(**x),
        'GET_ALL': lambda dynamo, x: dynamo.scan(**x),
    }

    for record in event['Records']:
        payload = loads(record['body'], parse_float=str)
        operation = record['messageAttributes']['Method']['stringValue']
        if operation in operations:
            try:
                operations[operation](dynamo_client, payload)
            except Exception as e:
                logger.error(e)
        else:
            logger.error('Unsupported method \'{}\''.format(operation))


API Gateway Event Source for Lambda

Examining the API Gateway management console, you should observe that CloudFormation created a new Edge-optimized API. The API contains several resources and their associated HTTP methods.

screen_shot_2019-09-30_at_9_02_52_pm

Each API resource is associated with a deployed Lambda function. Switching to the Lambda console, you should observe a total of seven new Lambda functions. There are five Lambda functions related to the API, in addition to the Lambda called by the S3 event notifications and the Lambda called by the SQS event notifications.

screen_shot_2019-09-30_at_8_46_24_pm

Examining one of the Lambda functions associated with the API Gateway, we should observe the API Gateway trigger for the Lambda (lower left and bottom).

screen_shot_2019-09-30_at_8_59_39_pm

When an end-user makes an HTTP(S) request via the RESTful API exposed by the API Gateway, an event is fired, which synchronously invokes a Lambda using the API Gateway Event Source for Lambda functionality. The event contains details about the HTTP request that is received. The event triggers any one of five different Lambda functions, depending on the HTTP request method.

new-04-sqs-dynamodb

The Lambda code, written in Node.js, contains five function handlers. Each handler corresponds to an HTTP method, including GET (DynamoDB get), POST (put), PUT (update), DELETE (delete), and SCAN (scan). Below is an example of the getMessage handler function. The function accepts two inputs. First, a path parameter, the date, which is the primary partition key for the DynamoDB table. Second, a query parameter, the time, which is the primary sort key for the DynamoDB table. Both the primary partition key and sort key must be passed to DynamoDB to retrieve the requested record (gist).

exports.getMessage = async (event, context) => {
  if (tableName == null) {
    tableName = process.env.TABLE_NAME;
  }
  params = {
    TableName: tableName,
    Key: {
      "date": event.pathParameters.date,
      "time": event.queryStringParameters.time
    }
  };
  console.debug(params.Key);

  return await new Promise((resolve, reject) => {
    docClient.get(params, (error, data) => {
      if (error) {
        console.error(`getMessage ERROR=${error.stack}`);
        resolve({
          statusCode: 400,
          error: `Could not get messages: ${error.stack}`
        });
      } else {
        console.info(`getMessage data=${JSON.stringify(data)}`);
        resolve({
          statusCode: 200,
          body: JSON.stringify(data)
        });
      }
    });
  });
};


Test the API

To test the Lambda functions, called by our API, we can use the sam local invoke command, part of the SAM CLI. Using this command, we can test the local Lambda functions, without them being deployed to AWS. The command allows us to trigger events, which the Lambda functions will handle. This is useful as we continue to develop, test, and re-deploy the Lambda functions to our Development, Staging, and Production environments.

The local Node.js-based, API-related Lambda functions, just like their deployed copies, will execute commands against the actual DynamoDB on AWS. The GitHub project contains a set of five sample events, corresponding to the five Lambda functions, which in turn are associated with five different HTTP methods and API resources. For example, the event_getMessage.json event is associated with the GET HTTP method and calls the /message/{date}?time={time} resource endpoint, to return a single item. This event, shown below, triggers the GetMessageFunction Lambda (gist).

{
  "body": "",
  "resource": "/",
  "path": "/message",
  "httpMethod": "GET",
  "isBase64Encoded": false,
  "queryStringParameters": {
    "time": "06:45:43"
  },
  "pathParameters": {
    "date": "2000-01-01"
  },
  "stageVariables": {}
}

We can trigger all the events using the SAM CLI. The local Lambda expects the DynamoDB table name to exist as an environment variable. Make sure you set it locally, first, before executing the sam local invoke commands (gist).

# variables (required by local lambda functions)
TABLE_NAME=your-dynamodb-table-name
# local testing (All CRUD functions)
sam local invoke PostMessageFunction \
--event lambda_apigtw_to_dynamodb/events/event_postMessage.json
sam local invoke GetMessageFunction \
--event lambda_apigtw_to_dynamodb/events/event_getMessage.json
sam local invoke GetMessagesFunction \
--event lambda_apigtw_to_dynamodb/events/event_getMessages.json
sam local invoke PutMessageFunction \
--event lambda_apigtw_to_dynamodb/events/event_putMessage.json
sam local invoke DeleteMessageFunction \
--event lambda_apigtw_to_dynamodb/events/event_deleteMessage.json


If the events were successfully handled by the local Lambda functions, in the terminal, you should see the same HTTP response status codes you would expect from calling the RESTful resources via the API Gateway. Below, for example, we see the POST event being handled by the PostMessageFunction Lambda, adding a record to the DynamoDB table, and returning a successful status of 201 Created.

screen_shot_2019-10-04_at_2_50_41_pm.png

Testing the Deployed API

To test the actual deployed API, we can call one of the API’s resources using an HTTP client, such as Postman. To locate the URL used to invoke the API resource, look at the ‘Prod’ Stage for the new API. This can be found in the Stages tab of the API Gateway console. For example, note the Invoke URL for the POST HTTP method of the /message resource, shown below.

screen_shot_2019-10-04_at_3_02_21_pm

Below, we see an example of using Postman to make an HTTP GET request to the /message/{date}?time={time} resource. We pass the required path and query parameters for date and time. The request should receive a single item in response from DynamoDB, via the API Gateway and the associated Lambda. Here, the request was successful, and the Lambda function returns a 200 OK status.

screen_shot_2019-09-30_at_9_20_45_pm

Similarly, below, we see an example of calling the same /message endpoint using the HTTP POST method. In the body of the POST request, we pass the DynamoDB table name and the Item object. Again, the POST is successful, and the Lambda function returns a 201 Created status.

screen_shot_2019-10-03_at_10_05_31_pm
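
If you prefer a scripted check over Postman, the sketch below calls the same GET resource using Python’s requests package (an additional dependency, not part of the project’s requirements). The Invoke URL is a placeholder copied from the API’s Prod stage, and the date and time values match the sample event used earlier.

import requests

# Placeholder: the Prod stage Invoke URL from the API Gateway console
invoke_url = 'https://abc123example.execute-api.us-east-1.amazonaws.com/Prod'

# GET /message/{date}?time={time}
response = requests.get(invoke_url + '/message/2000-01-01', params={'time': '06:45:43'})
print(response.status_code)
print(response.json())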

Cleaning Up

To complete the demonstration and remove the AWS resources, run the following commands. It is necessary to delete all objects from the S3 data bucket before deleting the CloudFormation stack; otherwise, the stack deletion will fail.

S3_DATA_BUCKET=your_data_bucket_name
STACK_NAME=your_stack_name

aws s3 rm s3://$S3_DATA_BUCKET/data.csv # and any other objects

aws cloudformation delete-stack \
  --stack-name $STACK_NAME

Conclusion

In this post, we explored a simple example of building a modern application using an event-driven serverless architecture on AWS. We used several services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. In addition to these, AWS has additional serverless services, which could enhance this demonstration, in particular, Amazon Kinesis, AWS Step Functions, Amazon SNS, and AWS AppSync.

In a future post, we will look at how to further test the individual components within this demonstration’s application stack, and how to automate its deployment using DevOps and CI/CD principles on AWS.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.



Getting Started with PostgreSQL using Amazon RDS, CloudFormation, pgAdmin, and Python

Introduction

In the following post, we will explore how to get started with Amazon Relational Database Service (RDS) for PostgreSQL. CloudFormation will be used to build a PostgreSQL master database instance and a single read replica in a new VPC. AWS Systems Manager Parameter Store will be used to store our CloudFormation configuration values. Amazon RDS Event Notifications will send text messages to our mobile device to let us know when the RDS instances are ready for use. Once running, we will examine a variety of methods to interact with our database instances, including pgAdmin, Adminer, and Python.

Technologies

The primary technologies used in this post include the following.

PostgreSQL

According to its website, PostgreSQL, commonly known as Postgres, is the world’s most advanced Open Source relational database. Originating at UC Berkeley in 1986, PostgreSQL has more than 30 years of active core development. PostgreSQL has earned a strong reputation for its proven architecture, reliability, data integrity, robust feature set, and extensibility. PostgreSQL runs on all major operating systems and has been ACID-compliant since 2001.

Amazon RDS for PostgreSQL

According to Amazon, Amazon Relational Database Service (RDS) provides six familiar database engines to choose from, including Amazon Aurora, PostgreSQL, MySQL, MariaDB, Oracle Database, and SQL Server. RDS is available on several database instance types, optimized for memory, performance, or I/O.

Amazon RDS for PostgreSQL makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. Amazon RDS supports the latest PostgreSQL version 11, which includes several enhancements to performance, robustness, transaction management, query parallelism, and more.

AWS CloudFormation

Deployment__Management_copy_AWS_CloudFormation-512

According to Amazon, CloudFormation provides a common language to describe and provision all the infrastructure resources within AWS-based cloud environments. CloudFormation allows you to use a JSON- or YAML-based template to model and provision all the resources needed for your applications across all AWS regions and accounts, in an automated and secure manner.

Demonstration

Architecture

Below, we see an architectural representation of what will be built in the demonstration. This is not a typical three-tier AWS architecture, wherein the RDS instances would be placed in private subnets (data tier) and accessible only by the application tier, running on AWS. The architecture for the demonstration is designed for interacting with RDS through external database clients such as pgAdmin, and applications like our local Python scripts, detailed later in the post.

RDS AWS Arch Diagram

Source Code

All source code for this post is available on GitHub in a single public repository, postgres-rds-demo.

.
├── LICENSE.md
├── README.md
├── cfn-templates
│   ├── event.template
│   ├── rds.template
├── parameter_store_values.sh
├── python-scripts
│   ├── create_pagila_data.py
│   ├── database.ini
│   ├── db_config.py
│   ├── query_postgres.py
│   ├── requirements.txt
│   └── unit_tests.py
├── sql-scripts
│   ├── pagila-insert-data.sql
│   └── pagila-schema.sql
└── stack.yml

To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/aws-rds-postgres.git

Prerequisites

For this demonstration, I will assume you already have an AWS account. I will also assume you have the latest version of the AWS CLI and Python 3 installed on your development machine. Optionally, for pgAdmin and Adminer, you will also need to have Docker installed.

Steps

In this demonstration, we will perform the following steps.

  • Put CloudFormation configuration values in Parameter Store;
  • Execute CloudFormation templates to create AWS resources;
  • Execute SQL scripts using Python to populate the new database with sample data;
  • Configure pgAdmin and Python connections to RDS PostgreSQL instances;

AWS Systems Manager Parameter Store

With AWS, it is typical to use services like AWS Systems Manager Parameter Store and AWS Secrets Manager to store overt, sensitive, and secret configuration values. These values are utilized by your code or by AWS services like CloudFormation. Parameter Store allows us to follow the proper twelve-factor, cloud-native practice of separating configuration from code.

To demonstrate the use of Parameter Store, we will place a few of our CloudFormation configuration items into Parameter Store. The demonstration’s GitHub repository includes a shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.

Below, we see several of the demo’s configuration values, which have been put into Parameter Store.

screen_shot_2019-08-08_at_7_00_17_pm

SecureString

Whereas our other parameters are stored in Parameter Store as the String data type, the database’s master user password is stored as the SecureString data type. Parameter Store uses an AWS Key Management Service (KMS) customer master key (CMK) to encrypt the SecureString parameter value.

screen_shot_2019-08-08_at_7_01_15_pm
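
To see how the SecureString works from code, below is a minimal boto3 sketch. It is not part of the demonstration’s repository; the region, parameter name, and password value are simply taken from the examples in this post, and it assumes boto3 is installed and your AWS credentials are configured.

# Hypothetical sketch: write and read back the SecureString parameter.
# Parameter Store handles the KMS encryption; WithDecryption=True asks it
# to return the decrypted, plaintext value.
import boto3

ssm = boto3.client('ssm', region_name='us-east-1')

ssm.put_parameter(
    Name='/rds_demo/master_password',
    Value='5up3r53cr3tPa55w0rd',
    Type='SecureString',
    Overwrite=True
)

response = ssm.get_parameter(
    Name='/rds_demo/master_password',
    WithDecryption=True
)
print(response['Parameter']['Value'])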

SMS Text Alert Option

Before running the Parameter Store script, you will need to change the /rds_demo/alert_phone parameter value in the script (shown below) to your mobile device number, including country code, such as ‘+12038675309’. Amazon SNS will use it to send SMS messages via Amazon RDS Event Notification. If you don’t want to use this messaging feature, simply ignore this parameter and do not execute the event.template CloudFormation template in the following step.

aws ssm put-parameter \
  --name /rds_demo/alert_phone \
  --type String \
  --value "your_phone_number_here" \
  --description "RDS alert SMS phone number" \
  --overwrite

Run the following command to execute the shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.

sh ./parameter_store_values.sh

CloudFormation Templates

The GitHub repository includes two CloudFormation templates, cfn-templates/event.template and cfn-templates/rds.template. The event template contains two resources, an Amazon SNS Topic and an Amazon RDS Event Subscription. The RDS template contains several resources, including a VPC, Internet Gateway, VPC security group, two public subnets, one RDS master database instance, and an RDS read replica database instance.

The resources are split into two CloudFormation templates so we can create the notification resources, first, independently of creating or deleting the RDS instances. This will ensure we get all our SMS alerts about both the creation and deletion of the databases.

Template Parameters

The two CloudFormation templates contain a total of approximately fifteen parameters. For most, you can use the default values I have set or choose to override them. Four of the parameters will be fulfilled from Parameter Store. Of these, the master database password is treated slightly differently because it is secure (encrypted in Parameter Store). Below is a snippet of the template showing both types of parameters. The last two are fulfilled from Parameter Store.

DBInstanceClass:
  Type: String
  Default: "db.t3.small"
DBStorageType:
  Type: String
  Default: "gp2"
DBUser:
  Type: String
  Default: "{{resolve:ssm:/rds_demo/master_username:1}}"
DBPassword:
  Type: String
  Default: "{{resolve:ssm-secure:/rds_demo/master_password:1}}"
  NoEcho: True

Choosing the default CloudFormation parameter values will result in two minimally-configured RDS instances running the PostgreSQL 11.4 database engine on a db.t3.small instance with 10 GiB of General Purpose (SSD) storage. The db.t3 DB instance is part of the latest generation burstable performance instance class. The master instance is not configured for Multi-AZ high availability. However, the master and read replica each run in a different Availability Zone (AZ) within the same AWS Region.

Parameter Versioning

When placing parameters into Parameter Store, subsequent updates to a parameter result in the version number of that parameter being incremented. Note in the examples above, the version of the parameter is required by CloudFormation, here, ‘1’. If you choose to update a value in Parameter Store, thus incrementing the parameter’s version, you will also need to update the corresponding version number in the CloudFormation template’s parameter.

{
    "Parameter": {
        "Name": "/rds_demo/rds_username",
        "Type": "String",
        "Value": "masteruser",
        "Version": 1,
        "LastModifiedDate": 1564962073.774,
        "ARN": "arn:aws:ssm:us-east-1:1234567890:parameter/rds_demo/rds_username"
    }
}
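
If you are unsure which version is current, you can ask Parameter Store directly. Below is a minimal boto3 sketch, equivalent to the aws ssm get-parameter CLI call that produced the JSON above, which prints a parameter’s current version so the template’s resolver reference can be kept in sync.

# Hypothetical sketch: look up the current version of a parameter before
# updating the '{{resolve:ssm:...:<version>}}' reference in the template.
import boto3

ssm = boto3.client('ssm', region_name='us-east-1')

response = ssm.get_parameter(Name='/rds_demo/rds_username')
print(response['Parameter']['Version'])  # e.g., 1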

Validating Templates

Although I have tested both templates, I suggest validating the templates yourself, as you usually would for any CloudFormation template you are creating. You can use the AWS CLI’s cloudformation validate-template command to validate a template. Alternately, or I suggest additionally, you can use the CloudFormation Linter’s cfn-lint command.

aws cloudformation validate-template \
  --template-body file://cfn-templates/rds.template

cfn-lint -t cfn-templates/rds.template

Create the Stacks

To execute the first CloudFormation template and create a CloudFormation Stack containing the two event notification resources, run the following create-stack CLI command.

aws cloudformation create-stack \
  --template-body file://cfn-templates/event.template \
  --stack-name RDSEventDemoStack

The first stack takes less than one minute to create. Using the AWS CloudFormation Console, make sure the first stack completes successfully before creating the second stack with the command below.

aws cloudformation create-stack \
  --template-body file://cfn-templates/rds.template \
  --stack-name RDSDemoStack

screen_shot_2019-08-04_at_10_35_20_pm
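
If you would rather script the pause than watch the console, a CloudFormation waiter can block until the event stack reaches CREATE_COMPLETE before the RDS stack is created. Below is a minimal boto3 sketch, not part of the repository, using the stack names from the commands above.

# Hypothetical sketch: block until the event stack is created, after which
# it is safe to create the RDS stack. The waiter polls DescribeStacks for us.
import boto3

cfn = boto3.client('cloudformation', region_name='us-east-1')

waiter = cfn.get_waiter('stack_create_complete')
waiter.wait(StackName='RDSEventDemoStack')
print('RDSEventDemoStack is complete; safe to create RDSDemoStack')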

Wait for my Text

In my tests, the CloudFormation RDS stack takes an average of 25–30 minutes to create and 15–20 minutes to delete, which can seem like an eternity. You could use the AWS CloudFormation console (shown below) or continue to use the CLI to follow the progress of the RDS stack creation.

screen_shot_2019-08-08_at_7_39_45_pm.png

However, if you recall, the CloudFormation event template creates an AWS RDS Event Subscription. This resource will notify us when the databases are ready by sending text messages to our mobile device.

screen_shot_2019-08-04_at_11_06_31_pm

In the CloudFormation events template, the RDS Event Subscription is configured to generate Amazon Simple Notification Service (SNS) notifications for several specific event types, including RDS instance creation and deletion.

  MyEventSubscription:
    Properties:
      Enabled: true
      EventCategories:
        - availability
        - configuration change
        - creation
        - deletion
        - failover
        - failure
        - recovery
      SnsTopicArn:
        Ref: MyDBSNSTopic
      SourceType: db-instance
    Type: AWS::RDS::EventSubscription

Amazon SNS will send SMS messages to the mobile number you placed into Parameter Store. Below, we see messages generated during the creation of the two instances, displayed on an Apple iPhone.

img-2839

Amazon RDS Dashboard

Once the RDS CloudFormation stack has successfully been built, the easiest way to view the results is using the Amazon RDS Dashboard, as shown below. Here we see both the master and read replica instances have been created and are available for our use.

screen_shot_2019-08-08_at_7_05_24_pm

The RDS dashboard offers CloudWatch monitoring of each RDS instance.

screen_shot_2019-08-08_at_7_06_17_pm

The RDS dashboard also provides detailed configuration information about each RDS instance.

screen_shot_2019-08-08_at_7_06_26_pm

The RDS dashboard’s Connection & security tab is where we can obtain connection information about our RDS instances, including each instance’s endpoint. The endpoint information will be required in the next part of the demonstration.

screen_shot_2019-08-08_at_7_06_01_pm
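
Alternatively, the endpoints can be retrieved programmatically. Below is a minimal boto3 sketch, not part of the repository, that lists the endpoint address and port of every RDS instance in the region.

# Hypothetical sketch: print each RDS instance's endpoint, an alternative
# to copying the values from the RDS console.
import boto3

rds = boto3.client('rds', region_name='us-east-1')

for db in rds.describe_db_instances()['DBInstances']:
    endpoint = db.get('Endpoint', {})
    print(db['DBInstanceIdentifier'],
          endpoint.get('Address'),
          endpoint.get('Port'))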

Sample Data

Now that we have our PostgreSQL database instance and read replica successfully provisioned and configured on AWS, with an empty database, we need some test data. There are several sources of sample PostgreSQL databases available on the internet to explore. We will use the Pagila sample movie rental database by pgFoundry. Although the database is several years old, it provides a relatively complex relational schema (table relationships shown below) and plenty of sample data to query, about 100 database objects and 46K rows of data.

pagila_tables.png

In the GitHub repository, I have included the two Pagila database SQL scripts required to install the sample database’s data structures (DDL), sql-scripts/pagila-schema.sql, and the data itself (DML), sql-scripts/pagila-insert-data.sql.

To execute the Pagila SQL scripts and install the sample data, I have included a Python script. If you do not want to use Python, you can skip to the Adminer section of this post. Adminer also has the capability to import SQL scripts.

Before running any of the included Python scripts, you will need to install the required Python packages and configure the database.ini file.

Python Packages

To install the required Python packages using the supplied python-scripts/requirements.txt file, run the below commands.

cd python-scripts
pip3 install --upgrade -r requirements.txt

We are using two packages, psycopg2 and configparser, for the scripts. Psycopg is a PostgreSQL database adapter for Python. According to its website, Psycopg is the most popular PostgreSQL database adapter for the Python programming language. The configparser module allows us to read configuration from files similar to Microsoft Windows INI files. The unittest module is used by a set of unit tests included in the project, but is not discussed as part of this demo.

screen_shot_2019-08-13_at_11_06_10_pm

Database Configuration

The python-scripts/database.ini file, read by configparser, provides the required connection information for our RDS master and read replica instances’ databases. Use the input parameters and output values from the CloudFormation RDS template, or the Amazon RDS Dashboard, to obtain the required connection information, as shown in the example below. Your host values will be unique for your master and read replica. The host values are the instances’ endpoints, listed in the RDS Dashboard’s Connection & security tab.

[docker]
host=localhost
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[master]
host=demo-instance.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[replica]
host=demo-replica.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd
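
For reference, below is a minimal sketch of how configparser can turn one of these sections into a psycopg2 connection. The repository’s db_config.py presumably does something similar, although its details may differ.

# Sketch only: read a section of database.ini and open a connection with
# the resulting host, port, database, user, and password values.
from configparser import ConfigParser

import psycopg2


def get_connection(section='master', filename='database.ini'):
    parser = ConfigParser()
    parser.read(filename)
    params = dict(parser.items(section))
    return psycopg2.connect(**params)


conn = get_connection('master')
print(conn.get_dsn_parameters())
conn.close()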

With the INI file configured, run the following command, which executes a supplied Python script, python-scripts/create_pagila_data.py, to create the data structure and insert sample data into the master RDS instance’s Pagila database. The database will be automatically replicated to the RDS read replica instance. From my local laptop, I found the Python script takes approximately 40 seconds to create all 100 database objects and insert 46K rows of movie rental data. That is compared to about 13 seconds locally, using a Docker-based instance of PostgreSQL.

python3 ./create_pagila_data.py --instance master

The Python script’s primary function, create_pagila_db(), reads and executes the two external SQL scripts.

def create_pagila_db():
    """
    Creates Pagila database by running DDL and DML scripts
    """

    try:
        global conn
        with conn:
            with conn.cursor() as curs:
                curs.execute(open("../sql-scripts/pagila-schema.sql", "r").read())
                curs.execute(open("../sql-scripts/pagila-insert-data.sql", "r").read())
                conn.commit()
                print('Pagila SQL scripts executed')
    except (psycopg2.OperationalError, psycopg2.DatabaseError, FileNotFoundError) as err:
        print(create_pagila_db.__name__, err)
        close_conn()
        exit(1)

If the Python script executes correctly, you should see output indicating there are now 28 tables in our master RDS instance’s database.

screen_shot_2019-08-08_at_7_13_11_pm
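
If you want to confirm the result from code rather than the script’s output, a quick query against information_schema returns the table count. Below is a hypothetical verification snippet, not part of the repository, which reuses the [master] section of database.ini.

# Sketch only: count the base tables in the Pagila database's public schema
# to confirm the DDL script ran successfully.
from configparser import ConfigParser

import psycopg2

parser = ConfigParser()
parser.read('database.ini')

with psycopg2.connect(**dict(parser.items('master'))) as conn:
    with conn.cursor() as curs:
        curs.execute("""
            SELECT count(*)
              FROM information_schema.tables
             WHERE table_schema = 'public'
               AND table_type = 'BASE TABLE';
        """)
        print('tables:', curs.fetchone()[0])

conn.close()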

pgAdmin

pgAdmin is a favorite tool for interacting with and managing PostgreSQL databases. According to its website, pgAdmin is the most popular and feature-rich Open Source administration and development platform for PostgreSQL.

The project includes an optional Docker Swarm stack.yml file. The stack will create a set of three Docker containers, including a local copy of PostgreSQL 11.4, Adminer, and pgAdmin 4. Having a local copy of PostgreSQL, using the official Docker image, is helpful for development and troubleshooting RDS issues.

screen_shot_2019-08-10_at_1_43_24_pm.png

Use the following commands to deploy the Swarm stack.

# create stack
docker swarm init
docker stack deploy -c stack.yml postgres

# get status of new containers
docker stack ps postgres --no-trunc
docker container ls

If you do not want to spin up the whole Docker Swarm stack, you could use the docker run command to create just a single pgAdmin Docker container. The pgAdmin 4 Docker image being used is the image recommended by pgAdmin.

docker pull dpage/pgadmin4

docker run -p 81:80 \
  -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" \
  -e "PGADMIN_DEFAULT_PASSWORD=SuperSecret" \
  -d dpage/pgadmin4

docker container ls | grep pgadmin4

Database Server Configuration

Once pgAdmin is up and running, we can configure the master and read replica database servers (RDS instances) using the connection string information from your database.ini file or from the Amazon RDS Dashboard. Below, I am configuring the master RDS instance (server).

screen_shot_2019-08-08_at_7_25_27_pm

With that task complete, below, we see the master RDS instance and the read replica, as well as my local Docker instance configured in pgAdmin (left side of screengrab). Note how the Pagila database has been replicated automatically, from the RDS master to the read replica instance.

screen_shot_2019-08-08_at_7_29_00_pm

Building SQL Queries

Switching to the Query tab, we can run regular SQL queries against any of the database instances. Below, I have run a simple SELECT query against the master RDS instance’s Pagila database, returning the complete list of movie titles, along with their genre and release date.

screen_shot_2019-08-08_at_7_27_58_pm
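
A roughly equivalent query can also be run from Python, this time against the read replica. Below is a minimal psycopg2 sketch, not part of the repository, assuming the standard Pagila film, film_category, and category tables and the [replica] section of database.ini.

# Sketch only: titles with their genre and release year, read from the
# replica (approximately the query run in pgAdmin above).
from configparser import ConfigParser

import psycopg2

parser = ConfigParser()
parser.read('database.ini')

with psycopg2.connect(**dict(parser.items('replica'))) as conn:
    with conn.cursor() as curs:
        curs.execute("""
            SELECT f.title, c.name AS genre, f.release_year
              FROM film AS f
              JOIN film_category AS fc ON f.film_id = fc.film_id
              JOIN category AS c ON fc.category_id = c.category_id
             ORDER BY f.title;
        """)
        for title, genre, release_year in curs.fetchmany(5):
            print(title, genre, release_year)

conn.close()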

The pgAdmin Query tool even includes an Explain tab to view a graphical representation of the same query, very useful for optimization. Here we see the same query, showing an analysis of the execution order. A popup window displays information about the selected object.