Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce
Posted by Gary A. Stafford in AWS, Build Automation, Cloud, Python, Software Development on December 2, 2020
Introduction
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks, for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR Notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
PySpark on EMR
In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.
PySpark Application Methods
- Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
- EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
- Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
- AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
- Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);
Notebook Methods
- EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
- Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
- Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;
Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.
Preliminary Tasks
To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.
- Download a copy of this post’s GitHub repository;
- Download three Kaggle datasets and organize locally;
- Create an Amazon EC2 key pair;
- Upload the EMR bootstrap script and create the CloudFormation Stack;
- Allow your IP address access to the EMR Master node on port 22;
- Upload CSV data files and PySpark applications to S3;
- Crawl the raw data and create a Data Catalog using AWS Glue;
Step 1: GitHub Repository
Using this git clone command, download a copy of this post’s GitHub repository to your local environment.
git clone --branch main --single-branch --depth 1 --no-tags \
https://github.com/garystafford/emr-demo.git
Step 2: Kaggle Datasets
Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.
- Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
- Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
- Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones
Organize the (38) downloaded CSV files into the raw_data directory of the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in a later step.
> tree raw_data --si -v -A
raw_data
├── [ 128] bakery
│ ├── [711k] BreadBasket_DMS.csv
├── [ 320] movie_ratings
│ ├── [190M] credits.csv
│ ├── [6.2M] keywords.csv
│ ├── [989k] links.csv
│ ├── [183k] links_small.csv
│ ├── [ 34M] movies_metadata.csv
│ ├── [710M] ratings.csv
│ └── [2.4M] ratings_small.csv
└── [1.1k] stocks
├── [151k] AAPL.csv
├── [146k] AXP.csv
├── [150k] BA.csv
├── [147k] CAT.csv
├── [146k] CSCO.csv
├── [149k] CVX.csv
├── [147k] DIS.csv
├── [ 42k] DWDP.csv
├── [150k] GS.csv
└── [...] abridged...
In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.
Step 3: Amazon EC2 key pair
According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.
chmod 0400 /path/to/my-key-pair.pem
Step 4: Bootstrap Script and CloudFormation Stack
The bulk of the resources that are used as part of this demonstration are created using the CloudFormation stack, emr-demo-dev. The CloudFormation template that creates the stack, cloudformation/emr-demo.yml, is included in the repository. Please review all resources and understand the cost and security implications before continuing.
There is also a JSON-format CloudFormation parameters file, cloudformation/emr-demo-params-dev.json, containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file, with different configurations.
The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.
The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script, bootstrap_actions.sh, from the local project repository to the S3 bucket. The script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region for boto3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.
python3 ./scripts/create_cfn_stack.py \
--environment dev \
--ec2-key-name <my-key-pair-name>
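As a rough illustration of the three steps described above (create the bootstrap bucket, copy the bootstrap script, create the stack), here is a minimal sketch. It is not the repository's create_cfn_stack.py: the function names, the parameter keys Ec2KeyName and BootstrapBucket, and the IAM capability are assumptions, and the S3 and CloudFormation clients are passed in as arguments.

```python
import json


def build_stack_parameters(params_file, ec2_key_name, bootstrap_bucket):
    """Load the JSON parameters file and append the two run-time values."""
    with open(params_file) as f:
        # CloudFormation parameter files are a list of
        # {"ParameterKey": ..., "ParameterValue": ...} objects.
        parameters = json.load(f)
    # Hypothetical parameter keys; the real template may name them differently.
    parameters.append({"ParameterKey": "Ec2KeyName", "ParameterValue": ec2_key_name})
    parameters.append({"ParameterKey": "BootstrapBucket", "ParameterValue": bootstrap_bucket})
    return parameters


def create_stack(s3_client, cfn_client, stack_name, template_body,
                 parameters, bootstrap_bucket, bootstrap_script):
    """Create the bootstrap bucket, upload the script, then create the stack."""
    # Outside us-east-1, create_bucket also needs a CreateBucketConfiguration.
    s3_client.create_bucket(Bucket=bootstrap_bucket)
    s3_client.upload_file(bootstrap_script, bootstrap_bucket, "bootstrap_actions.sh")
    response = cfn_client.create_stack(
        StackName=stack_name,
        TemplateBody=template_body,
        Parameters=parameters,
        Capabilities=["CAPABILITY_NAMED_IAM"],  # assumed: the template creates IAM roles
    )
    return response["StackId"]
```

With boto3 installed, the two clients would typically be boto3.client("s3") and boto3.client("cloudformation").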
The CloudFormation template should create a CloudFormation stack, emr-demo-dev, as shown below.

Step 5: SSH Access to EMR
For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

Alternatively, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.
export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')
aws ec2 authorize-security-group-ingress \
--group-id ${EMR_MASTER_SG_ID} \
--protocol tcp \
--port 22 \
--cidr $(curl ipinfo.io/ip)/32
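The same rule can be added with the AWS SDK for Python. The following is a minimal sketch using the boto3 EC2 client's describe_security_groups and authorize_security_group_ingress calls; the helper name and the rule description are illustrative, and the client (for example, boto3.client("ec2")) is passed in as an argument.

```python
def allow_ssh_from_ip(ec2_client, group_name, ip_address):
    """Add an inbound SSH (port 22) rule for a single IP address."""
    groups = ec2_client.describe_security_groups(
        Filters=[{"Name": "group-name", "Values": [group_name]}]
    )["SecurityGroups"]
    if not groups:
        raise ValueError(f"Security group not found: {group_name}")
    group_id = groups[0]["GroupId"]

    ec2_client.authorize_security_group_ingress(
        GroupId=group_id,
        IpPermissions=[
            {
                "IpProtocol": "tcp",
                "FromPort": 22,
                "ToPort": 22,
                # /32 restricts the rule to exactly one address.
                "IpRanges": [{"CidrIp": f"{ip_address}/32", "Description": "SSH from my IP"}],
            }
        ],
    )
    return group_id
```

Calling allow_ssh_from_ip(ec2, "ElasticMapReduce-master", "<your-public-ip>") would mirror the CLI commands above.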
Step 6: Raw Data and PySpark Apps to S3
As part of the emr-demo-dev CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw, processed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.
> aws s3api list-buckets | \
jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'
emr-demo-raw-123456789012-us-east-1
emr-demo-processed-123456789012-us-east-1
emr-demo-analyzed-123456789012-us-east-1
emr-demo-work-123456789012-us-east-1
emr-demo-logs-123456789012-us-east-1
emr-demo-glue-db-123456789012-us-east-1
emr-demo-bootstrap-123456789012-us-east-1
There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.
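To make the two naming conventions concrete, here is a small sketch that maps the bronze, silver, and gold tier names to bucket names. The pattern prefix-kind-account-region is inferred from the bucket listing above, and the helper itself is hypothetical rather than part of the repository.

```python
# Tier names used as parameters, mapped to the kind of data each bucket holds.
TIERS = {"bronze": "raw", "silver": "processed", "gold": "analyzed"}


def bucket_name(tier_or_kind, account_id, region, prefix="emr-demo"):
    """Build a bucket name such as emr-demo-raw-123456789012-us-east-1."""
    # Accept either a tier ("bronze") or a bucket kind ("work", "logs", ...).
    kind = TIERS.get(tier_or_kind, tier_or_kind)
    return f"{prefix}-{kind}-{account_id}-{region}"


bronze_bucket = bucket_name("bronze", "123456789012", "us-east-1")
work_bucket = bucket_name("work", "123456789012", "us-east-1")
```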
Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.
python3 ./scripts/upload_csv_files_to_s3.py
Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 data bucket.
python3 ./scripts/upload_apps_to_s3.py
Step 7: Crawl Raw Data with Glue
The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets’ data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw
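For readers curious what running a crawler from Python involves, here is a minimal sketch built on the boto3 Glue client's start_crawler and get_crawler calls. It is an assumption about how such a script might work, not the repository's crawl_raw_data.py; the polling interval and function name are illustrative, and the client (for example, boto3.client("glue")) is passed in.

```python
import time


def run_crawler(glue_client, crawler_name, poll_seconds=15, max_polls=120):
    """Start a Glue crawler and block until it returns to the READY state."""
    glue_client.start_crawler(Name=crawler_name)
    for _ in range(max_polls):
        crawler = glue_client.get_crawler(Name=crawler_name)["Crawler"]
        # A crawler moves through RUNNING and STOPPING before returning to READY.
        if crawler["State"] == "READY":
            # LastCrawl.Status is SUCCEEDED, CANCELLED, or FAILED.
            return crawler.get("LastCrawl", {}).get("Status")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Crawler {crawler_name} did not finish in time")
```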
Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

Alternatively, we can use the glue get-tables AWS CLI command to review the tables.
> aws glue get-tables --database emr_demo | \
jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'
raw_bakery
raw_credits_csv
raw_keywords_csv
raw_links_csv
raw_links_small_csv
raw_movies_metadata_csv
raw_ratings_csv
raw_ratings_small_csv
raw_stocks
PySpark Applications
Let’s explore four methods to run PySpark applications on EMR.

1. Add Job Flow Steps to an Existing EMR Cluster
We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on an [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application in S3, bakery_sales_ssm.py.
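As a hedged illustration, an EMR Step that wraps such a spark-submit command can be expressed as a Python dictionary in the shape the EMR API expects, with command-runner.jar invoking spark-submit on the cluster. The helper name, the Step name, and the S3 path of the application are assumptions for this sketch.

```python
def spark_submit_step(name, app_uri, app_args=(), action_on_failure="CONTINUE"):
    """Return an EMR Step definition that runs spark-submit via command-runner.jar."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--master", "yarn",
                "--conf", "spark.yarn.submit.waitAppCompletion=true",
                app_uri,
                *app_args,
            ],
        },
    }


# Hypothetical S3 location of the application in the work bucket.
step = spark_submit_step(
    "Bakery Sales",
    "s3://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py",
)
```

The non-SSM versions of the applications would receive their parameter values through app_args, appended after the application path.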
We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.
There are two versions of each PySpark application. Files with suffix _ssm use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, these PySpark applications are not tightly coupled to boto3 or the SSM Parameter Store. We will use _ssm versions of the scripts in this post’s demonstration.
> tree pyspark_apps --si -v -A
pyspark_apps
├── [ 320] analyze
│ ├── [1.4k] bakery_sales.py
│ ├── [1.5k] bakery_sales_ssm.py
│ ├── [2.6k] movie_choices.py
│ ├── [2.7k] movie_choices_ssm.py
│ ├── [2.0k] movies_avg_ratings.py
│ ├── [2.3k] movies_avg_ratings_ssm.py
│ ├── [2.2k] stock_volatility.py
│ └── [2.3k] stock_volatility_ssm.py
└── [ 256] process
├── [1.1k] bakery_csv_to_parquet.py
├── [1.3k] bakery_csv_to_parquet_ssm.py
├── [1.3k] movies_csv_to_parquet.py
├── [1.5k] movies_csv_to_parquet_ssm.py
├── [1.9k] stocks_csv_to_parquet.py
└── [2.0k] stocks_csv_to_parquet_ssm.py
We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.
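The core of such a conversion is short. The sketch below is an approximation under stated assumptions, not the repository's bakery_csv_to_parquet_ssm.py: the function name, read options, and S3 paths are illustrative, the SparkSession is passed in, and the bucket names are taken as arguments rather than read from the SSM Parameter Store.

```python
def csv_to_parquet(spark, source_uri, target_uri):
    """Read a headered CSV file and rewrite it as Parquet."""
    df = (
        spark.read.format("csv")
        .option("header", "true")        # first row holds column names
        .option("inferSchema", "true")   # let Spark guess column types
        .load(source_uri)
    )
    # Overwrite any output left by a previous run.
    df.write.mode("overwrite").parquet(target_uri)
    return df


# On the cluster, the session would typically be created like this
# (bucket names are placeholders):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName("bakery-csv-to-parquet").getOrCreate()
#   csv_to_parquet(
#       spark,
#       "s3://<raw-bucket>/bakery/BreadBasket_DMS.csv",
#       "s3://<processed-bucket>/bakery/",
#   )
```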
The three PySpark data processing applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_process.json, a snippet of which is shown below. The same goes for the four analytics applications.
Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps_process.py, shown below. Note line 31, where the Steps are injected into the add_job_flow_steps method’s parameters.
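The decoupling pattern can be sketched in a few lines: the Steps live in a JSON file, and the Python code only loads them and hands them to the boto3 EMR client's add_job_flow_steps method. This is a simplified illustration rather than the repository's script; the function names are hypothetical, and the client (for example, boto3.client("emr")) and cluster ID are supplied by the caller.

```python
import json


def load_steps(steps_file):
    """Load a list of EMR Step definitions from a JSON file."""
    with open(steps_file) as f:
        return json.load(f)


def add_steps(emr_client, cluster_id, steps):
    """Submit the Steps to an existing cluster and return the new Step IDs."""
    response = emr_client.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=steps,  # the Steps are injected here, unchanged from the JSON file
    )
    return response["StepIds"]
```

Adding a fourth or fifth Step then means editing only the JSON file.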
The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.
> aws ssm get-parameters-by-path --path '/emr_demo' | \
jq -r ".Parameters[] | {Name: .Name, Value: .Value}"
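From Python, the same lookup might look like the following sketch, which uses the boto3 SSM client's paginator for get_parameters_by_path to collect every parameter under a path into a dictionary. The helper name is illustrative, and the client (for example, boto3.client("ssm")) is passed in.

```python
def get_parameters_by_path(ssm_client, path="/emr_demo"):
    """Return {name: value} for all parameters stored under the given path."""
    parameters = {}
    # Results are paged, so use a paginator instead of a single call.
    paginator = ssm_client.get_paginator("get_parameters_by_path")
    for page in paginator.paginate(Path=path, Recursive=True):
        for parameter in page["Parameters"]:
            parameters[parameter["Name"]] = parameter["Value"]
    return parameters
```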
From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from JSON-format file, job_flow_steps_process.json, and run the PySpark processing applications on the existing EMR cluster.
python3 ./scripts/add_job_flow_steps.py --job-type process
While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

Once the three Steps have been completed, we should note three sub-directories in the processed data bucket containing Parquet-format files.

Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning our data, we can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.

Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

Crawl Processed Data with Glue
As with the raw data earlier, catalog the newly processed Parquet data into the same AWS Glue data catalog database using the other Glue Crawler we created. The processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.
python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed
Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three Kaggle datasets’ contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

Alternatively, we can use the glue get-tables AWS CLI command to review the tables.
> aws glue get-tables --database emr_demo | \
jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'
processed_bakery
processed_credits
processed_keywords
processed_links
processed_links_small
processed_movies_metadata
processed_ratings
processed_ratings_small
processed_stocks
2. Run PySpark Jobs from EMR Master Node
Next, we will explore how to execute PySpark applications remotely on the Master node on the EMR cluster using boto3 and SSH. Although this method may be optimal for certain use cases as opposed to using the EMR SDK, remote SSH execution does not scale as well in my opinion due to a lack of automation, and it exposes some potential security risks.
There are four PySpark applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.
The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.
Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script replicates the same functionality as the shell-based SSH command above to execute a remote command on the EMR Master Node. The spark-submit command is on lines 36–38, below.
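The essence of this approach can be sketched as two small functions: one builds the spark-submit command line, the other runs it through an SSH client's exec_command method, as paramiko's SSHClient provides. This is an illustration under assumptions, not the repository's submit_spark_ssh.py; the function names and the spark-submit options are illustrative, and the connected client is passed in.

```python
import shlex


def build_spark_submit(app_uri):
    """Build a shell-safe spark-submit command line for a PySpark application."""
    args = [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--conf", "spark.yarn.submit.waitAppCompletion=true",
        app_uri,
    ]
    return " ".join(shlex.quote(arg) for arg in args)


def run_remote(ssh_client, command):
    """Run a command over SSH and return (exit_status, combined_output)."""
    # get_pty=True merges stderr into stdout, so the log-heavy spark-submit
    # output can be read from a single stream.
    _stdin, stdout, _stderr = ssh_client.exec_command(command, get_pty=True)
    output = stdout.read().decode()
    exit_status = stdout.channel.recv_exit_status()
    return exit_status, output


# Connecting with paramiko might look like this (host and key path are placeholders):
#   import paramiko
#   ssh = paramiko.SSHClient()
#   ssh.load_system_host_keys()
#   ssh.connect(hostname="<master-public-dns>", username="hadoop",
#               key_filename="/path/to/my-key-pair.pem")
#   status, output = run_remote(ssh, build_spark_submit("s3://<work-bucket>/..."))
```

EMR's default SSH user is hadoop. Because only known host keys are loaded here, the first connection to a new Master node would need its host key to be trusted beforehand.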
From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/my-key-pair.pem).
python3 ./scripts/submit_spark_ssh.py \
--ec2-key-path </path/to/my-key-pair.pem>
The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the commands will be replaced by the environment variables, set in advance, which use AWS CLI emr and ssm commands.

Monitoring Spark Jobs
We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.

We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.

YARN Timeline Server
Below, we see the job we submitted running on the YARN Timeline Server, which also includes useful tools like access to configuration, local logs, server stacks, and server metrics.

YARN Timeline Server allows us to drill down into individual jobs and view logs. Logs are ideal for troubleshooting failed jobs, especially the stdout logs.

Spark History Server
You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.

Below, we see more details about our Spark job using the Spark History Server.

We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).

3. Run Job Flow on an Auto-Terminating EMR Cluster
The next option to run PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the run_job_flow method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective method of running PySpark applications on-demand.
We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all the EMR cluster’s Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will only deploy the Spark application to this cluster as that is the only application we will need to run the Steps.
Using the run_job_flow method, we will execute the four PySpark data analysis applications. The PySpark applications’ spark-submit commands are defined in a separate JSON-format file, job_flow_steps_analyze.json. As with the previous add_job_flow_steps.py script, this pattern of decoupling the Spark job command and arguments from the execution code lets us define and submit any number of Steps without changing the Python execution script. Similarly, this script retrieves parameter values from the SSM Parameter Store.
From the GitHub repository’s local copy, run the following command, which will execute a Python script to create a new cluster, run the four PySpark applications, and then auto-terminate.
python3 ./scripts/run_job_flow.py --job-type analyze
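A simplified sketch of the kind of request such a script sends is shown below. The keys follow the boto3 EMR client's run_job_flow parameters, but the instance types, role names, and function names are assumptions for illustration, not the repository's actual configuration. Setting KeepJobFlowAliveWhenNoSteps to False is what makes the cluster terminate once its Steps finish.

```python
def build_job_flow(name, steps, log_uri, ec2_key_name, subnet_id,
                   release_label="emr-6.2.0", instance_type="m5.xlarge"):
    """Build run_job_flow arguments for a 3-node, Spark-only, auto-terminating cluster."""
    return {
        "Name": name,
        "LogUri": log_uri,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],  # only Spark is installed
        "Instances": {
            "InstanceGroups": [
                {"Name": "Master", "InstanceRole": "MASTER", "Market": "SPOT",
                 "InstanceType": instance_type, "InstanceCount": 1},
                {"Name": "Core", "InstanceRole": "CORE", "Market": "SPOT",
                 "InstanceType": instance_type, "InstanceCount": 2},
            ],
            "Ec2KeyName": ec2_key_name,
            "Ec2SubnetId": subnet_id,
            # Terminate the cluster as soon as no Steps remain.
            "KeepJobFlowAliveWhenNoSteps": False,
            "TerminationProtected": False,
        },
        "Steps": steps,
        # Default EMR role names; an account may use different ones.
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }


def run_job_flow(emr_client, job_flow):
    """Create the cluster and return its ID (for example, a j-... identifier)."""
    return emr_client.run_job_flow(**job_flow)["JobFlowId"]
```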
As shown below, we see the short-lived EMR cluster in the process of terminating after successfully running the PySpark applications as EMR Steps.


4. Using AWS Step Functions
According to AWS, AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services. Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team. Step Functions is based on state machines and tasks. A state machine is a workflow. A task is a state in a workflow that represents a single unit of work that another AWS service performs. Each step in a workflow is a state. Using AWS Step Functions, we define our workflows as state machines, which transform complex code into easy to understand statements and diagrams.

You can use AWS Step Functions to run PySpark applications as EMR Steps on an existing EMR cluster. Using Step Functions, we can also create the cluster, run multiple EMR Steps sequentially or in parallel, and finally, auto-terminate the cluster.
We will create two state machines for this demo, one for the PySpark data processing applications and one for the PySpark data analysis applications. To create state machines, we first need to create JSON-based state machine definition files. The files are written in Amazon States Language. According to AWS, Amazon States Language is a JSON-based, structured language used to define a state machine, a collection of states that can do work (Task states), determine which states to transition to next (Choice states), stop execution with an error (Fail states), and so on.
The definition files contain specific references to AWS resources deployed to your AWS account originally created by CloudFormation. Below is a snippet of the state machine definition file, step_function_emr_analyze.json, showing part of the configuration of the EMR cluster. Note the parameterized key/value pairs (e.g., "Ec2KeyName.$": "$.InstancesEc2KeyName" on line 5). The values will come from a JSON-formatted inputs file and are dynamically replaced upon the state machine’s execution.
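To show how parameterized keys fit into a definition, here is a hedged sketch that builds a single cluster-creation Task state as a Python dictionary and serializes it to JSON. Keys ending in ".$" take their value from the execution input at the given JSON path. Only the Ec2KeyName pair comes from this post; the state's other fields, the input key CreateClusterName, and the next state's name are assumptions.

```python
import json


def create_cluster_state(next_state):
    """Return an Amazon States Language Task state that creates an EMR cluster."""
    return {
        "Type": "Task",
        # Step Functions service integration for EMR; .sync waits for completion.
        "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
        "Parameters": {
            "Name.$": "$.CreateClusterName",  # hypothetical input key
            "ReleaseLabel": "emr-6.2.0",
            "Applications": [{"Name": "Spark"}],
            "Instances": {
                # Resolved from the inputs file when the state machine executes.
                "Ec2KeyName.$": "$.InstancesEc2KeyName",
                # Keep the cluster up so later states can add Steps to it.
                "KeepJobFlowAliveWhenNoSteps": True,
            },
        },
        "ResultPath": "$.CreateClusterResult",
        "Next": next_state,
    }


definition_snippet = json.dumps(create_cluster_state("Run_Steps"), indent=2)
```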
Python Templating
To automate the process of adding dynamic resource references to the state machine’s inputs files, we will use Jinja, the modern and designer-friendly templating language for Python, modeled after Django’s templates. We will render the Jinja template to a JSON-based state machine inputs file, replacing the template’s resource tags (keys) with values from the SSM Parameter Store’s parameters. Below is a snippet from the inputs file Jinja template, step_function_inputs_analyze.j2.
First, install Jinja2, then create two JSON-based state machine inputs files from the Jinja templates using the included Python file.
# install Jinja2
python3 -m pip install Jinja2
python3 ./scripts/create_inputs_files.py
Below we see the same snippet of the final inputs file. Jinja tags have been replaced with values from the SSM Parameter Store.
Using the definition files, create two state machines using the included Python files.
python3 ./scripts/create_state_machine.py \
--definition-file step_function_emr_process.json \
--state-machine EMR-Demo-Process
python3 ./scripts/create_state_machine.py \
--definition-file step_function_emr_analyze.json \
--state-machine EMR-Demo-Analysis
Both state machines should appear in the AWS Step Functions Console’s State Machines tab. Below, we see the ‘EMR-Demo-Analysis’ state machine’s definition both as JSON and rendered visually to a layout.

To execute either of the state machines, use the included Python file, passing in the exact name of the state machine to execute, either ‘EMR-Demo-Process’ or ‘EMR-Demo-Analysis’, and the name of the inputs file. I suggest running the EMR-Demo-Analysis version so as not to re-process all the raw data.
python3 ./scripts/execute_state_machine.py \
--state-machine EMR-Demo-Process \
--inputs-file step_function_inputs_process.json
python3 ./scripts/execute_state_machine.py \
--state-machine EMR-Demo-Analysis \
--inputs-file step_function_inputs_analyze.json
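Creating and executing a state machine from Python comes down to two calls on the boto3 Step Functions client, create_state_machine and start_execution. The sketch below shows one plausible shape for them; it is not the repository's code, the function names are illustrative, and the IAM role ARN and the client (for example, boto3.client("stepfunctions")) are supplied by the caller.

```python
import json


def create_state_machine(sfn_client, name, definition, role_arn):
    """Create a state machine from a definition dictionary and return its ARN."""
    response = sfn_client.create_state_machine(
        name=name,
        definition=json.dumps(definition),  # the API expects a JSON string
        roleArn=role_arn,
    )
    return response["stateMachineArn"]


def execute_state_machine(sfn_client, state_machine_arn, inputs):
    """Start an execution with the given inputs and return the execution ARN."""
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(inputs),  # values referenced by "$." paths in the definition
    )
    return response["executionArn"]
```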
When the PySpark analysis application’s Step Function state machine is executed, a new EMR cluster is created, the PySpark applications are run, and finally, the cluster is auto-terminated. Below, we see a successfully executed state machine, which successfully ran the four PySpark analysis applications in parallel, on a new auto-terminating EMR cluster.

Conclusion
This post explored four methods for running PySpark applications on Amazon Elastic MapReduce (Amazon EMR). The key to scaling data analytics with PySpark on EMR is the use of automation. Therefore, we looked at ways to automate the deployment of EMR resources, create and submit PySpark jobs, and terminate EMR resources when the jobs are complete. Furthermore, we were able to decouple references to dynamic AWS resources within our PySpark applications using parameterization. This allows us to deploy and run PySpark resources across multiple AWS Accounts and AWS Regions without code changes.
In part two of the series, we will explore the use of the recently announced service, Amazon Managed Workflows for Apache Airflow (MWAA), and in part three, the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Streaming Analytics with Data Warehouses, using Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight
Posted by Gary A. Stafford in AWS, Cloud, Python, Software Development, SQL on March 5, 2020
Introduction
Databases are ideal for storing and organizing data that requires a high volume of transaction-oriented query processing while maintaining data integrity. In contrast, data warehouses are designed for performing data analytics on vast amounts of data from one or more disparate sources. In our fast-paced, hyper-connected world, those sources often take the form of continuous streams of web application logs, e-commerce transactions, social media feeds, online gaming activities, financial trading transactions, and IoT sensor readings. Streaming data must be analyzed in near real-time, while often first requiring cleansing, transformation, and enrichment.
In the following post, we will demonstrate the use of Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight to analyze streaming data. We will simulate time-series data, streaming from a set of IoT sensors to Kinesis Data Firehose. Kinesis Data Firehose will write the IoT data to an Amazon S3 Data Lake, where it will then be copied to Redshift in near real-time. In Amazon Redshift, we will enhance the streaming sensor data with data contained in the Redshift data warehouse, which has been gathered and denormalized into a star schema.
In Redshift, we can analyze the data, asking questions such as: what are the min, max, mean, and median temperatures over a given time period at each sensor location? Finally, we will use Amazon QuickSight to visualize the Redshift data using rich interactive charts and graphs, including displaying geospatial sensor data.
Featured Technologies
The following AWS services are discussed in this post.
Amazon Kinesis Data Firehose
According to Amazon, Amazon Kinesis Data Firehose can capture, transform, and load streaming data into data lakes, data stores, and analytics tools. Direct Kinesis Data Firehose integrations include Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. Kinesis Data Firehose enables near real-time analytics with existing business intelligence (BI) tools and dashboards.
Amazon Redshift
According to Amazon, Amazon Redshift is the most popular and fastest cloud data warehouse. With Redshift, users can query petabytes of structured and semi-structured data across their data warehouse and data lake using standard SQL. Redshift allows users to query and export data to and from data lakes. Redshift can federate queries of live data from Redshift, as well as across one or more relational databases.
Amazon Redshift Spectrum
According to Amazon, Amazon Redshift Spectrum can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum tables are created by defining the structure for data files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue or an Apache Hive metastore. While Redshift Spectrum is an alternative to copying the data into Redshift for analysis, we will not be using Redshift Spectrum in this post.
Amazon QuickSight
According to Amazon, Amazon QuickSight is a fully managed business intelligence service that makes it easy to deliver insights to everyone in an organization. QuickSight lets users easily create and publish rich, interactive dashboards that include Amazon QuickSight ML Insights. Dashboards can then be accessed from any device and embedded into applications, portals, and websites.
What is a Data Warehouse?
According to Amazon, a data warehouse is a central repository of information that can be analyzed to make better-informed decisions. Data flows into a data warehouse from transactional systems, relational databases, and other sources, typically on a regular cadence. Business analysts, data scientists, and decision-makers access the data through business intelligence tools, SQL clients, and other analytics applications.
Demonstration
Source Code
All the source code for this post can be found on GitHub. Use the following command to git clone a local copy of the project.
git clone \
    --branch master --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/kinesis-redshift-streaming-demo.git
CloudFormation
Use the two AWS CloudFormation templates, included in the project, to build two CloudFormation stacks. Please review the two templates and understand the costs of the resources before continuing.
The first CloudFormation template, redshift.yml, provisions a new Amazon VPC with associated network and security resources, a single-node Redshift cluster, and two S3 buckets.
The second CloudFormation template, kinesis-firehose.yml, provisions an Amazon Kinesis Data Firehose delivery stream, associated IAM Policy and Role, and an Amazon CloudWatch log group and two log streams.
Change the REDSHIFT_PASSWORD value to ensure your security. Optionally, change the REDSHIFT_USERNAME value. Make sure that the first stack completes successfully before creating the second stack.
export AWS_DEFAULT_REGION=us-east-1
REDSHIFT_USERNAME=awsuser
REDSHIFT_PASSWORD=5up3r53cr3tPa55w0rd
# Create resources
aws cloudformation create-stack \
    --stack-name redshift-stack \
    --template-body file://cloudformation/redshift.yml \
    --parameters ParameterKey=MasterUsername,ParameterValue=${REDSHIFT_USERNAME} \
        ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
        ParameterKey=InboundTraffic,ParameterValue=$(curl ifconfig.me -s)/32 \
    --capabilities CAPABILITY_NAMED_IAM
# Wait for first stack to complete
aws cloudformation wait stack-create-complete \
    --stack-name redshift-stack
aws cloudformation create-stack \
    --stack-name kinesis-firehose-stack \
    --template-body file://cloudformation/kinesis-firehose.yml \
    --parameters ParameterKey=MasterUserPassword,ParameterValue=${REDSHIFT_PASSWORD} \
    --capabilities CAPABILITY_NAMED_IAM
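The same two-step deployment could also be scripted in Python. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured; the helper names to_cfn_parameters and create_stack_and_wait are hypothetical and are not part of the project. It relies on CloudFormation's stack_create_complete waiter, so the second stack would only be created once the first has finished.

```python
def to_cfn_parameters(values):
    """Convert a plain dict into CloudFormation's ParameterKey/ParameterValue list."""
    return [
        {"ParameterKey": key, "ParameterValue": str(value)}
        for key, value in values.items()
    ]


def create_stack_and_wait(stack_name, template_path, parameters):
    """Create a stack, then block until CloudFormation reports it complete."""
    import boto3  # imported lazily so the helper above works without boto3

    cfn = boto3.client("cloudformation")
    with open(template_path) as template:
        cfn.create_stack(
            StackName=stack_name,
            TemplateBody=template.read(),
            Parameters=to_cfn_parameters(parameters),
            Capabilities=["CAPABILITY_NAMED_IAM"],
        )
    # The waiter raises an error if the stack fails or rolls back
    cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)


# Parameter keys match those passed to the AWS CLI commands above
print(to_cfn_parameters({"MasterUsername": "awsuser"}))
```

Calling create_stack_and_wait for redshift-stack and then for kinesis-firehose-stack would reproduce the order of the CLI commands above.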
Review AWS Resources
To confirm all the AWS resources were created correctly, use the AWS Management Console.
Kinesis Data Firehose
In the Amazon Kinesis Dashboard, you should see the new Amazon Kinesis Data Firehose delivery stream, redshift-delivery-stream.
The Details tab of the new Amazon Kinesis Firehose delivery stream should look similar to the following. Note the IAM Role, FirehoseDeliveryRole, which was created and associated with the delivery stream by CloudFormation.
We are not performing any transformations of the incoming messages. Note the new S3 bucket that was created and associated with the stream by CloudFormation. The bucket name was randomly generated. This bucket is where the incoming messages will be written.
Note the buffer conditions of 1 MB and 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written in JSON format, using GZIP compression, to S3. These are the minimal buffer conditions, and as close to real-time streaming to Redshift as we can get.
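To make the buffering rule concrete, here is a small Python model of it. This is only a sketch of the rule as described above, not Firehose's actual implementation, and the function name should_flush is my own. The message shape and the count of six sensors are taken from the simulation used later in this post.

```python
import json

BUFFER_SIZE_BYTES = 1 * 1024 * 1024  # 1 MB size condition
BUFFER_INTERVAL_SECONDS = 60         # 60 second interval condition


def should_flush(buffered_bytes, seconds_since_last_flush):
    """Return True once either buffering condition has been met."""
    return (
        buffered_bytes >= BUFFER_SIZE_BYTES
        or seconds_since_last_flush >= BUFFER_INTERVAL_SECONDS
    )


# One simulated reading, shaped like the messages sent later in this post
sample = {"guid": "03e39872-e105-4be4-83c0-9ade818465dc", "ts": 1583292719, "temp": 105.56}

# Six sensors, each sending one reading per minute
bytes_per_minute = 6 * len(json.dumps(sample))
print(bytes_per_minute, should_flush(bytes_per_minute, 60))
```

With so little data arriving each minute, the 60-second condition, not the 1 MB condition, is what triggers each write in this demonstration.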
Note the COPY command, which is used to copy the messages from S3 to the message table in Amazon Redshift. Kinesis uses the IAM Role, ClusterPermissionsRole, created by CloudFormation, for credentials. We are using a Manifest to copy the data to Redshift from S3. According to Amazon, a Manifest ensures that the COPY command loads all of the required files, and only the required files, for a data load. The Manifests are automatically generated and managed by the Kinesis Firehose delivery stream.
Redshift Cluster
In the Amazon Redshift Console, you should see a new single-node Redshift cluster consisting of one Redshift dc2.large Dense Compute node type.
Note the new VPC, Subnet, and VPC Security Group created by CloudFormation. Also, observe that the Redshift cluster is publicly accessible from outside the new VPC.
Redshift Ingress Rules
The single-node Redshift cluster is assigned to an AWS Availability Zone in the US East (N. Virginia) us-east-1 AWS Region. The cluster is associated with a VPC Security Group. The Security Group contains three inbound rules, all for Redshift port 5439. The IP addresses associated with the three inbound rules provide access to the following: 1) a /27 CIDR block for Amazon QuickSight in us-east-1, 2) a /27 CIDR block for Amazon Kinesis Firehose in us-east-1, and 3) a /32 CIDR block containing your current IP address. If your IP address changes or you do not use the us-east-1 Region, you will need to change one or all of these IP addresses. The list of Kinesis Firehose IP addresses is here. The list of QuickSight IP addresses is here.
If you cannot connect to Redshift from your local SQL client, most often, your IP address has changed and is incorrect in the Security Group’s inbound rule.
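If you are unsure whether an address is covered by an inbound rule, Python's standard ipaddress module can check it. The sketch below uses made-up addresses from the documentation-reserved ranges, not the real AWS or QuickSight ranges, and the helper name is_allowed is hypothetical.

```python
import ipaddress


def is_allowed(client_ip, allowed_cidrs):
    """Return True if client_ip falls inside any of the allowed CIDR blocks."""
    address = ipaddress.ip_address(client_ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in allowed_cidrs)


# Hypothetical values from documentation-reserved ranges, not real AWS ranges
inbound_rules = [
    "198.51.100.32/27",  # stands in for a service's /27 block
    "203.0.113.10/32",   # stands in for your own IP address
]

print(ipaddress.ip_network("198.51.100.32/27").num_addresses)  # 32 addresses in a /27
print(is_allowed("203.0.113.10", inbound_rules))  # True
print(is_allowed("203.0.113.11", inbound_rules))  # False
```

A /32 block matches exactly one address, which is why a changed IP address immediately breaks the connection from your SQL client.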
Redshift SQL Client
You can choose to use the Redshift Query Editor to interact with Redshift or use a third-party SQL client for greater flexibility. To access the Redshift Query Editor, use the user credentials specified in the redshift.yml CloudFormation template.
There is a lot of useful functionality in the Redshift Console and within the Redshift Query Editor. However, in my opinion, a notable limitation of the Redshift Query Editor is the inability to execute multiple SQL statements at the same time, whereas most SQL clients allow this.
I prefer to use JetBrains PyCharm IDE. PyCharm has out-of-the-box integration with Redshift. Using PyCharm, I can edit the project’s Python, SQL, AWS CLI shell, and CloudFormation code, all from within PyCharm.
If you use any of the common SQL clients, you will need to set up a JDBC (Java Database Connectivity) or ODBC (Open Database Connectivity) connection to Redshift. The ODBC and JDBC connection strings can be found in the Redshift cluster’s Properties tab or in the Outputs tab from the CloudFormation stack, redshift-stack.
You will also need the Redshift database username and password you included in the aws cloudformation create-stack AWS CLI command you executed previously. Below, we see PyCharm’s Project Data Sources window containing a new data source for the Redshift dev database.
Database Schema and Tables
When CloudFormation created the Redshift cluster, it also created a new database, dev. Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL commands to create a new database schema, sensor, and six tables in the sensor schema.
-- Create new schema in Redshift DB
DROP SCHEMA IF EXISTS sensor CASCADE;
CREATE SCHEMA sensor;
SET search_path = sensor;
-- Create (6) tables in Redshift DB
CREATE TABLE message -- streaming data table
(
    id      BIGINT IDENTITY (1, 1),                                        -- message id
    guid    VARCHAR(36)   NOT NULL,                                        -- device guid
    ts      BIGINT        NOT NULL DISTKEY SORTKEY,                        -- epoch in seconds
    temp    NUMERIC(5, 2) NOT NULL,                                        -- temperature reading
    created TIMESTAMP DEFAULT ('now'::text)::timestamp with time zone      -- row created at
);
CREATE TABLE location -- dimension table
(
    id          INTEGER        NOT NULL DISTKEY SORTKEY, -- location id
    long        NUMERIC(10, 7) NOT NULL,                 -- longitude
    lat         NUMERIC(10, 7) NOT NULL,                 -- latitude
    description VARCHAR(256)                             -- location description
);
CREATE TABLE history -- dimension table
(
    id            INTEGER     NOT NULL DISTKEY SORTKEY, -- history id
    serviced      BIGINT      NOT NULL,                 -- service date
    action        VARCHAR(20) NOT NULL,                 -- INSTALLED, CALIBRATED, FIRMWARE UPGRADED, DECOMMISSIONED, OTHER
    technician_id INTEGER     NOT NULL,                 -- technician id
    notes         VARCHAR(256)                          -- notes
);
CREATE TABLE sensor -- dimension table
(
    id     INTEGER     NOT NULL DISTKEY SORTKEY, -- sensor id
    guid   VARCHAR(36) NOT NULL,                 -- device guid
    mac    VARCHAR(18) NOT NULL,                 -- mac address
    sku    VARCHAR(18) NOT NULL,                 -- product sku
    upc    VARCHAR(12) NOT NULL,                 -- product upc
    active BOOLEAN DEFAULT TRUE,                 -- active status
    notes  VARCHAR(256)                          -- notes
);
CREATE TABLE manufacturer -- dimension table
(
    id      INTEGER      NOT NULL DISTKEY SORTKEY, -- manufacturer id
    name    VARCHAR(100) NOT NULL,                 -- company name
    website VARCHAR(100) NOT NULL,                 -- company website
    notes   VARCHAR(256)                           -- notes
);
CREATE TABLE sensors -- fact table
(
    id              BIGINT IDENTITY (1, 1) DISTKEY SORTKEY, -- fact id
    sensor_id       INTEGER     NOT NULL,                   -- sensor id
    manufacturer_id INTEGER     NOT NULL,                   -- manufacturer id
    location_id     INTEGER     NOT NULL,                   -- location id
    history_id      BIGINT      NOT NULL,                   -- history id
    message_guid    VARCHAR(36) NOT NULL                    -- sensor guid
);
Star Schema
The tables represent denormalized data, taken from one or more relational database sources. The tables form a star schema. The star schema is widely used to develop data warehouses. The star schema consists of one or more fact tables referencing any number of dimension tables. The location, manufacturer, sensor, and history tables are dimension tables. The sensors table is a fact table.
In the diagram below, the foreign key relationships are virtual, not physical. The diagram was created using PyCharm’s schema visualization tool. Note the schema’s star shape. The message table is where the streaming IoT data will eventually be written. The message table is related to the sensors fact table through the common guid field.
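To illustrate how the fact table ties the dimension tables to the streaming data, here is a small, self-contained sketch. It uses Python's built-in sqlite3 module as a stand-in for Redshift, includes only a subset of the tables and columns, and the id values are invented for the example. The guid, location, timestamp, and temperature come from the sample results shown later in this post.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
conn.executescript(
    """
    CREATE TABLE location (id INTEGER, description TEXT);      -- dimension
    CREATE TABLE sensor   (id INTEGER, guid TEXT);             -- dimension
    CREATE TABLE sensors  (sensor_id INTEGER, location_id INTEGER,
                           message_guid TEXT);                 -- fact
    CREATE TABLE message  (guid TEXT, ts INTEGER, temp REAL);  -- streaming data

    INSERT INTO location VALUES (1, 'Research Lab #2203');
    INSERT INTO sensor   VALUES (1, '03e39872-e105-4be4-83c0-9ade818465dc');
    INSERT INTO sensors  VALUES (1, 1, '03e39872-e105-4be4-83c0-9ade818465dc');
    INSERT INTO message  VALUES ('03e39872-e105-4be4-83c0-9ade818465dc', 1583292719, 105.56);
    """
)

# The fact table joins to each dimension by id and to message by guid
rows = conn.execute(
    """
    SELECT l.description, e.ts, e.temp
    FROM sensors f
             INNER JOIN sensor s ON f.sensor_id = s.id
             INNER JOIN location l ON f.location_id = l.id
             INNER JOIN message e ON f.message_guid = e.guid
    """
).fetchall()
print(rows)
```

The join on guid is what lets each incoming temperature reading pick up its location and other static details.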
Sample Data to S3
Next, copy the sample data, included in the project, to the S3 data bucket created with CloudFormation. Each CSV-formatted data file corresponds to one of the tables we previously created. Since the bucket name is semi-random, we can use the AWS CLI and jq to get the bucket name, then use it to perform the copy commands.
# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
    --stack-name redshift-stack \
    | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
echo $DATA_BUCKET
# Copy data
aws s3 cp data/history.csv s3://${DATA_BUCKET}/history/history.csv
aws s3 cp data/location.csv s3://${DATA_BUCKET}/location/location.csv
aws s3 cp data/manufacturer.csv s3://${DATA_BUCKET}/manufacturer/manufacturer.csv
aws s3 cp data/sensor.csv s3://${DATA_BUCKET}/sensor/sensor.csv
aws s3 cp data/sensors.csv s3://${DATA_BUCKET}/sensors/sensors.csv
The output from the AWS CLI should look similar to the following.
Sample Data to Redshift
Whereas a relational database, such as Amazon RDS, is designed for online transaction processing (OLTP), Amazon Redshift is designed for online analytic processing (OLAP) and business intelligence applications. To write data to Redshift, we typically use the COPY command rather than frequent, individual INSERT statements, as with OLTP, which would be prohibitively slow. According to Amazon, the Redshift COPY command leverages the Amazon Redshift massively parallel processing (MPP) architecture to read and load data in parallel from files on Amazon S3, from a DynamoDB table, or from text output from one or more remote hosts.
In the following series of SQL statements, replace the placeholder, your_bucket_name, in five places with your S3 data bucket name. The bucket name will start with the prefix, redshift-stack-databucket. The bucket name can be found in the Outputs tab of the CloudFormation stack, redshift-stack. Next, replace the placeholder, cluster_permissions_role_arn, with the ARN (Amazon Resource Name) of the ClusterPermissionsRole. The ARN is formatted as follows, arn:aws:iam::your-account-id:role/ClusterPermissionsRole. The ARN can be found in the Outputs tab of the CloudFormation stack, redshift-stack.
Using the Redshift Query Editor or your SQL client of choice, execute the SQL statements to copy the sample data from S3 to each of the corresponding tables in the Redshift dev database. The TRUNCATE commands guarantee there is no previous sample data present in the tables.
-- ** MUST FIRST CHANGE your_bucket_name and cluster_permissions_role_arn **
-- sensor schema
SET search_path = sensor;
-- Copy sample data to tables from S3
TRUNCATE TABLE history;
COPY history (id, serviced, action, technician_id, notes)
    FROM 's3://your_bucket_name/history/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;
TRUNCATE TABLE location;
COPY location (id, long, lat, description)
    FROM 's3://your_bucket_name/location/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;
TRUNCATE TABLE sensor;
COPY sensor (id, guid, mac, sku, upc, active, notes)
    FROM 's3://your_bucket_name/sensor/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;
TRUNCATE TABLE manufacturer;
COPY manufacturer (id, name, website, notes)
    FROM 's3://your_bucket_name/manufacturer/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;
TRUNCATE TABLE sensors;
COPY sensors (sensor_id, manufacturer_id, location_id, history_id, message_guid)
    FROM 's3://your_bucket_name/sensors/'
    CREDENTIALS 'aws_iam_role=cluster_permissions_role_arn'
    CSV IGNOREHEADER 1;
SELECT COUNT(*) FROM history; -- 30
SELECT COUNT(*) FROM location; -- 6
SELECT COUNT(*) FROM sensor; -- 6
SELECT COUNT(*) FROM manufacturer; -- 1
SELECT COUNT(*) FROM sensors; -- 30
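Rather than editing the placeholders by hand in five places, the substitution could be scripted. The following Python sketch generates the same TRUNCATE and COPY statements from a template. The function name render_copy_statements is hypothetical, and the bucket name and account ID shown are made up for illustration; use the values from your own CloudFormation stack outputs.

```python
COPY_TEMPLATE = """TRUNCATE TABLE {table};
COPY {table} ({columns})
    FROM 's3://{bucket}/{table}/'
    CREDENTIALS 'aws_iam_role={role_arn}'
    CSV IGNOREHEADER 1;"""

# Table names and column lists, as used in the SQL statements above
TABLE_COLUMNS = {
    "history": "id, serviced, action, technician_id, notes",
    "location": "id, long, lat, description",
    "sensor": "id, guid, mac, sku, upc, active, notes",
    "manufacturer": "id, name, website, notes",
    "sensors": "sensor_id, manufacturer_id, location_id, history_id, message_guid",
}


def render_copy_statements(bucket, role_arn):
    """Return the TRUNCATE/COPY pair for every sample-data table."""
    return [
        COPY_TEMPLATE.format(
            table=table, columns=columns, bucket=bucket, role_arn=role_arn
        )
        for table, columns in TABLE_COLUMNS.items()
    ]


# Hypothetical bucket name and account ID, for illustration only
statements = render_copy_statements(
    bucket="redshift-stack-databucket-example",
    role_arn="arn:aws:iam::123456789012:role/ClusterPermissionsRole",
)
print("SET search_path = sensor;")
print("\n".join(statements))
```

The printed SQL can then be pasted into the Redshift Query Editor or your SQL client of choice.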
Database Views
Next, create four Redshift database Views. These views may be used to analyze the data in Redshift, and later, in Amazon QuickSight.
- sensor_msg_detail: Returns aggregated sensor details, using the sensors fact table, the four dimension tables, and the message table in a SQL Join.
- sensor_msg_count: Returns the number of messages received by Redshift, for each sensor.
- sensor_avg_temp: Returns the average temperature from each sensor, based on all the messages received from each sensor.
- sensor_avg_temp_current: Identical to the previous view, but limited to messages recorded in the last 30 minutes.
Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL statements.
-- sensor schema
SET search_path = sensor;
-- View 1: Sensor details
DROP VIEW IF EXISTS sensor_msg_detail;
CREATE OR REPLACE VIEW sensor_msg_detail AS
SELECT ('1970-01-01'::date + e.ts * interval '1 second')       AS recorded,
       e.temp,
       s.guid,
       s.sku,
       s.mac,
       l.lat,
       l.long,
       l.description                                           AS location,
       ('1970-01-01'::date + h.serviced * interval '1 second') AS installed,
       e.created                                               AS redshift
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN manufacturer m ON (f.manufacturer_id = m.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
ORDER BY f.id;
-- View 2: Message count per sensor
DROP VIEW IF EXISTS sensor_msg_count;
CREATE OR REPLACE VIEW sensor_msg_count AS
SELECT count(e.temp) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY msg_count, s.guid;
-- View 3: Average temperature per sensor (all data)
DROP VIEW IF EXISTS sensor_avg_temp;
CREATE OR REPLACE VIEW sensor_avg_temp AS
SELECT avg(e.temp)   AS avg_temp,
       count(s.guid) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN message e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;
-- View 4: Average temperature per sensor (last 30 minutes)
DROP VIEW IF EXISTS sensor_avg_temp_current;
CREATE OR REPLACE VIEW sensor_avg_temp_current AS
SELECT avg(e.temp)   AS avg_temp,
       count(s.guid) AS msg_count,
       s.guid,
       l.lat,
       l.long,
       l.description AS location
FROM sensors f
         INNER JOIN sensor s ON (f.sensor_id = s.id)
         INNER JOIN history h ON (f.history_id = h.id)
         INNER JOIN location l ON (f.location_id = l.id)
         INNER JOIN (SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
                            guid,
                            temp
                     FROM message
                     WHERE DATEDIFF(minute, recorded_time, GETDATE()) <= 30) e ON (f.message_guid = e.guid)
WHERE s.active IS TRUE
  AND h.action = 'INSTALLED'
GROUP BY s.guid, l.description, l.lat, l.long
ORDER BY avg_temp, s.guid;
At this point, you should have a total of six tables and four views in the sensor schema of the dev database in Redshift.
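The views convert the ts epoch value to a timestamp with the expression '1970-01-01'::date + ts * interval '1 second'. The Python sketch below mirrors that arithmetic so you can sanity-check values outside of Redshift. The helper names are my own, and the 30-minute check only approximates DATEDIFF, which counts minute boundaries rather than elapsed time.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_to_utc(ts):
    """Mirror of '1970-01-01'::date + ts * interval '1 second'."""
    return EPOCH + timedelta(seconds=ts)


def within_last_minutes(ts, minutes, now):
    """True if the reading at epoch ts is at most `minutes` old relative to now."""
    return now - epoch_to_utc(ts) <= timedelta(minutes=minutes)


recorded = epoch_to_utc(1583292719)
print(recorded.isoformat())  # 2020-03-04T03:31:59+00:00

# A reading from 03:31:59 is still "current" at 03:45:00, but not at 04:30:00
print(within_last_minutes(1583292719, 30, datetime(2020, 3, 4, 3, 45, tzinfo=timezone.utc)))
print(within_last_minutes(1583292719, 30, datetime(2020, 3, 4, 4, 30, tzinfo=timezone.utc)))
```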
Test the System
With all the necessary AWS resources and Redshift database objects created and sample data in the Redshift database, we can test the system. The included Python script, kinesis_put_test_msg.py, will generate a single test message and send it to Kinesis Data Firehose. If everything is working, the message should be delivered from Kinesis Data Firehose to S3, then copied to Redshift, and appear in the message table.
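The test script itself is not reproduced in this post. As a rough sketch of what sending a single message involves, the following assumes the test message has the same three fields as the streaming messages shown later (guid, ts, and temp) and uses the same put_record call. The function names are hypothetical, and sending requires boto3 and valid AWS credentials.

```python
import json
import time


def build_test_message(guid, temp):
    """Build one message with the fields the message table expects."""
    return {"guid": guid, "ts": int(time.time()), "temp": round(temp, 2)}


def send_test_message(message, stream_name="redshift-delivery-stream"):
    """Send a single record to the delivery stream (needs boto3 and credentials)."""
    import boto3  # imported lazily so build_test_message works without boto3

    client = boto3.client("firehose")
    return client.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": json.dumps(message)},
    )


# Build (but do not send) a message for one of the sample sensors
message = build_test_message("03e39872-e105-4be4-83c0-9ade818465dc", 71.5)
print(json.dumps(message))
```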
Install the required Python packages and then execute the Python script.
# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt
# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1
# Execute script in foreground
python3 ./scripts/kinesis_put_test_msg.py
Run the following SQL query to confirm the record is in the message table of the dev database. It will take at least one minute for the message to appear in Redshift.
SELECT COUNT(*) FROM message;
Once the message is confirmed to be present in the message table, delete the record by truncating the table.
TRUNCATE TABLE message;
Streaming Data
Assuming the test message worked, we can proceed with simulating the streaming IoT sensor data. The included Python script, kinesis_put_streaming_data.py, creates six concurrent threads, representing six temperature sensors.
#!/usr/bin/env python3
# Simulated multiple streaming time-series iot sensor data
# Author: Gary A. Stafford
# Date: Revised October 2020
import json
import random
import boto3
import time as tm
import numpy as np
import threading
STREAM_NAME = 'redshift-delivery-stream'
client = boto3.client('firehose')
class MyThread(threading.Thread):
    def __init__(self, thread_id, sensor_guid, temp_max):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.sensor_id = sensor_guid
        self.temp_max = temp_max
    def run(self):
        print("Starting Thread: " + str(self.thread_id))
        self.create_data()
        print("Exiting Thread: " + str(self.thread_id))
    def create_data(self):
        start = 0
        stop = 20
        step = 0.1  # step size (e.g 0 to 20, step .1 = 200 steps in cycle)
        repeat = 2  # how many times to repeat cycle
        freq = 60  # frequency of temperature reading in seconds
        max_range = int(stop * (1 / step))
        time = np.arange(start, stop, step)
        amplitude = np.sin(time)
        for x in range(0, repeat):
            for y in range(0, max_range):
                temperature = round((((amplitude[y] + 1.0) * self.temp_max) + random.uniform(-5, 5)) + 60, 2)
                payload = {
                    'guid': self.sensor_id,
                    'ts': int(tm.time()),  # epoch in seconds (portable, unlike strftime('%s'))
                    'temp': temperature
                }
                print(json.dumps(payload))
                self.send_to_kinesis(payload)
                tm.sleep(freq)
    @staticmethod
    def send_to_kinesis(payload):
        _ = client.put_record(
            DeliveryStreamName=STREAM_NAME,
            Record={
                'Data': json.dumps(payload)
            }
        )
def main():
    sensor_guids = [
        "03e39872-e105-4be4-83c0-9ade818465dc",
        "fa565921-fddd-4bfb-a7fd-d617f816df4b",
        "d120422d-5789-435d-9dc6-73d8489b04c2",
        "93238559-4d55-4b2a-bdcb-6aa3be0f3908",
        "dbc05806-6872-4f0a-aca2-f794cc39bd9b",
        "f9ade639-f936-4954-aa5a-1f2ed86c9bcf"
    ]
    timeout = 300  # arbitrarily offset the start of threads, in seconds
    # Create new threads
    thread1 = MyThread(1, sensor_guids[0], 25)
    thread2 = MyThread(2, sensor_guids[1], 10)
    thread3 = MyThread(3, sensor_guids[2], 7)
    thread4 = MyThread(4, sensor_guids[3], 30)
    thread5 = MyThread(5, sensor_guids[4], 5)
    thread6 = MyThread(6, sensor_guids[5], 12)
    # Start new threads
    thread1.start()
    tm.sleep(timeout * 1)
    thread2.start()
    tm.sleep(timeout * 2)
    thread3.start()
    tm.sleep(timeout * 1)
    thread4.start()
    tm.sleep(timeout * 3)
    thread5.start()
    tm.sleep(timeout * 2)
    thread6.start()
    # Wait for threads to terminate
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    print("Exiting Main Thread")
if __name__ == '__main__':
    main()
The simulated data uses an algorithm that follows an oscillating sine wave or sinusoid, representing rising and falling temperatures. In the script, I have configured each thread to start with an arbitrary offset to add some randomness to the simulated data.
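To see what range of temperatures the formula produces, here is a NumPy-free sketch of the same calculation using Python's math module. The function name simulate_reading and the fixed random seed are my own additions for illustration; the formula and the temp_max value of 25 for the first sensor come from the script above.

```python
import math
import random


def simulate_reading(t, temp_max, rng):
    """One reading: scaled sine wave, plus uniform noise, plus a baseline of 60."""
    amplitude = math.sin(t)
    return round(((amplitude + 1.0) * temp_max) + rng.uniform(-5, 5) + 60, 2)


rng = random.Random(42)  # seeded only to make the sketch repeatable
temp_max = 25            # value used for the first sensor thread
readings = [simulate_reading(step * 0.1, temp_max, rng) for step in range(200)]

# sin(t) + 1 lies in [0, 2], so readings fall between 55 and 2 * temp_max + 65
print(min(readings), max(readings))
```

A larger temp_max therefore produces both a higher peak and a wider swing, which is why the six sensors report noticeably different averages.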
The variables within the script can be adjusted to shorten or lengthen the time it takes to stream the simulated data. By default, each of the six threads creates 400 messages, in one-minute increments. Including the offset start of each successive thread, the total runtime of the script is about 7.5 hours to generate 2,400 simulated IoT sensor temperature readings and push them to Kinesis Data Firehose. Make sure you can maintain a connection to the Internet for the entire runtime of the script. I normally run the script in the background, from a small EC2 instance.
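The message count and runtime follow directly from the script's variables. The short calculation below is a back-of-the-envelope sketch that ignores the small amount of time spent on each put_record call.

```python
start, stop, step = 0, 20, 0.1
repeat = 2                  # cycles per thread
freq = 60                   # seconds between readings
timeout = 300               # base offset between thread starts, in seconds
offsets = [1, 2, 1, 3, 2]   # multipliers passed to tm.sleep() before threads 2 to 6
threads = 6

messages_per_thread = int(stop * (1 / step)) * repeat   # 200 steps x 2 cycles
total_messages = messages_per_thread * threads

last_thread_start = timeout * sum(offsets)              # seconds until thread 6 starts
thread_runtime = messages_per_thread * freq             # seconds each thread runs
total_seconds = last_thread_start + thread_runtime
total_hours = total_seconds / 3600

print(messages_per_thread, total_messages, round(total_hours, 2))  # 400 2400 7.42
```

Reducing freq or repeat is the simplest way to shorten a test run.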
To use the Python script, execute either of the two following commands. Using the first command will run the script in the foreground. Using the second command will run the script in the background.
# Install required Python packages
python3 -m pip install --user -r scripts/requirements.txt
# Set default AWS Region for script
export AWS_DEFAULT_REGION=us-east-1
# Option #1: Execute script in foreground
python3 ./scripts/kinesis_put_streaming_data.py
# Option #2: execute script in background
nohup python3 -u ./scripts/kinesis_put_streaming_data.py > output.log 2>&1 </dev/null &
# Check that the process is running
ps -aux | grep 'python3 -u ./scripts/kinesis_put_streaming_data.py'
# Wait 1-2 minutes, then check output to confirm script is working
cat output.log
Viewing the output.log file, you should see messages being generated on each thread and sent to Kinesis Data Firehose. Each message contains the GUID of the sensor, a timestamp, and a temperature reading.
The messages are sent to Kinesis Data Firehose, which in turn writes the messages to S3. The messages are written in JSON format using GZIP compression. Below, we see an example of the GZIP compressed JSON files in S3. The JSON files are partitioned by year, month, day, and hour.
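If you download one of these files to inspect it, note that the script sends each record without a trailing newline, so, as I understand it, the JSON objects in a delivered file sit back to back rather than one per line. The sketch below shows one way to read such a file in Python. The helper name iter_json_objects is hypothetical, and the gzipped content is simulated in memory rather than fetched from S3.

```python
import gzip
import json


def iter_json_objects(text):
    """Yield each JSON object from a string of back-to-back objects."""
    decoder = json.JSONDecoder()
    position = 0
    while position < len(text):
        # Skip any whitespace or newlines between objects
        while position < len(text) and text[position].isspace():
            position += 1
        if position >= len(text):
            break
        obj, position = decoder.raw_decode(text, position)
        yield obj


# Simulate a delivered file: two records written back to back, then gzipped
records = [
    {"guid": "03e39872-e105-4be4-83c0-9ade818465dc", "ts": 1583292719, "temp": 105.56},
    {"guid": "03e39872-e105-4be4-83c0-9ade818465dc", "ts": 1583292599, "temp": 95.93},
]
compressed = gzip.compress("".join(json.dumps(r) for r in records).encode("utf-8"))

decoded = list(iter_json_objects(gzip.decompress(compressed).decode("utf-8")))
print(decoded)
```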
Confirm Data Streaming to Redshift
From the Amazon Kinesis Firehose Console Metrics tab, you should see incoming messages flowing to S3 and on to Redshift.
Executing the following SQL query should show an increasing number of messages.
SELECT COUNT(*) FROM message;
How Near Real-time?
Earlier, we saw how the Amazon Kinesis Data Firehose delivery stream was configured to buffer data at the rate of 1 MB or 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written to S3. Each record in the message table has two timestamps. The first timestamp, ts, is when the temperature reading was recorded. The second timestamp, created, is when the message was written to Redshift, using the COPY command. We can calculate the delta in seconds between the two timestamps using the following SQL query in Redshift.
SELECT ('1970-01-01'::date + ts * interval '1 second') AS recorded_time,
       created                                         AS redshift_time,
       DATEDIFF(seconds, recorded_time, redshift_time) AS diff_seconds
FROM message
ORDER BY diff_seconds;
Using the results of the Redshift query, we can visualize the results in Amazon QuickSight. In my own tests, for 2,400 messages over approximately 7.5 hours, the minimum delay was 1 second, and the maximum delay was 64 seconds. Hence, near real-time, in this case, is about one minute or less, with an average latency of roughly 30 seconds.
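The same delta can be checked by hand. The following Python sketch computes the delay for three rows taken from the sensor_msg_detail results shown later in this post. The function name diff_seconds simply echoes the SQL column alias, and truncating to whole seconds only approximates how DATEDIFF counts second boundaries.

```python
from datetime import datetime

# (recorded, redshift) pairs from the sensor_msg_detail sample results
samples = [
    ("2020-03-04 03:31:59.000000", "2020-03-04 03:33:01.580147"),
    ("2020-03-04 03:29:59.000000", "2020-03-04 03:31:01.388887"),
    ("2020-03-04 03:26:58.000000", "2020-03-04 03:28:01.099796"),
]

FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def diff_seconds(recorded, redshift):
    """Whole seconds between the reading and its arrival in Redshift."""
    delta = datetime.strptime(redshift, FORMAT) - datetime.strptime(recorded, FORMAT)
    return int(delta.total_seconds())


delays = [diff_seconds(recorded, redshift) for recorded, redshift in samples]
print(delays)  # [62, 62, 63]
```

These three rows sit near the upper end of the observed range, just under the 64-second maximum.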
Analyzing the Data with Redshift
I suggest waiting at least thirty minutes for a significant number of messages to be copied into Redshift. With the data streaming into Redshift, query each of the database views we created earlier. You should see the streaming message data, joined to the existing static data in Redshift. As data continues to stream into Redshift, the views will display different results based on the current message table contents.
Here, we see the first ten results of the sensor_msg_detail view.
recorded | temp | guid | sku | mac | lat | long | location | installed | redshift
---|---|---|---|---|---|---|---|---|---
2020-03-04 03:31:59.000000 | 105.56 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:33:01.580147
2020-03-04 03:29:59.000000 | 95.93 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:31:01.388887
2020-03-04 03:26:58.000000 | 91.93 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:28:01.099796
2020-03-04 03:25:58.000000 | 88.70 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:26:00.196113
2020-03-04 03:22:58.000000 | 87.65 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:23:01.558514
2020-03-04 03:20:58.000000 | 77.35 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:21:00.691347
2020-03-04 03:16:57.000000 | 71.84 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:17:59.307510
2020-03-04 03:15:57.000000 | 72.35 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:15:59.813656
2020-03-04 03:14:57.000000 | 67.95 | 03e39872-e105-4be4-83c0-9ade818465dc | PR49-24A | 8e:fa:46:09:14:b2 | 37.7068476 | -122.4191599 | Research Lab #2203 | 2018-01-31 12:00:00.000000 | 2020-03-04 03:15:59.813656
Next, we see the results of the sensor_avg_temp view.
avg_temp | guid | lat | long | location
---|---|---|---|---
65.25 | dbc05806-6872-4f0a-aca2-f794cc39bd9b | 37.7066541 | -122.4181399 | Wafer Inspection Lab #0210A
67.23 | d120422d-5789-435d-9dc6-73d8489b04c2 | 37.7072686 | -122.4187016 | Zone 4 Wafer Processing Area B3
70.23 | fa565921-fddd-4bfb-a7fd-d617f816df4b | 37.7071763 | -122.4190397 | Research Lab #2209
72.22 | f9ade639-f936-4954-aa5a-1f2ed86c9bcf | 37.7067618 | -122.4186191 | Wafer Inspection Lab #0211C
85.48 | 03e39872-e105-4be4-83c0-9ade818465dc | 37.7068476 | -122.4191599 | Research Lab #2203
90.69 | 93238559-4d55-4b2a-bdcb-6aa3be0f3908 | 37.7070334 | -122.4184393 | Zone 2 Semiconductor Assembly Area A2
Amazon QuickSight
In a recent post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2, I detailed getting started with Amazon QuickSight. In this post, I will assume you are familiar with QuickSight.
Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. However, for this part of the demonstration, we will work directly in the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation.
Redshift Data Sets
To visualize the data from Amazon Redshift, we start by creating Data Sets in QuickSight. QuickSight supports a large number of data sources for creating data sets. We will use the Redshift data source. If you recall, we added an inbound rule for QuickSight, allowing us to connect to our Redshift cluster in us-east-1.
We will select the sensor schema, which is where the tables and views for this demonstration are located.
We can choose any of the tables or views in the Redshift dev database that we want to use for visualization.
Below, we see examples of two new data sets, shown in the QuickSight Data Prep Console. Note how QuickSight automatically recognizes field types, including dates, latitude, and longitude.
Visualizations
Using the data sets, QuickSight allows us to create a wide variety of rich visualizations. Below, we see the simulated time-series data from the six temperature sensors.
Next, we see an example of QuickSight’s ability to show geospatial data. The Map shows the location of each sensor and the average temperature recorded by that sensor.
Cleaning Up
To remove the resources created for this post, use the following series of AWS CLI commands.
# Get data bucket name
DATA_BUCKET=$(aws cloudformation describe-stacks \
  --stack-name redshift-stack \
  | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "DataBucket") | .OutputValue')
echo ${DATA_BUCKET}

# Get log bucket name
LOG_BUCKET=$(aws cloudformation describe-stacks \
  --stack-name redshift-stack \
  | jq -r '.Stacks[].Outputs[] | select(.OutputKey == "LogBucket") | .OutputValue')
echo ${LOG_BUCKET}

# Delete demonstration resources
python3 ./scripts/delete_buckets.py
aws cloudformation delete-stack --stack-name kinesis-firehose-stack

# Wait for first stack to be deleted
aws cloudformation delete-stack --stack-name redshift-stack
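For readers who would rather stay in Python, the jq filter used above can be approximated with the standard json module. The following is only a minimal sketch and is not part of the post's repository: the function name get_stack_output and the sample bucket names are hypothetical, and the sample merely mimics the general shape of a describe-stacks response. Unlike the jq filter, which emits every match, this version returns the first match only.

```python
import json


def get_stack_output(describe_stacks_json, output_key):
    """Return the first OutputValue whose OutputKey matches output_key."""
    response = json.loads(describe_stacks_json)
    for stack in response.get("Stacks", []):
        # A stack without outputs may have no "Outputs" key, so default to an empty list
        for output in stack.get("Outputs", []):
            if output.get("OutputKey") == output_key:
                return output["OutputValue"]
    raise KeyError("Output not found: " + output_key)


# Hypothetical sample mimicking the shape of `aws cloudformation describe-stacks` output
SAMPLE_RESPONSE = json.dumps({
    "Stacks": [{
        "StackName": "redshift-stack",
        "Outputs": [
            {"OutputKey": "DataBucket", "OutputValue": "example-data-bucket"},
            {"OutputKey": "LogBucket", "OutputValue": "example-log-bucket"},
        ],
    }]
})

if __name__ == "__main__":
    print(get_stack_output(SAMPLE_RESPONSE, "DataBucket"))
    print(get_stack_output(SAMPLE_RESPONSE, "LogBucket"))
```

In practice, the JSON string would come from the CLI command shown above rather than from a hard-coded sample.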
Conclusion
In this brief post, we have learned how streaming data can be analyzed in near real-time, in Amazon Redshift, using Amazon Kinesis Data Firehose. Further, we explored how the results of those analyses can be visualized in Amazon QuickSight. For customers that depend on a data warehouse for data analytics, but who also have streaming data sources, the use of Amazon Kinesis Data Firehose or Amazon Redshift Spectrum is an excellent choice.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services.
Getting Started with PostgreSQL using Amazon RDS, CloudFormation, pgAdmin, and Python
Posted by Gary A. Stafford in AWS, Cloud, Python, Software Development on August 9, 2019
Introduction
In the following post, we will explore how to get started with Amazon Relational Database Service (RDS) for PostgreSQL. CloudFormation will be used to build a PostgreSQL master database instance and a single read replica in a new VPC. AWS Systems Manager Parameter Store will be used to store our CloudFormation configuration values. Amazon RDS Event Notifications will send text messages to our mobile device to let us know when the RDS instances are ready for use. Once running, we will examine a variety of methods to interact with our database instances, including pgAdmin, Adminer, and Python.
Technologies
The primary technologies used in this post include the following.
PostgreSQL
According to its website, PostgreSQL, commonly known as Postgres, is the world’s most advanced Open Source relational database. Originating at UC Berkeley in 1986, PostgreSQL has more than 30 years of active core development. PostgreSQL has earned a strong reputation for its proven architecture, reliability, data integrity, robust feature set, and extensibility. PostgreSQL runs on all major operating systems and has been ACID-compliant since 2001.
Amazon RDS for PostgreSQL
According to Amazon, Amazon Relational Database Service (RDS) provides six familiar database engines to choose from, including Amazon Aurora, PostgreSQL, MySQL, MariaDB, Oracle Database, and SQL Server. RDS is available on several database instance types - optimized for memory, performance, or I/O.
Amazon RDS for PostgreSQL makes it easy to set up, operate, and scale PostgreSQL deployments in the cloud. Amazon RDS supports the latest PostgreSQL version 11, which includes several enhancements to performance, robustness, transaction management, query parallelism, and more.
AWS CloudFormation
According to Amazon, CloudFormation provides a common language to describe and provision all the infrastructure resources within AWS-based cloud environments. CloudFormation allows you to use a JSON- or YAML-based template to model and provision all the resources needed for your applications across all AWS regions and accounts, in an automated and secure manner.
Demonstration
Architecture
Below, we see an architectural representation of what will be built in the demonstration. This is not a typical three-tier AWS architecture, wherein the RDS instances would be placed in private subnets (data tier) and accessible only by the application tier, running on AWS. The architecture for the demonstration is designed for interacting with RDS through external database clients such as pgAdmin, and applications like our local Python scripts, detailed later in the post.
Source Code
All source code for this post is available on GitHub in a single public repository, postgres-rds-demo.
.
├── LICENSE.md
├── README.md
├── cfn-templates
│   ├── event.template
│   └── rds.template
├── parameter_store_values.sh
├── python-scripts
│   ├── create_pagila_data.py
│   ├── database.ini
│   ├── db_config.py
│   ├── query_postgres.py
│   ├── requirements.txt
│   └── unit_tests.py
├── sql-scripts
│   ├── pagila-insert-data.sql
│   └── pagila-schema.sql
└── stack.yml
To clone the GitHub repository, execute the following command.
git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/aws-rds-postgres.git
Prerequisites
For this demonstration, I will assume you already have an AWS account. I will also assume you have the latest version of the AWS CLI and Python 3 installed on your development machine. Optionally, for pgAdmin and Adminer, you will also need to have Docker installed.
Steps
In this demonstration, we will perform the following steps.
- Put CloudFormation configuration values in Parameter Store;
- Execute CloudFormation templates to create AWS resources;
- Execute SQL scripts using Python to populate the new database with sample data;
- Configure pgAdmin and Python connections to RDS PostgreSQL instances;
AWS Systems Manager Parameter Store
With AWS, it is typical to use services like AWS Systems Manager Parameter Store and AWS Secrets Manager to store non-sensitive, sensitive, and secret configuration values. These values are used by your code or by AWS services like CloudFormation. Parameter Store allows us to follow the proper twelve-factor, cloud-native practice of separating configuration from code.
To demonstrate the use of Parameter Store, we will place a few of our CloudFormation configuration items into Parameter Store. The demonstration’s GitHub repository includes a shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.
Below, we see several of the demo’s configuration values, which have been put into Parameter Store.
SecureString
Whereas our other parameters are stored in Parameter Store as String datatypes, the database’s master user password is stored as a SecureString data-type. Parameter Store uses an AWS Key Management Service (KMS) customer master key (CMK) to encrypt the SecureString parameter value.
SMS Text Alert Option
Before running the Parameter Store script, you will need to change the /rds_demo/alert_phone parameter value in the script (shown below) to your mobile device number, including country code, such as ‘+12038675309’. Amazon SNS will use it to send SMS messages, using Amazon RDS Event Notification. If you don’t want to use this messaging feature, simply ignore this parameter and do not execute the event.template CloudFormation template in the following step.
aws ssm put-parameter \
  --name /rds_demo/alert_phone \
  --type String \
  --value "your_phone_number_here" \
  --description "RDS alert SMS phone number" \
  --overwrite
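Because a malformed number would only surface later as missing text messages, it may be worth checking the value before running the script. The sketch below assumes the number should follow the E.164 format (a plus sign followed by the country code and subscriber number, at most 15 digits in total), which matches the ‘+12038675309’ example above. The function name is_e164 is hypothetical and not part of the repository.

```python
import re

# Assumed E.164 shape: "+", a non-zero leading digit, then 1 to 14 more digits
E164_PATTERN = re.compile(r"\+[1-9][0-9]{1,14}")


def is_e164(phone_number):
    """Return True if phone_number looks like an E.164 number, e.g. +12038675309."""
    return E164_PATTERN.fullmatch(phone_number) is not None


if __name__ == "__main__":
    for candidate in ("+12038675309", "2038675309", "your_phone_number_here"):
        print(candidate, is_e164(candidate))
```

A check like this only validates the shape of the value; it cannot confirm that the number is reachable.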
Run the following command to execute the shell script, parameter_store_values.sh, which will put the necessary parameters into Parameter Store.
sh ./parameter_store_values.sh
CloudFormation Templates
The GitHub repository includes two CloudFormation templates, cfn-templates/event.template and cfn-templates/rds.template. The event template contains two resources: an Amazon SNS Topic and an Amazon RDS Event Subscription. The RDS template includes several resources: a VPC, Internet Gateway, VPC security group, two public subnets, one RDS master database instance, and one RDS read replica database instance.
The resources are split into two CloudFormation templates so we can create the notification resources, first, independently of creating or deleting the RDS instances. This will ensure we get all our SMS alerts about both the creation and deletion of the databases.
Template Parameters
The two CloudFormation templates contain a total of approximately fifteen parameters. For most, you can use the default values I have set or choose to override them. Four of the parameters will be fulfilled from Parameter Store. Of these, the master database password is treated slightly differently because it is secure (encrypted in Parameter Store). Below is a snippet of the template showing both types of parameters. The last two are fulfilled from Parameter Store.
DBInstanceClass:
  Type: String
  Default: "db.t3.small"
DBStorageType:
  Type: String
  Default: "gp2"
DBUser:
  Type: String
  Default: "{{resolve:ssm:/rds_demo/master_username:1}}"
DBPassword:
  Type: String
  Default: "{{resolve:ssm-secure:/rds_demo/master_password:1}}"
  NoEcho: True
Choosing the default CloudFormation parameter values will result in two minimally-configured RDS instances running the PostgreSQL 11.4 database engine on a db.t3.small instance with 10 GiB of General Purpose (SSD) storage. The db.t3 DB instance is part of the latest generation burstable performance instance class. The master instance is not configured for Multi-AZ high availability. However, the master and read replica each run in a different Availability Zone (AZ) within the same AWS Region.
Parameter Versioning
When placing parameters into Parameter Store, subsequent updates to a parameter result in the version number of that parameter being incremented. Note in the examples above, the version of the parameter is required by CloudFormation, here, ‘1’. If you choose to update a value in Parameter Store, thus incrementing the parameter’s version, you will also need to update the corresponding version number in the CloudFormation template’s parameter.
{
  "Parameter": {
    "Name": "/rds_demo/rds_username",
    "Type": "String",
    "Value": "masteruser",
    "Version": 1,
    "LastModifiedDate": 1564962073.774,
    "ARN": "arn:aws:ssm:us-east-1:1234567890:parameter/rds_demo/rds_username"
  }
}
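To keep the template's version numbers in step with Parameter Store, the dynamic reference string can be derived from the parameter's JSON description instead of being typed by hand. The following is a rough sketch under that assumption: the function name build_dynamic_reference is hypothetical, and the sample JSON simply repeats the response shown above.

```python
import json


def build_dynamic_reference(get_parameter_json):
    """Build a '{{resolve:...}}' string from a Parameter Store parameter description."""
    param = json.loads(get_parameter_json)["Parameter"]
    # SecureString parameters use the ssm-secure service name, as in the template snippet
    service = "ssm-secure" if param["Type"] == "SecureString" else "ssm"
    return "{{resolve:" + service + ":" + param["Name"] + ":" + str(param["Version"]) + "}}"


# Sample input copied from the JSON response shown above
SAMPLE_PARAMETER = json.dumps({
    "Parameter": {
        "Name": "/rds_demo/rds_username",
        "Type": "String",
        "Value": "masteruser",
        "Version": 1,
    }
})

if __name__ == "__main__":
    print(build_dynamic_reference(SAMPLE_PARAMETER))
```

After a parameter is updated, regenerating the string this way would pick up the incremented version number.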
Validating Templates
Although I have tested both templates, I suggest validating the templates yourself, as you usually would for any CloudFormation template you are creating. You can use the AWS CLI CloudFormation validate-template command to validate a template. Alternatively, or, as I suggest, additionally, you can use the CloudFormation Linter, cfn-lint.
aws cloudformation validate-template \
  --template-body file://cfn-templates/rds.template

cfn-lint -t cfn-templates/rds.template
Create the Stacks
To execute the first CloudFormation template and create a CloudFormation Stack containing the two event notification resources, run the following create-stack CLI command.
aws cloudformation create-stack \
  --template-body file://cfn-templates/event.template \
  --stack-name RDSEventDemoStack
The first stack takes less than one minute to create. Using the AWS CloudFormation Console, make sure the first stack completes successfully before creating the second stack with the command below.
aws cloudformation create-stack \
  --template-body file://cfn-templates/rds.template \
  --stack-name RDSDemoStack
Wait for my Text
In my tests, the CloudFormation RDS stack takes an average of 25–30 minutes to create and 15–20 minutes to delete, which can seem like an eternity. You could use the AWS CloudFormation console (shown below) or continue to use the CLI to follow the progress of the RDS stack creation.
However, if you recall, the CloudFormation event template creates an AWS RDS Event Subscription. This resource will notify us when the databases are ready by sending text messages to our mobile device.
In the CloudFormation events template, the RDS Event Subscription is configured to generate Amazon Simple Notification Service (SNS) notifications for several specific event types, including RDS instance creation and deletion.
MyEventSubscription:
  Properties:
    Enabled: true
    EventCategories:
      - availability
      - configuration change
      - creation
      - deletion
      - failover
      - failure
      - recovery
    SnsTopicArn:
      Ref: MyDBSNSTopic
    SourceType: db-instance
  Type: AWS::RDS::EventSubscription
Amazon SNS will send SMS messages to the mobile number you placed into Parameter Store. Below, we see messages generated during the creation of the two instances, displayed on an Apple iPhone.
Amazon RDS Dashboard
Once the RDS CloudFormation stack has successfully been built, the easiest way to view the results is using the Amazon RDS Dashboard, as shown below. Here we see both the master and read replica instances have been created and are available for our use.
The RDS dashboard offers CloudWatch monitoring of each RDS instance.
The RDS dashboard also provides detailed configuration information about each RDS instance.
The RDS dashboard’s Connection & security tab is where we can obtain connection information about our RDS instances, including each instance’s endpoint. The endpoint information will be required in the next part of the demonstration.
Sample Data
Now that we have our PostgreSQL database instance and read replica successfully provisioned and configured on AWS, with an empty database, we need some test data. There are several sources of sample PostgreSQL databases available on the internet to explore. We will use the Pagila sample movie rental database by pgFoundry. Although the database is several years old, it provides a relatively complex relational schema (table relationships shown below) and plenty of sample data to query, about 100 database objects and 46K rows of data.
In the GitHub repository, I have included the two Pagila database SQL scripts required to install the sample database’s data structures (DDL), sql-scripts/pagila-schema.sql, and the data itself (DML), sql-scripts/pagila-insert-data.sql.
To execute the Pagila SQL scripts and install the sample data, I have included a Python script. If you do not want to use Python, you can skip to the Adminer section of this post. Adminer also has the capability to import SQL scripts.
Before running any of the included Python scripts, you will need to install the required Python packages and configure the database.ini file.
Python Packages
To install the required Python packages using the supplied python-scripts/requirements.txt file, run the commands below.
cd python-scripts
pip3 install --upgrade -r requirements.txt
We are using two packages, psycopg2 and configparser, for the scripts. Psycopg is a PostgreSQL database adapter for Python. According to their website, Psycopg is the most popular PostgreSQL database adapter for the Python programming language. The configparser module allows us to read configuration from files similar to Microsoft Windows INI files. The unittest package is required for a set of unit tests included in the project, but not discussed as part of the demo.
Database Configuration
The python-scripts/database.ini file, read by configparser, provides the required connection information for the databases on our RDS master and read replica instances. Use the input parameters and output values from the CloudFormation RDS template, or the Amazon RDS Dashboard, to obtain the required connection information, as shown in the example below. Your host values will be unique for your master and read replica. The host values are the instances’ endpoints, listed in the RDS Dashboard’s Connection & security tab.
[docker]
host=localhost
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[master]
host=demo-instance.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd

[replica]
host=demo-replica.dkfvbjrazxmd.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=5up3r53cr3tPa55w0rd
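To illustrate how a file with this layout can be read, here is a minimal sketch using the standard library's configparser. It is not the repository's db_config.py: the function name read_section is hypothetical, and the INI text is an inline sample with placeholder host and password values so the example can run without a file on disk.

```python
from configparser import ConfigParser

# Inline sample with placeholder values, mirroring the layout of database.ini
SAMPLE_INI = """
[master]
host=demo-instance.example.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=example-password

[replica]
host=demo-replica.example.us-east-1.rds.amazonaws.com
port=5432
database=pagila
user=masteruser
password=example-password
"""


def read_section(ini_text, section):
    """Return the key/value pairs of one INI section as a dictionary of strings."""
    parser = ConfigParser()
    parser.read_string(ini_text)
    if not parser.has_section(section):
        raise KeyError("Section not found: " + section)
    return dict(parser.items(section))


if __name__ == "__main__":
    params = read_section(SAMPLE_INI, "replica")
    print(params["host"], params["port"])
```

Note that configparser returns every value as a string, so the port arrives as "5432" rather than an integer.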
With the INI file configured, run the following command, which executes a supplied Python script, python-scripts/create_pagila_data.py, to create the data structure and insert sample data into the master RDS instance’s Pagila database. The database will be automatically replicated to the RDS read replica instance. From my local laptop, I found the Python script takes approximately 40 seconds to create all 100 database objects and insert 46K rows of movie rental data. That is compared to about 13 seconds locally, using a Docker-based instance of PostgreSQL.
python3 ./create_pagila_data.py --instance master
The Python script’s primary function, create_pagila_db(), reads and executes the two external SQL scripts.
def create_pagila_db():
    """
    Creates Pagila database by running DDL and DML scripts
    """

    try:
        global conn
        with conn:
            with conn.cursor() as curs:
                curs.execute(open("../sql-scripts/pagila-schema.sql", "r").read())
                curs.execute(open("../sql-scripts/pagila-insert-data.sql", "r").read())
                conn.commit()
                print('Pagila SQL scripts executed')
    except (psycopg2.OperationalError, psycopg2.DatabaseError, FileNotFoundError) as err:
        print(create_pagila_db.__name__, err)
        close_conn()
        exit(1)
If the Python script executes correctly, you should see output indicating there are now 28 tables in our master RDS instance’s database.
pgAdmin
pgAdmin is a favorite tool for interacting with and managing PostgreSQL databases. According to its website, pgAdmin is the most popular and feature-rich Open Source administration and development platform for PostgreSQL.
The project includes an optional Docker Swarm stack.yml file. The stack will create a set of three Docker containers, including a local copy of PostgreSQL 11.4, Adminer, and pgAdmin 4. Having a local copy of PostgreSQL, using the official Docker image, is helpful for development and troubleshooting RDS issues.
Use the following commands to deploy the Swarm stack.
# create stack
docker swarm init
docker stack deploy -c stack.yml postgres

# get status of new containers
docker stack ps postgres --no-trunc
docker container ls
If you do not want to spin up the whole Docker Swarm stack, you could use the docker run command to create just a single pgAdmin Docker container. The pgAdmin 4 Docker image being used is the image recommended by pgAdmin.
docker pull dpage/pgadmin4

docker run -p 81:80 \
  -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" \
  -e "PGADMIN_DEFAULT_PASSWORD=SuperSecret" \
  -d dpage/pgadmin4

docker container ls | grep pgadmin4
Database Server Configuration
Once pgAdmin is up and running, we can configure the master and read replica database servers (RDS instances) using the connection string information from your database.ini file or from the Amazon RDS Dashboard. Below, I am configuring the master RDS instance (server).
With that task complete, below, we see the master RDS instance and the read replica, as well as my local Docker instance configured in pgAdmin (left side of screengrab). Note how the Pagila database has been replicated automatically, from the RDS master to the read replica instance.
Building SQL Queries
Switching to the Query tab, we can run regular SQL queries against any of the database instances. Below, I have run a simple SELECT query against the master RDS instance’s Pagila database, returning the complete list of movie titles, along with their genre and release date.
The pgAdmin Query tool even includes an Explain tab to view a graphical representation of the same query, very useful for optimization. Here we see the same query, showing an analysis of the execution order. A popup window displays information about the selected object.
Query the Read Replica
To demonstrate the use of the read replica, below I’ve run the same query against the RDS read replica’s copy of the Pagila database. Any schema and data changes against the master instance are replicated to the read replica(s).
Adminer
Adminer is another good general-purpose database management tool, similar to pgAdmin, but with a few different capabilities. According to its website, with Adminer, you get a tidy user interface, rich support for multiple databases, performance, and security, all from a single PHP file. Adminer is my preferred tool for database admin tasks. Amazingly, Adminer works with MySQL, MariaDB, PostgreSQL, SQLite, MS SQL, Oracle, SimpleDB, Elasticsearch, and MongoDB.
Below, we see the Pagila database’s tables and views displayed in Adminer, along with some useful statistical information about each database object.
Similar to pgAdmin, we can also run queries, along with other common development and management tasks, from within the Adminer interface.
Import Pagila with Adminer
Another great feature of Adminer is the ability to easily import and export data. As an alternative to Python, you could import the Pagila data using Adminer’s SQL file import function. Below, you see an example of importing the Pagila database objects into the Pagila database, using the file upload function.
IDE
For writing my AWS infrastructure as code files and Python scripts, I prefer JetBrains PyCharm Professional Edition (v19.2). PyCharm, like all the JetBrains IDEs, has the ability to connect to and manage PostgreSQL databases. You can write and run SQL queries, including the Pagila SQL import scripts. Microsoft Visual Studio Code is another excellent, free choice, available on multiple platforms.
Python and RDS
Although our IDE, pgAdmin, and Adminer are useful to build and test our queries, ultimately, we still need to connect to the Amazon RDS PostgreSQL instances and perform data manipulation from our application code. The GitHub repository includes a sample Python script, python-scripts/query_postgres.py. This script uses the same Python packages and connection functions as the Pagila data creation script we ran earlier. This time we will perform the same SELECT query using Python as we did previously with pgAdmin and Adminer.
cd python-scripts
python3 ./query_postgres.py --instance master
With a successful database connection established, the script’s primary function, get_movies(return_count), performs the SELECT query. The function accepts an integer representing the desired number of movies to return from the SELECT query. A separate function within the script handles closing the database connection when the query is finished.
def get_movies(return_count=100):
    """
    Queries for all films, by genre and year
    """

    try:
        global conn
        with conn:
            with conn.cursor() as curs:
                curs.execute("""
                    SELECT title AS title, name AS genre, release_year AS released
                    FROM film f
                             JOIN film_category fc
                                  ON f.film_id = fc.film_id
                             JOIN category c
                                  ON fc.category_id = c.category_id
                    ORDER BY title
                    LIMIT %s;
                """, (return_count,))

                movies = []
                row = curs.fetchone()
                while row is not None:
                    movies.append(row)
                    row = curs.fetchone()

                return movies
    except (psycopg2.OperationalError, psycopg2.DatabaseError) as err:
        print(get_movies.__name__, err)
    finally:
        close_conn()


def main():
    set_connection('docker')
    for movie in get_movies(10):
        print('Movie: {0}, Genre: {1}, Released: {2}'
              .format(movie[0], movie[1], movie[2]))
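If you want to experiment with the shape of this query before connecting to RDS, the sketch below runs a similar join against an in-memory SQLite database from Python's standard library. This is a stand-in, not the post's method: SQLite uses ? placeholders instead of psycopg2's %s, the three-table schema is a heavily reduced assumption about Pagila, and the film titles are made-up sample rows.

```python
import sqlite3


def build_sample_db():
    """Create an in-memory database with a tiny, made-up subset of the schema."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE film (film_id INTEGER PRIMARY KEY, title TEXT, release_year INTEGER);
        CREATE TABLE category (category_id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE film_category (film_id INTEGER, category_id INTEGER);
        INSERT INTO film VALUES (1, 'SAMPLE FILM B', 2006);
        INSERT INTO film VALUES (2, 'SAMPLE FILM A', 2006);
        INSERT INTO film VALUES (3, 'SAMPLE FILM C', 2006);
        INSERT INTO category VALUES (1, 'Documentary');
        INSERT INTO category VALUES (2, 'Horror');
        INSERT INTO film_category VALUES (1, 1);
        INSERT INTO film_category VALUES (2, 2);
        INSERT INTO film_category VALUES (3, 1);
    """)
    return conn


def get_movies(conn, return_count=100):
    """Return up to return_count (title, genre, released) rows, ordered by title."""
    cursor = conn.execute(
        """
        SELECT title, name AS genre, release_year AS released
        FROM film f
                 JOIN film_category fc ON f.film_id = fc.film_id
                 JOIN category c ON fc.category_id = c.category_id
        ORDER BY title
        LIMIT ?;
        """,
        (return_count,),  # the limit is passed as a bound parameter, not formatted into the SQL
    )
    return cursor.fetchall()


if __name__ == "__main__":
    connection = build_sample_db()
    for movie in get_movies(connection, 2):
        print('Movie: {0}, Genre: {1}, Released: {2}'.format(movie[0], movie[1], movie[2]))
    connection.close()
```

Here fetchall() replaces the fetchone() loop; both collect the same rows, and the choice is mostly a matter of taste for small result sets.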
Below, we see an example of the Python script’s formatted output, limited to only the first ten movies.
Using the Read Replica
For better application performance, it may be optimal to redirect some or all of the database reads to the read replica, while leaving writes, updates, and deletes to hit the master instance. The script can be easily modified to execute the same query against the read replica rather than the master RDS instance by merely passing the desired section, ‘replica’ versus ‘master’, in the call to the set_connection(section) function. The section parameter refers to one of the sections in the database.ini file. The configparser module will handle retrieving the correct connection information.
set_connection('replica')
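The routing idea described above can be sketched as a small helper that picks a database.ini section based on the statement being run. This is a deliberately naive illustration, not code from the repository: the function name choose_section is hypothetical, and it only treats statements beginning with SELECT as reads, sending everything else to the master.

```python
def choose_section(sql):
    """Return 'replica' for statements starting with SELECT, 'master' for everything else."""
    words = sql.split()
    first_word = words[0].lower() if words else ""
    # Anything that is not clearly a read goes to the master to stay on the safe side
    return "replica" if first_word == "select" else "master"


if __name__ == "__main__":
    for statement in ("SELECT title FROM film;", "UPDATE film SET title = 'X';"):
        print(choose_section(statement), "<-", statement)
```

The returned value could then be passed to set_connection(section). Keep in mind that replication to a read replica is asynchronous, so a read issued immediately after a write may not yet see the change.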
Cleaning Up
When you are finished with the demonstration, the easiest way to clean up all the AWS resources and stop getting billed is to delete the two CloudFormation stacks using the AWS CLI, in the following order.
aws cloudformation delete-stack \
  --stack-name RDSDemoStack

# wait until the above resources are completely deleted
aws cloudformation delete-stack \
  --stack-name RDSEventDemoStack
You should receive the following SMS notifications as the first CloudFormation stack is being deleted.
You can delete the running Docker stack using the following command. Note, you will lose all your pgAdmin server connection information, along with your local Pagila database.
docker stack rm postgres
Conclusion
In this brief post, we have just scratched the surface of the many benefits and capabilities of Amazon RDS for PostgreSQL. The best way to learn PostgreSQL and the benefits of Amazon RDS is to set up your own RDS instance, insert some sample data, and start writing queries in your favorite database client or programming language.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Building Asynchronous, Serverless Alexa Skills with AWS Lambda, DynamoDB, S3, and Node.js
Posted by Gary A. Stafford in AWS, Cloud, JavaScript, Serverless, Software Development on July 24, 2018
Introduction
In the following post, we will use the new version 2 of the Alexa Skills Kit, AWS Lambda, Amazon DynamoDB, Amazon S3, and the latest LTS version of Node.js, to create an Alexa Custom Skill. According to Amazon, a custom skill allows you to define the requests the skill can handle (intents) and the words users say to invoke those requests (utterances).
If you want to compare the development of an Alexa Custom Skill with those of Google and Azure, in addition to this post, please read my previous two posts in this series, Building and Integrating LUIS-enabled Chatbots with Slack, using Azure Bot Service, Bot Builder SDK, and Cosmos DB and Building Serverless Actions for Google Assistant with Google Cloud Functions, Cloud Datastore, and Cloud Storage. The demonstrations in all three articles are written in Node.js, all three leverage their cloud platform’s machine learning-based Natural Language Understanding services, and all three take advantage of NoSQL database and storage services available on their respective cloud platforms.
AWS Technologies
The final high-level architecture of our Alexa Custom Skill will look as follows.
Here is a brief overview of the key AWS technologies we will incorporate into our Skill’s architecture.
Alexa Skills Kit
According to Amazon, the Alexa Skills Kit (ASK) is a collection of self-service APIs, tools, documentation, and code samples that makes it possible to add skills to Alexa. The Alexa Skills Kit supports building different types of skills. Currently, Alexa skill types include Custom, Smart Home, Video, Flash Briefing, and List Skills. Each skill type makes use of a different Alexa Skill API.
AWS Serverless Platform
To create a custom skill for Alexa, you currently have the choice of using an AWS Lambda function or a web service. AWS Lambda is part of an ecosystem of Cloud services and Developer tools that Amazon refers to as the AWS Serverless Platform. The platform’s services are designed to support the development and hosting of highly-performant, enterprise-grade serverless applications.
In this post, we will leverage three of the AWS Serverless Platform’s services, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and AWS Lambda.
Node.js
AWS Lambda supports multiple programming languages, including Node.js (JavaScript), Python, Java (Java 8 compatible), C# (.NET Core), and Go. All are excellent choices for writing modern serverless functions. For this post, we will use Node.js. According to Node.js Foundation, Node.js is an asynchronous event-driven JavaScript runtime built on Chrome’s V8 JavaScript engine.
In April 2018, AWS Lambda announced support for the Node.js 8.10 runtime, which is the current Long Term Support (LTS) version of Node.js. Node 8, also known as Project Carbon, was the first LTS version of Node to support async/await with Promises. Async/await is the new way of handling asynchronous operations in Node.js. We will make use of async/await and Promises with the custom skill.
Demonstration
To demonstrate Alexa Custom Skills we will build an informational skill that responds to the user with interesting facts about Azure¹, Microsoft’s Cloud computing platform (Alexa talking about Azure, ironic, I know). This is not an official Microsoft skill; it is only used for this demonstration and has not been published.
Source Code
All open-source code for this post can be found on GitHub. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Important, this post and the associated source code were updated from v1.0 to v2.0 on 13 August 2018. You should clone the GitHub project again, to correspond with this revised post, if you originally cloned the project before 14 August 2018. Code changes were significant.
Objectives
The objective of the fact-based skill will be to demonstrate the following.
- Build, deploy, and test an Alexa Custom Skill using AWS Lambda and Node.js;
- Use DynamoDB to store and retrieve Alexa voice responses;
- Maintain a count of user’s questions in DynamoDB using atomic counters;
- Use Amazon S3 to store and retrieve images, used in Display Cards;
- Log Alexa Skill activities using Amazon CloudWatch Logs;
Steps to Build
Building the Azure fact skill will involve the following steps.
- Design the Alexa skill’s voice interaction model;
- Design the skill’s Display Cards for Alexa-enabled products, to enhance the voice experience;
- Create the skill’s DynamoDB table and import the responses the skill will return;
- Create an S3 bucket and upload the images used for the Display Cards;
- Write the Alexa Skill, which involves mapping the user’s spoken input to the intents your cloud-based service can handle;
- Write the Lambda function, which involves responding to the user’s utterances, by building and returning appropriate voice and display card responses, from DynamoDB and S3;
- Extend the default ASK-generated AWS IAM Role, to allow the Lambda to update DynamoDB;
- Deploy the skill;
- Test the skill;
Let’s explore each step in detail.
Voice Interaction Model
First, we must design the fact skill’s voice interaction model. We need to consider the way we want the user to interact with the skill. What is the user’s conversational journey? How do they invoke your skill? How will the user provide intent?
This skill will require two intent slot values, the fact the user is interested in (i.e. ‘global infrastructure’) and the user’s first name (i.e. ‘Susan’). We will train the skill to allow Alexa to query the user for each slot value, but also allow the user to provide either or both values in the initial intent invocation. We will also allow the user to request a random fact.
Shown below in the Alexa Skills Kit Development Console Test tab are three examples of interactions the skill is trained to understand and handle:
- The first example on the left invokes the skill with no intent (‘Alexa, load Azure Tech Facts’). The user is led through a series of three questions to obtain the full intent.
- The center example is similar, however, the initial invocation contains a partial intent (‘Alexa, ask Azure Tech Facts for a fact about certifications’). Alexa must still ask for the user’s name.
- Lastly, the example on the right is a so-called ‘one-shot’ invocation (‘Alexa, ask Azure Tech Facts about Azure’s platforms for Gary’). The user’s invocation of the skill contains a complete intent, allowing Alexa to respond immediately with a fact about Azure platforms.
In all cases, our skill has the ability to continue to provide the user with additional facts if they choose, or they may cancel at any time.
We also need to design how Alexa will respond. What persona will Alexa assume through her words, phrases, and use of Speech Synthesis Markup Language (SSML)?
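As a rough illustration of how SSML can shape that persona, here is a minimal sketch of a helper that places a short pause between the user's name and the fact. The `escapeForSsml` and `personalizedSsml` helpers and the 300 ms pause are assumptions made for this example; they are not part of the skill's source code.

```javascript
// Escape characters that must not appear raw inside SSML text.
function escapeForSsml(text) {
  return text
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&apos;");
}

// Hypothetical helper: greet the user by name, pause briefly, then speak the fact.
// The pause length is an illustrative choice, not a value taken from the skill.
function personalizedSsml(name, fact) {
  return `${escapeForSsml(name)}, <break time="300ms"/> ${escapeForSsml(fact)}`;
}

console.log(personalizedSsml("Susan", "Azure & friends"));
```

A string built this way could be handed to the response builder's `speak()` method in the Lambda.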
User Interaction Previews
Here are a few examples of interactions with the final Alexa skill using an iPhone 8 and the Alexa App. They are intended to show the rich conversational capabilities of custom skills more so than the display, which is pretty poor on the Alexa App as compared to the Echo Show or even the Echo Spot.
Example 1: Indirect Invocation
The first example shows a basic interaction with our Alexa skill. It demonstrates an indirect invocation, a user utterance without initial intent. It also illustrates several variations of user utterances (YouTube).
Example 2: Direct Invocation
The second example of an interaction with our skill demonstrates a direct invocation, in which the initial user utterance contains intent. It also demonstrates the user following up with additional requests (YouTube).
Example 3: Direct Invocation, Help, Problem
Lastly, another direct invocation demonstrates the use of the Help Intent. You also see an example of when Alexa does not understand the user’s utterance. The user is able to repeat their request, more clearly (YouTube).
Visual Interaction Model
Many Alexa-enabled devices are capable of both vocal and visual responses. Designing for a multimodal user experience is important. The instructional skill will provide vocal responses, as well as Display Cards optimized for the Amazon Echo Show. The skill contains a basic design for the Display Card shown during the initial invocation, where there is no intent uttered by the user.
The fact skill also contains a Display Card, designed to present the final Alexa response to the user’s intent. The content of the vocal and visual response is returned from DynamoDB via the Lambda function. The random Azure icons, available from Microsoft, are hosted in an S3 bucket. Each fact response is unique, as is the icon associated with the fact.
The Display Cards will also work on other Alexa-enabled screen-based products. Shown below is the same card on an iPhone 8 using the Amazon Alexa app. This is the same app shown in the videos, above.
DynamoDB
Next, we create the DynamoDB table used to store the facts the Alexa skill will respond with when invoked by the user. DynamoDB is Amazon’s non-relational database that delivers reliable performance at any scale. DynamoDB consists of three basic components: tables, items, and attributes.
There are numerous ways to create a DynamoDB table. For simplicity, I created the AzureFacts DynamoDB table using the AWS CLI (gist). You could also choose CloudFormation, or create the table using any of nine or more programming languages with an AWS SDK.
aws dynamodb create-table \
  --table-name AzureFacts \
  --attribute-definitions \
    AttributeName=Fact,AttributeType=S \
  --key-schema AttributeName=Fact,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=3,WriteCapacityUnits=3
The AzureFacts table’s schema has four key/value pair attributes per item: Fact, Response, Image, and Hits. The Fact attribute, a string, contains the name of the fact the user is seeking. The Fact attribute also serves as the table’s unique partition key. The Response attribute, a string, contains the conversational response Alexa will return. The Image attribute, a string, contains the name of the image in the S3 bucket displayed by Alexa. Lastly, the Hits attribute, a number, stores the number of user requests for a particular fact.
Importing Table Items
After the DynamoDB table is created, the pre-defined facts are imported into the empty table using AWS CLI (gist). The JSON-formatted data file, AzureFacts.json, is included with the source code on GitHub.
aws dynamodb batch-write-item \
  --request-items file://data/AzureFacts.json
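For readers who want to build their own data file, the following is a minimal sketch of the request-items structure that `batch-write-item` accepts, using the four attributes described above. The `toBatchWriteRequest` helper and the sample fact are hypothetical and are not taken from the project's AzureFacts.json file.

```javascript
// Convert plain fact objects into the request-items structure expected by
// `aws dynamodb batch-write-item` (DynamoDB's typed JSON format).
function toBatchWriteRequest(tableName, facts) {
  if (facts.length > 25) {
    // A single batch-write-item call accepts at most 25 put/delete requests.
    throw new Error("Split the facts into batches of 25 or fewer");
  }
  return {
    [tableName]: facts.map((f) => ({
      PutRequest: {
        Item: {
          Fact: { S: f.fact },
          Response: { S: f.response },
          Image: { S: f.image },
          Hits: { N: String(f.hits || 0) }, // numbers are sent as strings
        },
      },
    })),
  };
}

// Placeholder data, for illustration only.
const requestItems = toBatchWriteRequest("AzureFacts", [
  { fact: "example", response: "an example response", image: "image-01.png" },
]);
console.log(JSON.stringify(requestItems, null, 2));
```

Writing the printed JSON to a file produces something that can be passed to the `--request-items` option shown above.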
The resulting table should appear as follows in the AWS Management Console.
Note the imported items shown below. The Hits counts reflect the number of times each fact has been requested.
Shown below is a detailed view of a single item that was imported into the DynamoDB table.
Amazon S3 Image Bucket
Next, we create the Amazon S3 bucket, which will house the images, actually Azure icons as PNGs, returned by Alexa with each fact. Again, I used the AWS CLI for simplicity (gist).
aws s3api create-bucket \
  --bucket <my_bucket_name> \
  --region us-east-1
The images can be uploaded manually to the bucket through a web browser, or programmatically, using the AWS CLI or SDKs. You will need to ensure the images are made public so they can be displayed by Alexa.
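Once the images are public, the Lambda only needs to join the bucket's base URL with the image name stored in DynamoDB. Here is a minimal sketch of that step. The `imageUrl` helper is hypothetical, and the base URL in the example is a placeholder rather than the project's real bucket address.

```javascript
// Join a bucket base URL and an image name into a single public URL.
// Trailing slashes on the base are removed so the result has exactly one separator.
function imageUrl(bucketUrl, imageName) {
  const base = bucketUrl.replace(/\/+$/, "");
  return `${base}/${encodeURIComponent(imageName)}`;
}

// Placeholder base URL, for illustration only.
console.log(imageUrl("https://example.com/my-bucket/", "image-02.png"));
```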
Alexa Skill
Next, we create the actual Alexa custom skill. I have used version 2 of the Alexa Skills Kit (ASK) Software Development Kit (SDK) for Node.js and the new ASK Command Line Interface (ASK CLI) to create the skill. The ASK SDK v2 for Node.js was recently released in April 2018. If you have previously written Alexa skills using version 1 of the Node.js SDK, the creation of a new project and the format of the Lambda Node.js code is somewhat different. I strongly suggest reviewing the example skills provided by Amazon on GitHub.
With version 1, I would have likely used the Alexa Skills Kit Development Console to develop and deploy the skill, and a separate IDE, like JetBrains WebStorm, to write the Lambda. The JSON-format skill would live in the Alexa Skills Kit Development Console, and my Lambda in source control. I would have used AWS Serverless Application Model (AWS SAM) or Claudia.js to handle the deployment of Lambda functions.
With version 2 of ASK, you can easily create and manage the Alexa skill’s JSON-formatted code, as well as the Lambda, all from the command-line and a single IDE or text editor. All components that comprise the skill can be kept together in source control. I now only use the Alexa Skills Kit Development Console to preview my deployed skill and for testing. I am not going to go into detail about creating a new project using the ASK CLI. Instead, I suggest reviewing Amazon’s instructional guides.
Below, I have initiated a new AWS profile for the Alexa skill using the ask init command.
There are three main parts to the new skill project created by the ASK CLI: the skill’s manifest (skill.json), model(s) (en-US.json), and API endpoint, the Lambda (index.js). The skill’s manifest, skill.json, contains information (metadata) about the skill. This is the same information you find in the Distribution tab of the Alexa Skills Kit Development Console. The manifest includes publishing information, example phrases to invoke the skill, the skill’s category, distribution locales, privacy information, and the location of the skill’s API endpoint, the Lambda. An end-user would most commonly see this information in the Amazon Alexa app when adding skills to their Alexa-enabled devices.
Next, the skill’s model, en-US.json, is located in the models sub-directory. This file defines the skill’s custom interaction model, written in JSON, which includes the invocation name, intents, standard and custom slots, sample utterances, slot values, and synonyms of those values. This is the same information you would find in the Build tab of the Alexa Skills Kit Development Console. Amazon has an excellent guide to creating your custom skill’s interaction model.
Intents and Intent Slots
The skill’s custom interaction model contains the AzureFactsIntent intent, along with the boilerplate Cancel, Help and Stop intents. The AzureFactsIntent intent contains two intent slots, myName and myQuestion. The myName intent slot is a standard AMAZON.US_FIRST_NAME slot type. According to Amazon, this slot type understands thousands of popular first names commonly used by speakers in the United States. Shown below, I have included a short list of sample utterances in the intent model, which helps improve voice recognition for Alexa (gist).
{
  "name": "AzureFactsIntent",
  "slots": [{
    "name": "myName",
    "type": "AMAZON.US_FIRST_NAME",
    "samples": [
      "{myName}",
      "my name is {myName}",
      "my name's {myName}",
      "name is {myName}",
      "the name is {myName}",
      "name's {myName}",
      "they call me {myName}"
    ]
  }]
}
Custom Slot Types and Entities
The myQuestion intent slot is a custom slot type. According to Amazon, a custom slot type defines a list of representative values for the slot. The myQuestion slot contains all the available facts the custom instructional skill understands and can retrieve from DynamoDB. As with myName, the user can provide the fact intent in various ways (gist).
{
  "name": "myQuestion",
  "type": "list_of_facts",
  "samples": [
    "{myQuestion}",
    "give me a fact about {myQuestion}",
    "give me a {myQuestion} fact",
    "how many {myQuestion} does Azure have",
    "I'd like to hear about {myQuestion}",
    "I'd like to hear more about {myQuestion}",
    "tell me about {myQuestion}",
    "tell me about Azure's {myQuestion}",
    "tell me about Azure {myQuestion}",
    "tell me a {myQuestion} fact",
    "tell me another {myQuestion} fact",
    "when was Azure {myQuestion}"
  ]
}
This slot also contains synonyms for each fact. The slot value, its synonyms, and the optional ID are collectively referred to as an Entity. According to Amazon, entity resolution improves the way Alexa matches possible slot values in a user’s utterance with the slots defined in the skill’s interaction model.
An example of an entity in the myQuestion custom slot type is ‘competition’. A user can ask Alexa to tell them about Azure’s competition. The slot value ‘competition’ returns a fact about Azure’s leading competitors, as reported on the G2 Crowd website’s Microsoft Azure Alternatives & Competitors page. However, the user might also substitute the words ‘competitor’ or ‘competitors’ for ‘competition’. Using synonyms, if the user utters any of these three words in their intent, they will receive the same response from Alexa (gist).
"types": [{
  "name": "list_of_facts",
  "values": [{
      "name": {
        "value": "competition",
        "synonyms": [
          "competitors",
          "competitor"
        ]
      }
    },
    {
      "name": {
        "value": "certifications",
        "synonyms": [
          "certification",
          "certification exam",
          "certification exams"
        ]
      }
    }
  ]
}]
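To make the effect of entity resolution concrete, here is a minimal sketch of how a Lambda could read the canonical slot value (‘competition’) when the user actually said a synonym (‘competitors’). The `resolvedSlotValue` helper is hypothetical and is not part of the skill's source. It assumes the slot object follows the `resolutions.resolutionsPerAuthority` layout of the Alexa request JSON.

```javascript
// Return the canonical value chosen by entity resolution, falling back to
// the raw spoken value when no authority reports a successful match.
function resolvedSlotValue(slot) {
  if (!slot) {
    return undefined;
  }
  const authorities =
    (slot.resolutions && slot.resolutions.resolutionsPerAuthority) || [];
  for (const authority of authorities) {
    if (authority.status && authority.status.code === "ER_SUCCESS_MATCH") {
      return authority.values[0].value.name;
    }
  }
  return slot.value;
}

// Hand-written example slot, shaped like the slot object in an IntentRequest.
const exampleSlot = {
  name: "myQuestion",
  value: "competitors",
  resolutions: {
    resolutionsPerAuthority: [{
      status: { code: "ER_SUCCESS_MATCH" },
      values: [{ value: { name: "competition" } }],
    }],
  },
};
console.log(resolvedSlotValue(exampleSlot));
```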
Lambda
Initializing a skill with the ASK CLI also creates the default API endpoint, a Lambda (index.js). The serverless Lambda function is written in Node.js 8.10. As mentioned in the Introduction, AWS recently announced support for the Node.js 8.10 runtime, in April. This is the first LTS version of Node to support async/await with Promises. Node’s async/await is the new way of handling asynchronous operations in Node.js.
The layout of the code in the custom skill’s Lambda closely follows the custom Alexa Fact Skill example. I suggest closely reviewing this example. The Lambda has four main sections: constants, setup code, intent handlers, and helper functions.
In addition to the boilerplate Help, Stop, Error, and Session intent handlers, there are the LaunchRequestHandler and the AzureFactsIntent handlers. According to Amazon, a LaunchRequestHandler fires when the Lambda receives a LaunchRequest from Alexa, in which the user invokes the skill with the invocation name, but does not provide any command mapping to an intent.
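For orientation, a launch handler in the ASK SDK v2 style looks roughly like the sketch below. The welcome wording is a placeholder and is not the text used by the actual skill. Only the handler shape (`canHandle` and `handle`) and the response builder's `speak`, `reprompt`, and `getResponse` calls are the point here.

```javascript
// Minimal sketch of a handler for LaunchRequest (skill opened with no intent).
const LaunchRequestHandler = {
  canHandle(handlerInput) {
    return handlerInput.requestEnvelope.request.type === "LaunchRequest";
  },
  handle(handlerInput) {
    // Placeholder wording, for illustration only.
    const welcome = "Welcome to Azure Tech Facts. What is your name?";
    return handlerInput.responseBuilder
      .speak(welcome)
      .reprompt(welcome) // keeps the session open so the user can answer
      .getResponse();
  },
};
```

The reprompt keeps the session open, which is what allows the user to be led through the follow-up questions described earlier.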
The AzureFactsIntent aligns with the custom intent we defined in the skill’s model (en-US.json), of the same name. This handler processes an IntentRequest from Alexa. This handler and the buildFactResponse function the handler calls are what translate a request for a fact from the user into a request to DynamoDB for a response.
The AzureFactsIntent handler checks the IntentRequest for both the myName and myQuestion slot values. If the values are unfulfilled, the AzureFactsIntent handler delegates responsibility back to Alexa, using a Dialog delegate directive (addDelegateDirective). Alexa then requests the slot values from the user in a conversational interaction. Alexa then calls the AzureFactsIntent handler again (gist).
const request = handlerInput.requestEnvelope.request;
let currentIntent = request.intent;
if (myNameValue === undefined) {
    myNameValue = slotValue(request.intent.slots.myName);
}
if (!myNameValue) {
    return handlerInput.responseBuilder
        .addDelegateDirective(currentIntent)
        .getResponse();
}
let myQuestionValue = slotValue(request.intent.slots.myQuestion);
if (!myQuestionValue) {
    return handlerInput.responseBuilder
        .addDelegateDirective(currentIntent)
        .getResponse();
}
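The `slotValue` helper called above is not shown in this post. As an assumption about what such a helper might do, here is a minimal sketch that returns the trimmed spoken value of a slot, or `undefined` when the slot has not been filled, so the `!myNameValue` and `!myQuestionValue` checks trigger delegation.

```javascript
// Hypothetical stand-in for the skill's slotValue helper (the real one is not shown).
// Returns the trimmed slot value, or undefined if the slot is missing or empty.
function slotValue(slot) {
  if (!slot || typeof slot.value !== "string") {
    return undefined;
  }
  const trimmed = slot.value.trim();
  return trimmed.length > 0 ? trimmed : undefined;
}

console.log(slotValue({ name: "myName", value: " Susan " }));
console.log(slotValue({ name: "myName" }));
```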
Once both slot values are received by the AzureFactsIntent handler, it calls the buildFactResponse function, passing in the myName and myQuestion slot values. In turn, the buildFactResponse function calls AWS.DynamoDB.DocumentClient.update. The DynamoDB update delivers its result through a callback. The buildFactResponse function wraps that callback in a Promise, a standard built-in object type, part of the JavaScript ES2015 spec, and returns the Promise (gist).
function buildFactResponse(myName, myQuestion) {
    return new Promise((resolve, reject) => {
        if (myQuestion !== undefined) {
            let params = {};
            params.TableName = "AzureFacts";
            params.Key = {"Fact": myQuestion};
            // Atomic counter: increment Hits and return the updated item
            params.UpdateExpression = "set Hits = Hits + :val";
            params.ExpressionAttributeValues = {":val": 1};
            params.ReturnValues = "ALL_NEW";
            docClient.update(params, function (err, data) {
                if (err) {
                    console.log("UpdateItem threw an error:", JSON.stringify(err, null, 2));
                    reject(err);
                } else {
                    console.log("UpdateItem succeeded:", JSON.stringify(data, null, 2));
                    resolve(data);
                }
            });
        } else {
            // Settle the Promise so an awaiting caller never hangs
            reject(new Error("myQuestion is undefined"));
        }
    });
}
What is unique about the DynamoDB update call in this case is that it actually performs two functions. First, it implements an Atomic Counter. According to AWS, an atomic counter is a numeric DynamoDB attribute that is incremented, unconditionally, without interfering with other write requests. The update increments the numeric Hits attribute of the requested fact by exactly one. Secondly, the update returns the DynamoDB item. We can increment the count and get the response in a single call.
The buildFactResponse function’s Promise returns the DynamoDB item, a JSON object, from the callback. An example of a JSON response payload is shown below (gist).
"Attributes": {
  "Hits": 4,
  "Fact": "global",
  "Image": "image-02.png",
  "Response": "according to Microsoft, with 54 Azure regions, Azure has more global regions than any other cloud provider. Azure is currently available in 140 countries."
}
The AzureFactsIntent handler uses the async and await keywords to call the buildFactResponse function. In the AzureFactsIntent handler below, note that the async keyword is applied directly to the handler’s handle method, and that the await keyword is used with the call to the buildFactResponse function (gist).
const AzureFactsIntent = {
    canHandle(handlerInput) {
        const request = handlerInput.requestEnvelope.request;
        return request.type === "IntentRequest"
            && request.intent.name === "AzureFactsIntent";
    },
    async handle(handlerInput) {
        const request = handlerInput.requestEnvelope.request;
        let currentIntent = request.intent;
        if (myNameValue === undefined) {
            myNameValue = slotValue(request.intent.slots.myName);
        }
        if (!myNameValue) {
            return handlerInput.responseBuilder
                .addDelegateDirective(currentIntent)
                .getResponse();
        }
        let myQuestionValue = slotValue(request.intent.slots.myQuestion);
        if (!myQuestionValue) {
            return handlerInput.responseBuilder
                .addDelegateDirective(currentIntent)
                .getResponse();
        }
        if (myQuestionValue.toString().trim() === 'random') {
            myQuestionValue = selectRandomFact();
        }
        let fact = await buildFactResponse(myNameValue, myQuestionValue);
        myNameValue = Object.is(myNameValue, undefined) ? undefined : capitalizeFirstLetter(myNameValue);
        let factToSpeak = `${myNameValue}, ${fact.Attributes.Response}`;
        cardContent = factToSpeak;
        // optional: logged to CloudWatch Logs
        console.log(`myName: ${myNameValue}`);
        console.log(`myQuestion: ${myQuestionValue}`);
        console.log(`factToSpeak: ${factToSpeak}`);
        return handlerInput
            .responseBuilder
            .speak(factToSpeak)
            .reprompt("You can request another fact")
            .withStandardCard(CARD_TITLE, cardContent,
                IMAGES.smallImageUrl, `${BUCKET_URL}/${fact.Attributes.Image}`)
            .getResponse();
    }
};
The AzureFactsIntent handler awaits the Promise from the buildFactResponse function. In an async function, you can await any Promise or catch its rejection cause. If the update callback succeeds and the ensuing Promise resolves, the AzureFactsIntent handler returns both a vocal and visual response to Alexa.
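Catching a rejection in an async function is done with an ordinary try/catch around the awaited call. The following is a minimal sketch of that pattern, not code from the skill. The `speakFactOrApologize` function and its fallback wording are hypothetical, and the fact lookup is passed in as `fetchFact` so the example runs without DynamoDB.

```javascript
// Await a fact lookup and fall back to an apology if the Promise rejects.
// fetchFact is any function returning a Promise shaped like the DynamoDB
// update result, for example the buildFactResponse function shown earlier.
async function speakFactOrApologize(fetchFact, name, question) {
  try {
    const fact = await fetchFact(name, question);
    return `${name}, ${fact.Attributes.Response}`;
  } catch (err) {
    // The rejection cause arrives here, just as a thrown error would.
    console.log("Fact lookup failed:", err.message);
    return `Sorry ${name}, I could not find a fact about ${question}.`;
  }
}
```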
AWS IAM Role
By default, an AWS IAM Role, the ask-lambda-alexa-skill-azure-facts role, was created by ASK when the project was initialized. This role is automatically associated with the AWS Managed Policy, AWSLambdaBasicExecutionRole. This managed policy simply allows the skill’s Lambda function to create log groups and log streams, and to write log events, in Amazon CloudWatch Logs (gist).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
For the skill’s Lambda to read and write to DynamoDB, we must extend the default role’s permissions, by adding an additional policy. I have created a new AzureFacts_Alexa_Skill IAM Policy, which allows the associated role to get and update items from the AzureFacts DynamoDB table, and that is it. The role only has access to two of forty possible DynamoDB actions, and only for the AzureFacts table, and nothing else. Following the principle of Least Privilege is a cornerstone of AWS Security (gist).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:UpdateItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:931066906971:table/AzureFacts"
    }
  ]
}
Below, we see the new IAM Policy in the AWS Management Console.
Below, we see the policy being applied to the skill’s IAM Role, along with the original AWS managed policy.
Deploying the Skill
Version 2 of the ASK CLI makes deploying the Alexa custom skill very easy. Using the ASK CLI’s deploy command, we can validate and deploy the skill (manifest), model, and Lambda, all at once, as shown below. This makes DevOps automation of skill deployments with tools like Jenkins or AWS CodeDeploy straightforward.
You can verify the skill has been deployed, from the Alexa Skills Kit Development Console. You should observe the skill’s model (intents, slots, entities, and endpoints) in the Build tab. You should observe the skill’s publishing details in the Distribution tab. Note that deploying the skill does not submit the skill to Amazon for review and publishing. You must still submit the skill separately.
From the AWS Lambda Management Console, you should observe the skill’s Lambda was deployed. You should observe only the skill can trigger the Lambda. Lastly, you should observe that the correct IAM Role was applied to the Lambda, giving the Lambda access to Amazon CloudWatch Logs and Amazon DynamoDB.
Testing the Skill
The ASK CLI comes with the simulate command. According to Amazon, the simulate command simulates an invocation of the skill with text-based input. Again, the ASK CLI makes DevOps test automation with tools like Jenkins or AWS CodeDeploy pretty easy (gist).
ask simulate \
  --text "Load Azure Tech Facts" \
  --locale "en-US" \
  --skill-id "<your_skill_id>" \
  --profile "default"
Below are the results of simulating the invocation. The simulate command returns the expected verbal response, including any SSML, and the visual responses (the Display Card). You could easily write an automation script to run a battery of these tests on every code commit, and prior to deployment.
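One way such an automation script might look in Node.js is sketched below. It is only a starting point. The `simulateArgs` and `runBattery` functions are hypothetical, the script uses only the `ask simulate` options shown above, and the command runner is injectable so the logic can be tried without the ASK CLI installed.

```javascript
const { execFileSync } = require("child_process");

// Build the argument list for one `ask simulate` call.
function simulateArgs(text, { locale, skillId, profile }) {
  return [
    "simulate",
    "--text", text,
    "--locale", locale,
    "--skill-id", skillId,
    "--profile", profile,
  ];
}

// Run every utterance through `ask simulate` and collect pass/fail results.
// exec defaults to execFileSync; a stub can be passed in for dry runs.
function runBattery(utterances, options, exec = execFileSync) {
  return utterances.map((text) => {
    try {
      const output = exec("ask", simulateArgs(text, options), { encoding: "utf8" });
      return { text, ok: true, output };
    } catch (err) {
      // A non-zero exit code from the CLI surfaces as a thrown error.
      return { text, ok: false, output: String(err.message) };
    }
  });
}
```

A CI job could call `runBattery` with a list of utterances and fail the build if any result has `ok` set to `false`.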
I also like to manually test my skills from the Alexa Skills Kit Development Console Test tab. You may invoke the skill using your voice or by typing the skill invocation.
The Alexa Skills Kit Development Console Test tab both shows and speaks Alexa’s response. The console also displays the request and response body (JSON input/output), as well as the Display Card for an Echo Show and Echo Spot.
Lastly, the Alexa Skills Kit Development Console Test tab displays the Device Log. The log captures Alexa Directives and Events. I have found the Device Log to be very helpful in troubleshooting problems with deployed skills.
CloudWatch Logs
By default, the custom skill outputs events to CloudWatch Logs. I have added the DynamoDB callback payload, as well as the slot values of myName and myQuestion to the logs, for each successful Alexa response. CloudWatch Logs, like the Device Log above, are very helpful in troubleshooting problems with deployed skills.
Conclusion
In this brief post, we have seen how to use the new ASK SDK/CLI version 2, services from the AWS Serverless Platform, and the LTS version of Node.js, to create an Alexa Custom Skill. Using the AWS Serverless Platform, we could easily extend the example to take advantage of additional serverless services, such as the use of Amazon SNS and SQS for notifications and messaging and Amazon Kinesis for analytics.
In a future post, we will extend this example, adding the capability to securely add and update our DynamoDB table’s items. We will use additional AWS services, including Amazon Cognito to authorize access to our API. We will also use AWS API Gateway to integrate with our Lambdas, producing a completely serverless API.
¹Azure is a trademark of Microsoft
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.