Archive for category Python

Streaming Data Analytics with Amazon Kinesis Data Firehose, Redshift, and QuickSight

Databases are ideal for storing and organizing data that requires a high volume of transaction-oriented query processing while maintaining data integrity. In contrast, data warehouses are designed for performing data analytics on vast amounts of data from one or more disparate sources. In our fast-paced, hyper-connected world, those sources often take the form of continuous streams of web application logs, e-commerce transactions, social media feeds, online gaming activities, financial trading transactions, and IoT sensor readings. Streaming data must be analyzed in near real-time, while often first requiring cleansing, transformation, and enrichment.

In the following post, we will demonstrate the use of Amazon Kinesis Data Firehose, Amazon Redshift, and Amazon QuickSight to analyze streaming data. We will simulate time-series data, streaming from a set of IoT sensors to Kinesis Data Firehose. Kinesis Data Firehose will write the IoT data to an Amazon S3 Data Lake, where it will then be copied to Redshift in near real-time. In Amazon Redshift, we will enhance the streaming sensor data with data contained in the Redshift data warehouse, which has been gathered and denormalized into a star schema.

Streaming-Kinesis-Redshift

In Redshift, we can analyze the data, asking questions like, what is the min, max, mean, and median temperature over a given time period at each sensor location. Finally, we will use Amazon Quicksight to visualize the Redshift data using rich interactive charts and graphs, including displaying geospatial sensor data.

screen_shot_2020-03-04_at_9.27.33_pm

Featured Technologies

The following AWS services are discussed in this post.

Amazon Kinesis Data Firehose

According to Amazon, Amazon Kinesis Data Firehose can capture, transform, and load streaming data into data lakes, data stores, and analytics tools. Direct Kinesis Data Firehose integrations include Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. Kinesis Data Firehose enables near real-time analytics with existing business intelligence (BI) tools and dashboards.

Amazon Redshift

According to Amazon, Amazon Redshift is the most popular and fastest cloud data warehouse. With Redshift, users can query petabytes of structured and semi-structured data across your data warehouse and data lake using standard SQL. Redshift allows users to query and export data to and from data lakes. Redshift can federate queries of live data from Redshift, as well as across one or more relational databases.

Amazon Redshift Spectrum

According to Amazon, Amazon Redshift Spectrum can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum tables are created by defining the structure for data files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue or an Apache Hive metastore. While Redshift Spectrum is an alternative to copying the data into Redshift for analysis, we will not be using Redshift Spectrum in this post.

Amazon QuickSight

According to Amazon, Amazon QuickSight is a fully managed business intelligence service that makes it easy to deliver insights to everyone in an organization. QuickSight lets users easily create and publish rich, interactive dashboards that include Amazon QuickSight ML Insights. Dashboards can then be accessed from any device and embedded into applications, portals, and websites.

What is a Data Warehouse?

According to Amazon, a data warehouse is a central repository of information that can be analyzed to make better-informed decisions. Data flows into a data warehouse from transactional systems, relational databases, and other sources, typically on a regular cadence. Business analysts, data scientists, and decision-makers access the data through business intelligence tools, SQL clients, and other analytics applications.

Demonstration

Source Code

All the source code for this post can be found on GitHub. Use the following command to git clone a local copy of the project.

CloudFormation

Use the two AWS CloudFormation templates, included in the project, to build two CloudFormation stacks. Please review the two templates and understand the costs of the resources before continuing.

The first CloudFormation template, redshift.yml, provisions a new Amazon VPC with associated network and security resources, a single-node Redshift cluster, and two S3 buckets.

The second CloudFormation template, kinesis-firehose.yml, provisions an Amazon Kinesis Data Firehose delivery stream, associated IAM Policy and Role, and an Amazon CloudWatch log group and two log streams.

Change the REDSHIFT_PASSWORD value to ensure your security. Optionally, change the REDSHIFT_USERNAME value. Make sure that the first stack completes successfully, before creating the second stack.

Review AWS Resources

To confirm all the AWS resources were created correctly, use the AWS Management Console.

Kinesis Data Firehose

In the Amazon Kinesis Dashboard, you should see the new Amazon Kinesis Data Firehose delivery stream, redshift-delivery-stream.

screen_shot_2020-03-02_at_3.58.12_pm

The Details tab of the new Amazon Kinesis Firehose delivery stream should look similar to the following. Note the IAM Role, FirehoseDeliveryRole, which was created and associated with the delivery stream by CloudFormation.

screen_shot_2020-03-02_at_6.30.25_pm

We are not performing any transformations of the incoming messages. Note the new S3 bucket that was created and associated with the stream by CloudFormation. The bucket name was randomly generated. This bucket is where the incoming messages will be written.

screen_shot_2020-03-02_at_6.31.13_pm

Note the buffer conditions of 1 MB and 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written in JSON format, using GZIP compression, to S3. These are the minimal buffer conditions, and as close to real-time streaming to Redshift as we can get.

screen_shot_2020-03-02_at_6.31.28_pm

Note the COPY command, which is used to copy the messages from S3 to the message table in Amazon Redshift. Kinesis uses the IAM Role, ClusterPermissionsRole, created by CloudFormation, for credentials. We are using a Manifest to copy the data to Redshift from S3. According to Amazon, a Manifest ensures that the COPY command loads all of the required files, and only the required files, for a data load. The Manifests are automatically generated and managed by the Kinesis Firehose delivery stream.

screen_shot_2020-03-02_at_6.31.43_pm

Redshift Cluster

In the Amazon Redshift Console, you should see a new single-node Redshift cluster consisting of one Redshift dc2.large Dense Compute node type.

screen_shot_2020-03-02_at_7.09.35_pm

Note the new VPC, Subnet, and VPC Security Group created by CloudFormation. Also, observe that the Redshift cluster is publically accessible from outside the new VPC.

screen_shot_2020-03-02_at_7.09.41_pm

Redshift Ingress Rules

The single-node Redshift cluster is assigned to an AWS Availability Zone in the US East (N. Virginia) us-east-1 AWS Region. The cluster is associated with a VPC Security Group. The Security Group contains three inbound rules, all for Redshift port 5439. The IP addresses associated with the three inbound rules provide access to the following: 1) a /27 CIDR block for Amazon QuickSight in us-east-1, a /27 CIDR block for Amazon Kinesis Firehose in us-east-1, and to you, a /32 CIDR block with your current IP address. If your IP address changes or you do not use the us-east-1 Region, you will need to change one or all of these IP addresses. The list of Kinesis Firehose IP addresses is here. The list of QuickSight IP addresses is here.

screen_shot_2020-03-02_at_7.09.59_pm

If you cannot connect to Redshift from your local SQL client, most often, your IP address has changed and is incorrect in the Security Group’s inbound rule.

Redshift SQL Client

You can choose to use the Redshift Query Editor to interact with Redshift or use a third-party SQL client for greater flexibility. To access the Redshift Query Editor, use the user credentials specified in the redshift.yml CloudFormation template.

screen_shot_2020-03-02_at_4.01.49_pm

There is a lot of useful functionality in the Redshift Console and within the Redshift Query Editor. However, a notable limitation of the Redshift Query Editor, in my opinion, is the inability to execute multiple SQL statements at the same time. Whereas, most SQL clients allow multiple SQL queries to be executed at the same time.

screen_shot_2020-03-04_at_8.48.39_am

I prefer to use JetBrains PyCharm IDE. PyCharm has out-of-the-box integration with RedShift. Using PyCharm, I can edit the project’s Python, SQL, AWS CLI shell, and CloudFormation code, all from within PyCharm.

screen_shot_2020-03-02_at_7.12.11_pm

If you use any of the common SQL clients, you will need to set-up a JDBC (Java Database Connectivity) or ODBC (Open Database Connectivity) connection to Redshift. The ODBC and JDBC connection strings can be found in the Redshift cluster’s Properties tab or in the Outputs tab from the CloudFormation stack, redshift-stack.

screen_shot_2020-03-04_at_9.07.14_am

You will also need the RedShift database username and password you included in the aws cloudformation create-stack AWS CLI command you executed previously. Below, we see PyCharm’s Project Data Sources window containing a new data source for the Redshift dev database.

screen_shot_2020-03-02_at_6.33.01_pm

Database Schema and Tables

When CloudFormation created the Redshift cluster, it also created a new database, dev. Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL commands to create a new database schema, sensor, and six tables in the sensor schema.

Star Schema

The tables represent denormalized data, taken from one or more relational database sources. The tables form a star schema.  The star schema is widely used to develop data warehouses. The star schema consists of one or more fact tables referencing any number of dimension tables. The location, manufacturer, sensor, and history tables are dimension tables. The sensors table is a fact table.

In the diagram below, the foreign key relationships are virtual, not physical. The diagram was created using PyCharm’s schema visualization tool. Note the schema’s star shape. The message table is where the streaming IoT data will eventually be written. The message table is related to the sensors fact table through the common guid field.

schema-light-2

Sample Data to S3

Next, copy the sample data, included in the project, to the S3 data bucket created with CloudFormation. Each CSV-formatted data file corresponds to one of the tables we previously created. Since the bucket name is semi-random, we can use the AWS CLI and jq to get the bucket name, then use it to perform the copy commands.

The output from the AWS CLI should look similar to the following.

screen_shot_2020-03-02_at_7.18.22_pm

Sample Data to RedShift

Whereas a relational database, such as Amazon RDS is designed for online transaction processing (OLTP), Amazon Redshift is designed for online analytic processing (OLAP) and business intelligence applications. To write data to Redshift we typically use the COPY command versus frequent, individual INSERT statements, as with OLTP, which would be prohibitively slow. According to Amazon, the Redshift COPY command leverages the Amazon Redshift massively parallel processing (MPP) architecture to read and load data in parallel from files on Amazon S3, from a DynamoDB table, or from text output from one or more remote hosts.

In the following series of SQL statements, replace the placeholder, your_bucket_name, in five places with your S3 data bucket name. The bucket name will start with the prefix, redshift-stack-databucket. The bucket name can be found in the Outputs tab of the CloudFormation stack, redshift-stack. Next, replace the placeholder, cluster_permissions_role_arn, with the ARN (Amazon Resource Name) of the ClusterPermissionsRole. The ARN is formatted as follows, arn:aws:iam::your-account-id:role/ClusterPermissionsRole. The ARN can be found in the Outputs tab of the CloudFormation stack, redshift-stack.

Using the Redshift Query Editor or your SQL client of choice, execute the SQL statements to copy the sample data from S3 to each of the corresponding tables in the Redshift dev database. The TRUNCATE commands guarantee there is no previous sample data present in the tables.

Database Views

Next, create four Redshift database Views. These views may be used to analyze the data in Redshift, and later, in Amazon QuickSight.

  1. sensor_msg_detail: Returns aggregated sensor details, using the sensors fact table and all five dimension tables in a SQL Join.
  2. sensor_msg_count: Returns the number of messages received by Redshift, for each sensor.
  3. sensor_avg_temp: Returns the average temperature from each sensor, based on all the messages received from each sensor.
  4. sensor_avg_temp_current: View is identical for the previous view but limited to the last 30 minutes.

Using the Redshift Query Editor or your SQL client of choice, execute the following series of SQL statements.

At this point, you should have a total of six tables and four views in the sensor schema of the dev database in Redshift.

Test the System

With all the necessary AWS resources and Redshift database objects created and sample data in the Redshift database, we can test the system. The included Python script, kinesis_put_test_msg.py, will generate a single test message and send it to Kinesis Data Firehose. If everything is working, the message should be delivered from Kinesis Data Firehose to S3, then copied to Redshift, and appear in the message table.

Install the required Python packages and then execute the Python script.

Run the following SQL query to confirm the record is in the message table of the dev database. It will take at least one minute for the message to appear in Redshift.

Once the message is confirmed to be present in the message table, delete the record by truncating the table.

Streaming Data

Assuming the test message worked, we can proceed with simulating the streaming IoT sensor data. The included Python script, kinesis_put_streaming_data.py, creates six concurrent threads, representing six temperature sensors. The simulated data uses an algorithm that follows an oscillating sine wave or sinusoid, representing rising and falling temperatures. In the script, I have configured each thread to start with an arbitrary offset to add some randomness to the simulated data.

screen_shot_2020-03-04_at_9.27.33_pm

The variables within the script can be adjusted to shorten or lengthen the time it takes to stream the simulated data. By default, each of the six threads creates 400 messages per sensor, in one-minute increments. Including the offset start of each proceeding thread, the total runtime of the script is about 7.5 hours to generate 2,400 simulated IoT sensor temperature readings and push to Kinesis Data Firehose. Make sure you can guarantee you will maintain a connection to the Internet for the entire runtime of the script. I normally run the script in the background, from a small EC2 instance.

To use the Python script, execute either of the two following commands. Using the first command will run the script in the foreground. Using the second command will run the script in the background.

If you used the foreground command, you should see messages being generated by each thread and sent to Kinesis Data Firehose. Each message contains the GUID of the sensor, a timestamp, and a temperature reading.

screen_shot_2020-03-03_at_7.01.37_am

The messages are sent to Kinesis Data Firehose, which in turn writes the messages to S3. The messages are written in JSON format using GZIP compression. Below, we see an example of the GZIP compressed JSON files in S3. The JSON files are partitioned by year, month, day, and hour.

screen_shot_2020-03-04_at_11.04.36_pm

Confirm Data Streaming to Redshift

From the Amazon Kinesis Firehose Console Metrics tab, you should see incoming messages flowing to S3 and on to Redshift.

screen_shot_2020-03-03_at_6.17.14_pm

Executing the following SQL query should show an increasing number of messages.

How Near Real-time?

Earlier, we saw how the Amazon Kinesis Data Firehose delivery stream was configured to buffer data at the rate of 1 MB or 60 seconds. Whenever the buffer of incoming messages is greater than 1 MB or the time exceeds 60 seconds, the messages are written to S3. Each record in the message table has two timestamps. The first timestamp, ts, is when the temperature reading was recorded. The second timestamp, created, is when the message was written to Redshift, using the COPY command. We can calculate the delta in seconds between the two timestamps using the following SQL query in Redshift.

Using the results of the Redshift query, we can visualize the results in Amazon QuickSight. In my own tests, we see that for 2,400 messages, over approximately 7.5 hours, the minimum delay was 1 second, and a maximum delay was 64 seconds. Hence, near real-time, in this case, is about one minute or less, with an average latency of roughly 30 seconds.

latency

Analyzing the Data with Redshift

I suggest waiting at least thirty minutes for a significant number of messages copied into Redshift. With the data streaming into Redshift, execute each of the database views we created earlier. You should see the streaming message data, joined to the existing static data in RedShift. As data continues to stream into Redshift, the views will display different results based on the current message table contents.

Here, we see the first ten results of the sensor_msg_detail view.

Next, we see the results of the sensor_avg_temp view.

Amazon QuickSight

In a recent post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2, I detailed getting started with Amazon QuickSight. In this post, I will assume you are familiar with QuickSight.

Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. Though, for this part of the demonstration, we will be working directly in the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation.

Redshift Data Sets

To visualize the data from Amazon Redshift, we start by creating Data Sets in QuickSight. QuickSight supports a large number of data sources for creating data sets. We will use the Redshift data source. If you recall, we added an inbound rule for QuickSight, allowing us to connect to our Redshift cluster in us-east-1.

screen_shot_2020-03-02_at_10.21.35_pm

We will select the sensor schema, which is where the tables and views for this demonstration are located.

screen_shot_2020-03-02_at_10.21.49_pm

We can choose any of the tables or views in the Redshift dev database that we want to use for visualization.

screen_shot_2020-03-02_at_10.22.11_pm

Below, we see examples of two new data sets, shown in the QuickSight Data Prep Console. Note how QuickSight automatically recognizes field types, including dates, latitude, and longitude.

screen_shot_2020-03-02_at_10.23.12_pm

screen_shot_2020-03-02_at_10.59.32_pm

Visualizations

Using the data sets, QuickSight allows us to create a wide number of rich visualizations. Below, we see the simulated time-series data from the six temperature sensors.

screen_shot_2020-03-04_at_9.26.26_pm

Next, we see an example of QuickSight’s ability to show geospatial data. The Map shows the location of each sensor and the average temperature recorded by that sensor.

screen_shot_2020-03-04_at_9.26.51_pm

Cleaning Up

To remove the resources created for this post, use the following series of AWS CLI commands.

Conclusion

In this brief post, we have learned how streaming data can be analyzed in near real-time, in Amazon Redshift, using Amazon Kinesis Data Firehose. Further, we explored how the results of those analyses can be visualized in Amazon QuickSight. For customers that depend on a data warehouse for data analytics, but who also have streaming data sources, the use of Amazon Kinesis Data Firehose or Amazon Redshift Spectrum is an excellent choice.

This blog represents my own viewpoints and not of my employer, Amazon Web Services.

, , , , , , ,

Leave a comment

Executing Amazon Athena Queries from JetBrains PyCharm

Amazon Athena

According to Amazon, Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Amazon Athena supports and works with a variety of popular data file formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet.

The underlying technology behind Amazon Athena is Presto, the popular, open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). Athena is ideal for quick, ad-hoc querying, but it can also handle complex analysis, including large joins, window functions, and arrays. In addition to Presto, Athena also uses Apache Hive to define tables.

screen_shot_2020-01-05_at_10_32_25_am

Athena Query Editor

In the previous post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight, we used the Athena Query Editor to construct and test SQL queries against semi-structured data in an S3-based data lake. The Athena Query Editor has many of the basic features Data Engineers and Analysts expect, including SQL syntax highlighting, code auto-completion, and query formatting. Queries can be run directly from the Editor, saved for future reference, and query results downloaded. The Editor can convert SELECT queries to CREATE TABLE AS (CTAS) and CREATE VIEW AS  statements. Access to AWS Glue data sources is also available from within the Editor.

 

Full-Featured IDE

Although the Athena Query Editor is fairly functional, many Engineers perform a majority of their software development work in a fuller-featured IDE. The choice of IDE may depend on one’s predominant programming language. According to the PYPL Index, the ten most popular, current IDEs are: 1) Microsoft Visual Studio, 2) Android Studio, 3) Eclipse, 4) Visual Studio Code, 5) Apache NetBeans, 6) JetBrains PyCharm, 7) JetBrains IntelliJ, 8) Apple Xcode, 9) Sublime Text, and 10) Atom.

Within the domains of data science, big data analytics, and data analysis, languages such as SQL, Python, Java, Scala, and R are common. Although I work in a variety of IDEs, my go-to choices are JetBrains PyCharm for Python (including for PySpark and Jupyter Notebook development) and JetBrains IntelliJ for Java and Scala (including Apache Spark development). Both these IDEs also support many common SQL-based technologies, out-of-the-box, and are easily extendable to add new technologies.

jetbrains.png

Athena Integration with PyCharm

Utilizing the extensibility of the JetBrains suite of professional development IDEs, it is simple to add Amazon Athena to the list of available database drivers and make JDBC (Java Database Connectivity) connections to Athena instances on AWS.

Downloading the Athena JDBC Driver

To start, download the Athena JDBC Driver from Amazon. There are two versions, based on your choice of Java JDKs. Considering Java 8 was released almost eight years ago (March 2014), most users will likely want the AthenaJDBC42-2.0.9.jar is compatible with JDBC 4.2 and JDK 8.0 or later.

screen_shot_2020-01-06_at_9_28_14_pm

Installation Guide

AWS also supplies a JDBC Driver Installation and Configuration Guide. The guide, as well as the Athena JDBC and ODBC Drivers, are produced by Simba Technologies (acquired by Magnitude Software). Instructions for creating an Athena Driver starts on page 23.

screen_shot_2020-01-06_at_9_28_27_pm

Creating a New Athena Driver

From PyCharm’s Database Tool Window, select the Drivers dialog box, select the downloaded Athena JDBC Driver JAR. Select com.simba.athena.jdbc.Driver in the Class dropdown. Name the Driver, ‘Amazon Athena.’

screen_shot_2020-01-06_at_10_06_58_pm

You can configure the Athena Driver further, using the Options and Advanced tabs.

screen_shot_2020-01-11_at_8.25.22_pm

Creating a New Athena Data Source

From PyCharm’s Database Tool Window, select the Data Source dialog box to create a new connection to your Athena instance. Choose ‘Amazon Athena’ from the list of available Database Drivers.

screen_shot_2020-01-08_at_3_47_48_pm

You will need four items to create an Athena Data Source: 1) your IAM User Access Key ID, 2) your IAM User Secret Access Key, 3) the AWS Region of your Athena instance (e.g., us-east-1), and 4) an existing S3 bucket location to store query results. The Athena connection URL is a combination of the AWS Region and the S3 bucket, items 3 and 4, above. The format of the Athena connection URL is as follows.

jdbc:awsathena://AwsRegion=your-region;S3OutputLocation=s3://your-bucket-name/query-results-path

Give the new Athena Data Source a logical Name, input the User (Access Key ID), Password (Secret Access Key), and the Athena URL. To test the Athena Data Source, use the ‘Test Connection’ button.

screen_shot_2020-01-06_at_10_10_03_pm

You can create multiple Athena Data Sources using the Athena Driver. For example, you may have separate Development, Test, and Production instances of Athena, each in a different AWS Account.

Data Access

Once a successful connection has been made, switching to the Schemas tab, you should see a list of available AWS Glue Data Catalog databases. Below, we see the AWS Glue Catalog, which we created in the prior post. This Glue Data Catalog database contains ten metadata tables, each corresponding to a semi-structured, file-based data source in an S3-based data lake.

In the example below, I have chosen to limit the new Athena Data Source to a single Data Catalog database, to which the Data Source’s IAM User has access. Applying the core AWS security principle of granting least privilege, IAM Users should only have the permissions required to perform a specific set of approved tasks. This principle applies to the Glue Data Catalog databases, metadata tables, and the underlying S3 data sources.

screen_shot_2020-01-06_at_10_11_03_pm.png

Querying Athena from PyCharm

From within the PyCharm’s Database Tool Window, you should now see a list of the metadata tables defined in your AWS Glue Data Catalog database(s), as well as the individual columns within each table.

screen_shot_2020-01-06_at_10_12_18_pm

Similar to the Athena Query Editor, you can write SQL queries against the database tables in PyCharm. Like the Athena Query Editor, PyCharm has standard features SQL syntax highlighting, code auto-completion, and query formatting. Right-click on the Athena Data Source and choose New, then Console, to start.

screen_shot_2020-01-08_at_3_46_01_pm

Be mindful when writing queries and searching the Internet for SQL references, the Athena query engine is based on Presto 0.172. The current version of Presto, 0.229, is more than 50 releases ahead of the current Athena version. Both Athena and Presto functionality continue to change and diverge. There are also additional considerations and limitations for SQL queries in Athena to be aware of.

Whereas the Athena Query Editor is limited to only one query per query tab, in PyCharm, we can write and run multiple SQL queries in the same console window and have multiple console sessions opened to Athena at the same time.

screen_shot_2020-01-06_at_10_41_05_pm

By default, PyCharm’s query results are limited to the first ten rows of data. The number of rows displayed, as well as many other preferences, can be changed in the PyCharm’s Database Preferences dialog box.

screen_shot_2020-01-06_at_10_15_34_pm

Saving Queries and Exporting Results

In PyCharm, Athena queries can be saved as part of your PyCharm projects, as .sql files. Whereas the Athena Query Editor is limited to CSV, in PyCharm, query results can be exported in a variety of standard data file formats.

screen_shot_2020-01-08_at_3_43_39_pm

 

Athena Query History

All Athena queries ran from PyCharm are recorded in the History tab of the Athena Console. Although PyCharm shows query run times, the Athena History tab also displays the amount of data scanned. Knowing the query run time and volume of data scanned is useful when performance tuning queries.

screen_shot_2020-01-07_at_11_12_46_pm

Other IDEs

The technique shown for JetBrains PyCharm can also be applied to other JetBrains products, including GoLand, DataGrip, PhpStorm, and IntelliJ (shown below).

screen-shot-2020-01-08-at-5_35_57-pm.png

, , , , , ,

Leave a comment

Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1

Introduction

According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusion, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.

Data Analysis—discovering useful information, informing conclusion, and supporting decision-making. –Wikipedia

In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.

Demonstration

In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.

Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

screen_shot_2020-01-13_at_7_57_47_pm_v4.pngPreview of post’s data in Amazon QuickSight.

The original concept for the Smart Hub was developed as part of a multi-day training and hackathon, I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.

SA_Team_PhotoFrom left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.

This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.

athena-glue-architecture-v2High-level AWS architecture diagram of the demonstration.

Featured Technologies

The following AWS services and open-source technologies are featured prominently in this post.

Athena-Glue-v2.png

Amazon S3-based Data Lake

Screen Shot 2020-01-02 at 5.09.05 PMAn Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.

AWS Glue

Screen Shot 2020-01-02 at 5.11.37 PMAWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

AWS Glue Data Catalog

Screen Shot 2020-01-02 at 5.13.01 PM.pngThe AWS Glue Data Catalog is an Apache Hive Metastore compatible, central repository to store structural and operational metadata for data assets. For a given data set, store table definition, physical location, add business-relevant attributes, as well as track how the data has changed over time.

AWS Glue Crawler

Screen Shot 2020-01-02 at 5.14.57 PMAn AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.

AWS Glue ETL Job

Screen Shot 2020-01-02 at 5.11.37 PMAn AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.

Amazon Athena

Screen Shot 2020-01-02 at 5.17.49 PMAmazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to the AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.

Amazon QuickSight

Screen Shot 2020-01-02 at 5.18.40 PMAmazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.

AWS Lambda

Screen Shot 2020-01-02 at 5.25.57 PMAWS Lambda automatically runs code without requiring the provisioning or management servers. AWS Lambda automatically scales applications by running code in response to triggers. Lambda code runs in parallel. With AWS Lambda, you are charged for every 100ms your code executes and the number of times your code is triggered. You pay only for the compute time you consume.

Smart Hub Data

Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data. Those data categories include Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.

To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.

Smart Hub Electrical Usage Data

The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kW) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days worth of usage data for the demonstration, for a total of 5,760 data points. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).

Note the electrical usage data contains nested data. The electrical usage for each of the ten sensors is contained in a JSON array, within each time series entry. The array contains ten numeric values of type, double.

Real data is often complex and deeply nested. Later in the post, we will see that AWS Glue can map many common data types, including nested data objects, as illustrated below.

screen_shot_2020-01-05_at_7_46_19_am

Smart Hub Sensor Mappings

The Smart Hub sensor mappings data maps a sensor column in the usage data (e.g., ‘s_01’ to the corresponding actual device (e.g., ‘Central Air Conditioner’). The data contains the device location, wattage, and the last time the record was modified. The data is also stored in JSON Lines format. The sensor mappings data will be partitioned in the Amazon S3-based data lake by the state of the residence (e.g., ‘state=or’ for Oregon).

Smart Hub Locations

The Smart Hub locations data contains the geospatial coordinates, home address, and timezone for each residential Smart Hub. The data is stored in CSV format. The data for the four cities included in this demonstration originated from OpenAddresses, ‘the free and open global address collection.’ There are approximately 4k location records. The location data will be partitioned in the Amazon S3-based data lake by the state of the residence where the Smart Hub is installed (e.g., ‘state=or’ for Oregon).

Electrical Rates

Lastly, the electrical rate data contains the cost of electricity. In this demonstration, the assumption is that the rate varies by state, by month, and by the hour of the day. The data is stored in XML, a data export format still common to older, legacy systems. The electrical rate data will not be partitioned in the Amazon S3-based data lake.

Data Analysis Process

Due to the number of steps involved in the data analysis process in the demonstration, I have divided the process into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).

athena-glue-0.pngFull data analysis workflow diagram (click to enlarge…)

Raw Data Ingestion

In the Raw Data Ingestion stage, semi-structured CSV-, XML-, and JSON-format data files are copied to a secure Amazon Simple Storage Service (S3) bucket. Within the bucket, data files are organized into folders based on their physical data structure (schema). Due to the potentially unlimited number of data files, files are further organized (partitioned) into subfolders. Organizational strategies for data files are based on date, time, geographic location, customer id, or other common data characteristics.

This collection of semi-structured data files, S3 buckets, and partitions form what is referred to as a Data Lake. According to AWS, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.

A series of AWS Glue Crawlers process the raw CSV-, XML-, and JSON-format files, extracting metadata, and creating table definitions in the AWS Glue Data Catalog. According to AWS, an AWS Glue Data Catalog contains metadata tables, where each table specifies a single data store.

Athena-Glue-1

Data Transformation

In the Data Transformation stage, the raw data in the previous stage is transformed. Data transformation may include both modifying the data and changing the data format. Data modifications include data cleansing, re-casting data types, changing date formats, field-level computations, and field concatenation.

The data is then converted from CSV-, XML-, and JSON-format to Apache Parquet format and written back to the Amazon S3-based data lake. Apache Parquet is a compressed, efficient columnar storage format. Amazon Athena, like many Cloud-based services, charges you by the amount of data scanned per query. Hence, using data partitioning, bucketing, compression, and columnar storage formats, like Parquet, will reduce query cost.

Lastly, the transformed Parquet-format data is cataloged to new tables, alongside the raw CSV, XML, and JSON data, in the Glue Data Catalog.

Athena-Glue-2

Data Enrichment

According to ScienceDirect, data enrichment or augmentation is the process of enhancing existing information by supplementing missing or incomplete data. Typically, data enrichment is achieved by using external data sources, but that is not always the case.

Data Enrichment—the process of enhancing existing information by supplementing missing or incomplete data. –ScienceDirect

In the Data Enrichment stage, the Parquet-format Smart Hub usage data is augmented with related data from the three other data sources: sensor mappings, locations, and electrical rates. The customer’s Smart Hub usage data is enriched with the customer’s device types, the customer’s timezone, and customer’s electricity cost per monitored period based on the customer’s geographic location and time of day.

Athena-Glue-3a

Once the data is enriched, it is converted to Parquet and optimized for query performance, stored in the data lake, and cataloged. At this point, the original CSV-, XML-, and JSON-format raw data files, the transformed Parquet-format data files, and the Parquet-format enriched data files are all stored in the Amazon S3-based data lake and cataloged in the Glue Data Catalog.

Athena-Glue-3b

Data Visualization

In the final Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for visualization and Business Intelligence, which integrate with Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo. In this demonstration, we will focus on Amazon QuickSight.

Athena-Glue-4

Getting Started

Requirements

To follow along with the demonstration, you will need an AWS Account and a current version of the AWS CLI. To get the most from the demonstration, you should also have Python 3 and jq installed in your work environment.

Source Code

All source code for this post can be found on GitHub. Use the following command to clone a copy of the project.

Source code samples in this post are displayed as GitHub Gists, which will not display correctly on some mobile and social media browsers.

TL;DR?

Just want the jump in without reading the instructions? All the AWS CLI commands, found within the post, are consolidated in the GitHub project’s README file.

CloudFormation Stack

To start, create the ‘smart-hub-athena-glue-stack’ CloudFormation stack using the smart-hub-athena-glue.yml template. The template will create (3) Amazon S3 buckets, (1) AWS Glue Data Catalog Database, (5) Data Catalog Database Tables, (6) AWS Glue Crawlers, (1) AWS Glue ETL Job, and (1) IAM Service Role for AWS Glue.

Make sure to change the DATA_BUCKET, SCRIPT_BUCKET, and LOG_BUCKET variables, first, to your own unique S3 bucket names. I always suggest using the standard AWS 3-part convention of 1) descriptive name, 2) AWS Account ID or Account Alias, and 3) AWS Region, to name your bucket (e.g. ‘smart-hub-data-123456789012-us-east-1’).

Raw Data Files

Next, copy the raw CSV-, XML-, and JSON-format data files from the local project to the DATA_BUCKET S3 bucket (steps 1a-1b in workflow diagram). These files represent the beginnings of the S3-based data lake. Each category of data uses a different strategy for organizing and separating the files. Note the use of the Apache Hive-style partitions (e.g., /smart_hub_data_json/dt=2019-12-21). As discussed earlier, the assumption is that the actual, large volume of data in the data lake would necessitate using partitioning to improve query performance.

Confirm the contents of the DATA_BUCKET S3 bucket with the following command.

There should be a total of (14) raw data files in the DATA_BUCKET S3 bucket.

Lambda Functions

Next, package the (5) Python3.8-based AWS Lambda functions for deployment.

Copy the five Lambda packages to the SCRIPT_BUCKET S3 bucket. The ZIP archive Lambda packages are accessed by the second CloudFormation stack, smart-hub-serverless. This CloudFormation stack, which creates the Lambda functions, will fail to deploy if the packages are not found in the SCRIPT_BUCKET S3 bucket.

I have chosen to place the packages in a different S3 bucket then the raw data files. In a real production environment, these two types of files would be separated, minimally, into separate buckets for security. Remember, only data should go into the data lake.

Create the second ‘smart-hub-lambda-stack’ CloudFormation stack using the smart-hub-lambda.yml CloudFormation template. The template will create (5) AWS Lambda functions and (1) Lambda execution IAM Service Role.

At this point, we have deployed all of the AWS resources required for the demonstration using CloudFormation. We have also copied all of the raw CSV-, XML-, and JSON-format data files in the Amazon S3-based data lake.

AWS Glue Crawlers

If you recall, we created five tables in the Glue Data Catalog database as part of the CloudFormation stack. One table for each of the four raw data types and one table to hold temporary ELT data later in the demonstration. To confirm the five tables were created in the Glue Data Catalog database, use the Glue Data Catalog Console, or run the following AWS CLI / jq command.

The five data catalog tables should be as follows.

We also created six Glue Crawlers as part of the CloudFormation template. Four of these Crawlers are responsible for cataloging the raw CSV-, XML-, and JSON-format data from S3 into the corresponding, existing Glue Data Catalog database tables. The Crawlers will detect any new partitions and add those to the tables as well. Each Crawler corresponds to one of the four raw data types. Crawlers can be scheduled to run periodically, cataloging new data and updating data partitions. Crawlers will also create a Data Catalog database tables. We use Crawlers to create new tables, later in the post.

Run the four Glue Crawlers using the AWS CLI (step 1c in workflow diagram).

You can check the Glue Crawler Console to ensure the four Crawlers finished successfully.

screen_shot_2020-01-03_at_3_05_29_pm

Alternately, use another AWS CLI / jq command.

When complete, all Crawlers should all be in a state of ‘Still Estimating = false’ and ‘TimeLeftSeconds = 0’. In my experience, the Crawlers can take up one minute to start, after the estimation stage, and one minute to stop when complete.

Successfully running the four Crawlers completes the Raw Data Ingestion stage of the demonstration.

Converting to Parquet with CTAS

With the Raw Data Ingestion stage completed, we will now transform the raw Smart Hub usage data, sensor mapping data, and locations data into Parquet-format using three AWS Lambda functions. Each Lambda subsequently calls Athena, which executes a CREATE TABLE AS SELECT SQL statement (aka CTAS) . Each Lambda executes a similar command, varying only by data source, data destination, and partitioning scheme. Below, is an example of the command used for the Smart Hub electrical usage data, taken from the Python-based Lambda, athena-json-to-parquet-data/index.py.

This compact, yet powerful CTAS statement converts a copy of the raw JSON- and CSV-format data files into Parquet-format, and partitions and stores the resulting files back into the S3-based data lake. Additionally, the CTAS SQL statement catalogs the Parquet-format data files into the Glue Data Catalog database, into new tables. Unfortunately, this method will not work for the XML-format raw data files, which we will tackle next.

The five deployed Lambda functions should be visible from the Lambda Console’s Functions tab.

screen_shot_2020-01-04_at_5_57_31_pm

Invoke the three Lambda functions using the AWS CLI. (part of step 2a in workflow diagram).

Here is an example of the same CTAS command, shown above for the Smart Hub electrical usage data, as it is was executed successfully by Athena.

We can view any Athena SQL query from the Athena Console’s History tab. Clicking on a query (in pink) will copy it to the Query Editor tab and execute it. Below, we see the three SQL statements executed by the Lamba functions.

screen_shot_2020-01-04_at_7_08_32_pm

AWS Glue ETL Job for XML

If you recall, the electrical rate data is in XML format. The Lambda functions we just executed, converted the CSV and JSON data to Parquet using Athena. Currently, unlike CSV, JSON, ORC, Parquet, and Avro, Athena does not support the older XML data format. For the XML data files, we will use an AWS Glue ETL Job to convert the XML data to Parquet. The Glue ETL Job is written in Python and uses Apache Spark, along with several AWS Glue PySpark extensions. For this job, I used an existing script created in the Glue ETL Jobs Console as a base, then modified the script to meet my needs.

The three Python command-line arguments the script expects (lines 10–12, above) are defined in the CloudFormation template, smart-hub-athena-glue.yml. Below, we see them on lines 10–12 of the CloudFormation snippet. They are injected automatically when the job is run and can be overridden from the command line when starting the job.

First, copy the Glue ETL Job Python script to the SCRIPT_BUCKET S3 bucket.

Next, start the Glue ETL Job (part of step 2a in workflow diagram). Although the conversion is a relatively simple set of tasks, the creation of the Apache Spark environment, to execute the tasks, will take several minutes. Whereas the Glue Crawlers took about 2 minutes on average, the Glue ETL Job could take 10–15 minutes in my experience. The actual execution time only takes about 1–2 minutes of the 10–15 minutes to complete. In my opinion, waiting up to 15 minutes is too long to be viable for ad-hoc jobs against smaller datasets; Glue ETL Jobs are definitely targeted for big data.

To check on the status of the job, use the Glue ETL Jobs Console, or use the AWS CLI.

When complete, you should see results similar to the following. Note the ‘JobRunState’ is ‘SUCCEEDED.’ This particular job ran for a total of 14.92 minutes, while the actual execution time was 2.25 minutes.

The job’s progress and the results are also visible in the AWS Glue Console’s ETL Jobs tab.

screen_shot_2020-01-04_at_7_42_51_pm

Detailed Apache Spark logs are also available in CloudWatch Management Console, which is accessible directly from the Logs link in the AWS Glue Console’s ETL Jobs tab.

screen_shot_2020-01-04_at_7_44_08_pm

The last step in the Data Transformation stage is to convert catalog the Parquet-format electrical rates data, created with the previous Glue ETL Job, using yet another Glue Crawler (part of step 2b in workflow diagram). Start the following Glue Crawler to catalog the Parquet-format electrical rates data.

This concludes the Data Transformation stage. The raw and transformed data is in the data lake, and the following nine tables should exist in the Glue Data Catalog.

If we examine the tables, we should observe the data partitions we used to organize the data files in the Amazon S3-based data lake are contained in the table metadata. Below, we see the four partitions, based on state, of the Parquet-format locations data.

screen_shot_2020-01-05_at_7_45_46_am

Data Enrichment

To begin the Data Enrichment stage, we will invoke the AWS Lambda, athena-complex-etl-query/index.py. This Lambda accepts input parameters (lines 28–30, below), passed in the Lambda handler’s event parameter. The arguments include the Smart Hub ID, the start date for the data requested, and the end date for the data requested. The scenario for the demonstration is that a customer with the location id value, using the electrical provider’s application, has requested data for a particular range of days (start date and end date), to visualize and analyze.

The Lambda executes a series of Athena INSERT INTO SQL statements, one statement for each of the possible Smart Hub connected electrical sensors, s_01 through s_10, for which there are values in the Smart Hub electrical usage data. Amazon just released the Amazon Athena INSERT INTO a table using the results of a SELECT query capability in September 2019, an essential addition to Athena. New Athena features are listed in the release notes.

Here, the SELECT query is actually a series of chained subqueries, using Presto SQL’s WITH clause capability. The queries join the Parquet-format Smart Hub electrical usage data sources in the S3-based data lake, with the other three Parquet-format, S3-based data sources: sensor mappings, locations, and electrical rates. The Parquet-format data is written as individual files to S3 and inserted into the existing ‘etl_tmp_output_parquet’ Glue Data Catalog database table. Compared to traditional relational database-based queries, the capabilities of Glue and Athena to enable complex SQL queries across multiple semi-structured data files, stored in S3, is truly amazing!

The capabilities of Glue and Athena to enable complex SQL queries across multiple semi-structured data files, stored in S3, is truly amazing!

Below, we see the SQL statement starting on line 43.

Below, is an example of one of the final queries, for the s_10 sensor, as executed by Athena. All the input parameter values, Python variables, and environment variables have been resolved into the query.

Along with enriching the data, the query performs additional data transformation using the other data sources. For example, the Unix timestamp is converted to a localized timestamp containing the date and time, according to the customer’s location (line 7, above). Transforming dates and times is a frequent, often painful, data analysis task. Another example of data enrichment is the augmentation of the data with a new, computed column. The column’s values are calculated using the values of two other columns (line 33, above).

Invoke the Lambda with the following three parameters in the payload (step 3a in workflow diagram).

The ten INSERT INTO SQL statement’s result statuses (one per device sensor) are visible from the Athena Console’s History tab.

screen_shot_2020-01-05_at_9_17_23_pm

Each Athena query execution saves that query’s results to the S3-based data lake as individual, uncompressed Parquet-format data files. The data is partitioned in the Amazon S3-based data lake by the Smart Meter location ID (e.g., ‘loc_id=b6a8d42425fde548’).

Below is a snippet of the enriched data for a customer’s clothes washer (sensor ‘s_04’). Note the timestamp is now an actual date and time in the local timezone of the customer (e.g., ‘2019-12-21 20:10:00.000’). The sensor ID (‘s_04’) is replaced with the actual device name (‘Clothes Washer’). The location of the device (‘Basement’) and the type of electrical usage period (e.g. ‘peak’ or ‘partial-peak’) has been added. Finally, the cost column has been computed.

To transform the enriched CSV-format data to Parquet-format, we need to catalog the CSV-format results using another Crawler, first (step 3d in workflow diagram).

Optimizing Enriched Data

The previous step created enriched Parquet-format data. However, this data is not as optimized for query efficiency as it should be. Using the Athena INSERT INTO WITH SQL statement, allowed the data to be partitioned. However, the method does not allow the Parquet data to be easily combined into larger files and compressed. To perform both these optimizations, we will use one last Lambda, athena-parquet-to-parquet-elt-data/index.py. The Lambda will create a new location in the Amazon S3-based data lake, containing all the enriched data, in a single file and compressed using Snappy compression.

The resulting Parquet file is visible in the S3 Management Console.

screen_shot_2020-01-04_at_6_07_23_pm

The final step in the Data Enrichment stage is to catalog the optimized Parquet-format enriched ETL data. To catalog the data, run the following Glue Crawler (step 3i in workflow diagram

Final Data Lake and Data Catalog

We should now have the following ten top-level folders of partitioned data in the S3-based data lake. The ‘tmp’ folder may be ignored.

Similarly, we should now have the following ten corresponding tables in the Glue Data Catalog. Use the AWS Glue Console to confirm the tables exist.

screen_shot_2020-01-04_at_8_30_50_pm

Alternately, use the following AWS CLI / jq command to list the table names.

‘Unknown’ Bug

You may have noticed the four tables created with the AWS Lambda functions, using the CTAS SQL statement, erroneously have the ‘Classification’ of ‘Unknown’ as opposed to ‘parquet’. I am not sure why, I believe it is a possible bug with the CTAS feature. It seems to have no adverse impact on the table’s functionality. However, to fix the issue, run the following set of commands. This aws glue update-table hack will switch the table’s ‘Classification’ to ‘parquet’.

The results of the fix may be seen from the AWS Glue Console. All ten tables are now classified correctly.

screen_shot_2020-01-05_at_11_43_50_pm

Explore the Data

Before starting to visualize and analyze the data with Amazon QuickSight, try executing a few Athena queries against the tables in the Glue Data Catalog database, using the Athena Query Editor. Working in the Editor is the best way to understand the data, learn Athena, and debug SQL statements and queries. The Athena Query Editor has convenient developer features like SQL auto-complete and query formatting capabilities.

Be mindful when writing queries and searching the Internet for SQL references, the Athena query engine is based on Presto 0.172. The current version of Presto, 0.229, is more than 50 releases ahead of the current Athena version. Both Athena and Presto functionality has changed and diverged. There are additional considerations and limitations for SQL queries in Athena to be aware of.

screen_shot_2020-01-05_at_10_32_25_am

Here are a few simple, ad-hoc queries to run in the Athena Query Editor.

Cleaning Up

You may choose to save the AWS resources created in part one of this demonstration, to be used in part two. Since you are not actively running queries against the data, ongoing AWS costs will be minimal. If you eventually choose to clean up the AWS resources created in part one of this demonstration, execute the following AWS CLI commands. To avoid failures, make sure each command completes before running the subsequent command. You will need to confirm the CloudFormation stacks are deleted using the AWS CloudFormation Console or the AWS CLI. These commands will not remove Amazon QuickSight data sets, analyses, and dashboards created in part two. However, deleting the AWS Glue Data Catalog and the underlying data sources will impact the ability to visualize the data in QuickSight.

Part Two

In part one, starting with raw, semi-structured data in multiple formats, we learned how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , , , ,

2 Comments

Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker

There is little question, big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the marketing hype, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity and potential benefits, commercial enterprises, academic institutions, and the public sector are rushing to develop hardware and software solutions to lower the barriers to entry and increase the velocity of ML and Data Scientists and Engineers.

Machine Learning and Data Science Search Results_ 5-Year Trend r2
(courtesy Google Trends and Plotly)

Many open-source software projects are also lowering the barriers to entry into these technologies. An excellent example of one such open-source project working on this challenge is Project Jupyter. Similar to Apache Zeppelin and the newly open-sourced Netflix’s Polynote, Jupyter Notebooks enables data-driven, interactive, and collaborative data analytics.

This post will demonstrate the creation of a containerized data analytics environment using Jupyter Docker Stacks. The particular environment will be suited for learning and developing applications for Apache Spark using the Python, Scala, and R programming languages. We will focus on Python and Spark, using PySpark.

Featured Technologies

pyspark_article_00b_feature

The following technologies are featured prominently in this post.

Jupyter Notebooks

According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleansing and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word, Jupyter, is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages.

Interest in Jupyter Notebooks has grown dramatically over the last 3–5 years, fueled in part by the major Cloud providers, AWS, Google Cloud, and Azure. Amazon Sagemaker, Amazon EMR (Elastic MapReduce), Google Cloud Dataproc, Google Colab (Collaboratory), and Microsoft Azure Notebooks all have direct integrations with Jupyter notebooks for big data analytics and machine learning.

Jupyter Search Results_ 5-Year Trend
(courtesy Google Trends and Plotly)

Jupyter Docker Stacks

To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, the Jupyter Docker Stacks focus on a variety of specializations, including the r-notebook, scipy-notebook, tensorflow-notebook, datascience-notebook, pyspark-notebook, and the subject of this post, the all-spark-notebook. The stacks include a wide variety of well-known packages to extend their functionality, such as scikit-learn, pandas, MatplotlibBokeh, NumPy, and Facets.

Apache Spark

According to Apache, Spark is a unified analytics engine for large-scale data processing. Starting as a research project at the UC Berkeley AMPLab in 2009, Spark was open-sourced in early 2010 and moved to the Apache Software Foundation in 2013. Reviewing the postings on any major career site will confirm that Spark is widely used by well-known modern enterprises, such as Netflix, Adobe, Capital One, Lockheed Martin, JetBlue Airways, Visa, and Databricks. At the time of this post, LinkedIn, alone, had approximately 3,500 listings for jobs that reference the use of Apache Spark, just in the United States.

With speeds up to 100 times faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine. Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, Apache Hadoop YARNMesos, or Kubernetes.

PySpark

The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API and uses Py4J. According to Apache, Py4J, a bridge between Python and Java, enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine (JVM). Data is processed in Python and cached and shuffled in the JVM.

Docker

According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. For this post, I am using the current stable version of Docker Desktop Community version for macOS, as of March 2020.

Screen Shot 2020-03-07 at 9.16.03 PM

Docker Swarm

Current versions of Docker include both a Kubernetes and Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project that implements Docker’s orchestration layer and is used directly within Docker.

PostgreSQL

PostgreSQL is a powerful, open-source, object-relational database system. According to their website, PostgreSQL comes with many features aimed to help developers build applications, administrators to protect data integrity and build fault-tolerant environments, and help manage data no matter how big or small the dataset.

Demonstration

In this demonstration, we will explore the capabilities of the Spark Jupyter Docker Stack to provide an effective data analytics development environment. We will explore a few everyday uses, including executing Python scripts, submitting PySpark jobs, and working with Jupyter Notebooks, and reading and writing data to and from different file formats and a database. We will be using the latest jupyter/all-spark-notebook Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.

Architecture

As shown below, we will deploy a Docker stack to a single-node Docker swarm. The stack consists of a Jupyter All-Spark-Notebook, PostgreSQL (Alpine Linux version 12), and Adminer container. The Docker stack will have two local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers. If the containers are restarted or recreated, the data is preserved locally.

JupyterDiagram

Source Code

All source code for this post can be found on GitHub. Use the following command to clone the project. Note this post uses the v2 branch.

git clone \
  --branch v2 --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git

Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.

Deploy Docker Stack

To start, create the $HOME/data/postgres directory to store PostgreSQL data files.

mkdir -p ~/data/postgres

This directory will be bind-mounted into the PostgreSQL container on line 41 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data. The HOME environment variable assumes you are working on Linux or macOS and is equivalent to HOMEPATH on Windows.

The Jupyter container’s working directory is set on line 15 of the stack.yml file, working_dir: /home/$USER/work. The local bind-mounted working directory is $PWD/work. This path is bind-mounted to the working directory in the Jupyter container, on line 29 of the Docker stack file, $PWD/work:/home/$USER/work. The PWD environment variable assumes you are working on Linux or macOS (CD on Windows).

By default, the user within the Jupyter container is jovyan. We will override that user with our own local host’s user account, as shown on line 21 of the Docker stack file, NB_USER: $USER. We will use the host’s USER environment variable value (equivalent to USERNAME on Windows). There are additional options for configuring the Jupyter container. Several of those options are used on lines 17–22 of the Docker stack file (gist).

The jupyter/all-spark-notebook Docker image is large, approximately 5 GB. Depending on your Internet connection, if this is the first time you have pulled this image, the stack may take several minutes to enter a running state. Although not required, I usually pull new Docker images in advance.

docker pull jupyter/all-spark-notebook:latest
docker pull postgres:12-alpine
docker pull adminer:latest

Assuming you have a recent version of Docker installed on your local development machine and running in swarm mode, standing up the stack is as easy as running the following docker command from the root directory of the project.

docker stack deploy -c stack.yml jupyter

The Docker stack consists of a new overlay network, jupyter_demo-net, and three containers. To confirm the stack deployed successfully, run the following docker command.

docker stack ps jupyter --no-trunc

screen-shot-2019-12-04-at-10_34_40-am

To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token. The Jupyter URL and the access token are output to the Jupyter container log, which can be accessed with the following command.

docker logs $(docker ps | grep jupyter_spark | awk '{print $NF}')

You should observe log output similar to the following. Retrieve the complete URL, for example, http://127.0.0.1:8888/?token=f78cbe..., to access the Jupyter web-based user interface.

screen-shot-2019-12-04-at-10_34_52-am

From the Jupyter dashboard landing page, you should see all the files in the project’s work/ directory. Note the types of files you can create from the dashboard, including Python 3, R, and Scala (using Apache Toree or spylon-kernal) notebooks, and text. You can also open a Jupyter terminal or create a new Folder from the drop-down menu. At the time of this post (March 2020), the latest jupyter/all-spark-notebook Docker Image runs Spark 2.4.5, Scala 2.11.12, Python 3.7.6, and OpenJDK 64-Bit Server VM, Java 1.8.0 Update 242.

screen_shot_2019-12-01_at_4_40_12_pm

Bootstrap Environment

Included in the project is a bootstrap script, bootstrap_jupyter.sh. The script will install the required Python packages using pip, the Python package installer, and a requirement.txt file. The bootstrap script also installs the latest PostgreSQL driver JAR, configures Apache Log4j to reduce log verbosity when submitting Spark jobs, and installs htop. Although these tasks could also be done from a Jupyter terminal or from within a Jupyter notebook, using a bootstrap script ensures you will have a consistent work environment every time you spin up the Jupyter Docker stack. Add or remove items from the bootstrap script as necessary.

That’s it, our new Jupyter environment is ready to start exploring.

Running Python Scripts

One of the simplest tasks we could perform in our new Jupyter environment is running Python scripts. Instead of worrying about installing and maintaining the correct versions of Python and multiple Python packages on your own development machine, we can run Python scripts from within the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook Docker image runs Python 3.7.3 and Conda 4.7.12. Let’s start with a simple Python script, 01_simple_script.py.

From a Jupyter terminal window, use the following command to run the script.

python3 01_simple_script.py

You should observe the following output.

screen_shot_2019-12-01_at_4_46_38_pm

Kaggle Datasets

To explore more features of the Jupyter and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is an excellent open-source resource for datasets used for big-data and ML projects. Their tagline is ‘Kaggle is the place to do data science projects’. For this demonstration, we will use the Transactions from a bakery dataset from Kaggle. The dataset is available as a single CSV-format file. A copy is also included in the project.

pyspark_article_03_kaggle

The ‘Transactions from a bakery’ dataset contains 21,294 rows with 4 columns of data. Although certainly not big data, the dataset is large enough to test out the Spark Jupyter Docker Stack functionality. The data consists of 9,531 customer transactions for 21,294 bakery items between 2016-10-30 and 2017-04-09 (gist).

Submitting Spark Jobs

We are not limited to Jupyter notebooks to interact with Spark. We can also submit scripts directly to Spark from the Jupyter terminal. This is typically how Spark is used in a Production for performing analysis on large datasets, often on a regular schedule, using tools such as Apache Airflow. With Spark, you are load data from one or more data sources. After performing operations and transformations on the data, the data is persisted to a datastore, such as a file or a database, or conveyed to another system for further processing.

The project includes a simple Python PySpark ETL script, 02_pyspark_job.py. The ETL script loads the original Kaggle Bakery dataset from the CSV file into memory, into a Spark DataFrame. The script then performs a simple Spark SQL query, calculating the total quantity of each type of bakery item sold, sorted in descending order. Finally, the script writes the results of the query to a new CSV file, output/items-sold.csv.

Run the script directly from a Jupyter terminal using the following command.

python3 02_pyspark_job.py

An example of the output of the Spark job is shown below.
screen-shot-2019-12-03-at-9_31_38-am

Typically, you would submit the Spark job using the spark-submit command. Use a Jupyter terminal to run the following command.

$SPARK_HOME/bin/spark-submit 02_pyspark_job.py

Below, we see the output from the spark-submit command. Printing the results in the output is merely for the purposes of the demo. Typically, Spark jobs are submitted non-interactively, and the results are persisted directly to a datastore or conveyed to another system.
screen-shot-2019-12-03-at-9_32_25-am

Using the following commands, we can view the resulting CVS file, created by the spark job.

ls -alh output/items-sold.csv/
head -5 output/items-sold.csv/*.csv

An example of the files created by the spark job is shown below. We should have discovered that coffee is the most commonly sold bakery item with 5,471 sales, followed by bread with 3,325 sales.
screen-shot-2019-12-03-at-9_32_52-am

Interacting with Databases

To demonstrate the flexibility of Jupyter to work with databases, PostgreSQL is part of the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance, running in a separate container. To begin, we will run a SQL script, written in Python, to create our database schema and some test data in a new database table. To do so, we will use the psycopg2, the PostgreSQL database adapter package for the Python, we previously installed into our Jupyter container using the bootstrap script. The below Python script, 03_load_sql.py, will execute a set of SQL statements contained in a SQL file, bakery.sql, against the PostgreSQL container instance.

The SQL file, bakery.sql.

To execute the script, run the following command.

python3 03_load_sql.py

This should result in the following output, if successful.
screen-shot-2019-12-03-at-9_34_26-am

Adminer

To confirm the SQL script’s success, use Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines. The current version is 4.7.6 (March 2020).

Adminer should be available on localhost port 8080. The password credentials, shown below, are located in the stack.yml file. The server name, postgres, is the name of the PostgreSQL Docker container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
screen_shot_2019-12-01_at_6_09_57_pm

Connecting to the new bakery database with Adminer, we should see the transactions table.
screen_shot_2019-12-01_at_6_10_20_pm

The table should contain 21,293 rows, each with 5 columns of data.

screen_shot_2019-12-01_at_6_11_32_pm

pgAdmin

Another excellent choice for interacting with PostgreSQL, in particular, is pgAdmin 4. It is my favorite tool for the administration of PostgreSQL. Although limited to PostgreSQL, the user interface and administrative capabilities of pgAdmin is superior to Adminer, in my opinion. For brevity, I chose not to include pgAdmin in this post. The Docker stack also contains a pgAdmin container, which has been commented out. To use pgAdmin, just uncomment the container and re-run the Docker stack deploy command. pgAdmin should then be available on localhost port 81. The pgAdmin login credentials are in the Docker stack file.

screen_shot_2019-12-05_at_10_11_15_amscreen_shot_2019-12-05_at_10_11_44_am

Developing Jupyter Notebooks

The real power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process, including developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.

To explore the capabilities of Jupyter notebooks, the project includes two simple Jupyter notebooks. The first notebooks, 04_notebook.ipynb, demonstrates typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing basic data analysis with Spark SQL including the use of PySpark user-defined functions (UDF), graphing the data using BokehJS, and finally, saving data back to the database, as well as to the fast and efficient Apache Parquet file format. Below we see several notebook cells demonstrating these features.screen_shot_2019-12-04_at_11_05_00_pmscreen_shot_2019-12-04_at_11_05_07_pmscreen_shot_2019-12-04_at_11_05_22_pmscreen_shot_2019-12-05_at_3_54_11_pmscreen_shot_2019-12-04_at_11_14_22_pm

IDE Integration

Recall, the working directory, containing the GitHub source code for the project, is bind-mounted to the Jupyter container. Therefore, you can also edit any of the files, including notebooks, in your favorite IDE, such as JetBrains PyCharm and Microsoft Visual Studio Code. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
screen_shot_2019-12-01_at_9_21_49_pm

As does Visual Studio Code using the Python extension.

screen_shot_2019-12-08_at_8_02_43_pm.png

Using Additional Packages

As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a wide variety of Python packages to extend their functionality. To demonstrate the use of these packages, the project contains a second Jupyter notebook document, 05_notebook.ipynb. This notebook uses SciPy, the well-known Python package for mathematics, science, and engineering, NumPy, the well-known Python package for scientific computing, and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the bootstrap script uses pip to install the required Plotly packages. Similar to Bokeh, shown in the previous notebook, we can use these libraries to create richly interactive data visualizations.

Plotly

To use Plotly from within the notebook, you will first need to sign up for a free account and obtain a username and API key. To ensure we do not accidentally save sensitive Plotly credentials in the notebook, we are using python-dotenv. This Python package reads key/value pairs from a .env file, making them available as environment variables to our notebook. Modify and run the following two commands from a Jupyter terminal to create the .env file and set you Plotly username and API key credentials. Note that the .env file is part of the .gitignore file and will not be committed to back to git, potentially compromising the credentials.

echo "PLOTLY_USERNAME=your-username" >> .env
echo "PLOTLY_API_KEY=your-api-key" >> .env

The notebook expects to find the two environment variables, which it uses to authenticate with Plotly.

screen_shot_2019-12-04_at_11_20_06_pm

Shown below, we use Plotly to construct a bar chart of daily bakery items sold. The chart uses SciPy and NumPy to construct a linear fit (regression) and plot a line of best fit for the bakery data and overlaying the vertical bars. The chart also uses SciPy’s Savitzky-Golay Filter to plot the second line, illustrating a smoothing of our bakery data.

screen_shot_2019-12-05_at_11_14_27_am

Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world’s most modern enterprise data visualization solutions. We can enhance, stylize, and share our data visualizations using the free version of Chart Studio Cloud.

screen_shot_2019-12-05_at_11_15_55_am

Jupyter Notebook Viewer

Notebooks can also be viewed using nbviewer, an open-source project under Project Jupyter. Thanks to Rackspace hosting, the nbviewer instance is a free service.

screen_shot_2019-12-05_at_11_28_01_am.png

Using nbviewer, below, we see the output of a cell within the 04_notebook.ipynb notebook. View this notebook, here, using nbviewer.

screen_shot_2019-12-04_at_11_39_28_pm

Monitoring Spark Jobs

The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can review each completed Spark Job in detail.
screen-shot-2019-12-04-at-11_56_19-pm

We can review details of each stage of the Spark job, including a visualization of the DAG (Directed Acyclic Graph), which Spark constructs as part of the job execution plan, using the DAG Scheduler.
screen-shot-2019-12-04-at-11_57_18-pm

We can also review the task composition and timing of each event occurring as part of the stages of the Spark job.
screen-shot-2019-12-04-at-11_57_48-pm

We can also use the Spark interface to review and confirm the runtime environment configuration, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
screen-shot-2019-12-04-at-11_58_02-pm

Local Spark Performance

Running Spark on a single node within the Jupyter Docker container on your local development system is not a substitute for a true Spark cluster, Production-grade, multi-node Spark clusters running on bare metal or robust virtualized hardware, and managed with Apache Hadoop YARN, Apache Mesos, or Kubernetes. In my opinion, you should only adjust your Docker resources limits to support an acceptable level of Spark performance for running small exploratory workloads. You will not realistically replace the need to process big data and execute jobs requiring complex calculations on a Production-grade, multi-node Spark cluster.
screen_shot_2019-12-03_at_9_18_36_am

We can use the following docker stats command to examine the container’s CPU and memory metrics.

docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"

Below, we see the stats from the Docker stack’s three containers showing little or no activity.
Screen Shot 2019-12-05 at 12.16.14 AM

Compare those stats with the ones shown below, recorded while a notebook was reading and writing data, and executing Spark SQL queries. The CPU and memory output show spikes, but both appear to be within acceptable ranges.
Screen Shot 2019-12-05 at 12.13.49 AM

Linux Process Monitors

Another option to examine container performance metrics is top, which is pre-installed in our Jupyter container. For example, execute the following top command from a Jupyter terminal, sorting processes by CPU usage.

top -o %CPU

We should observe the individual performance of each process running in the Jupyter container.

screen_shot_2019-12-05_at_12_25_51_pm.png

A step up from top is htop, an interactive process viewer for Unix. It was installed in the container by the bootstrap script. For example, we can execute the htop command from a Jupyter terminal, sorting processes by CPU % usage.

htop --sort-key PERCENT_CPU

With htop, observe the individual CPU activity. Here, the four CPUs at the top left of the htop window are the CPUs assigned to Docker. We get insight into the way Spark is using multiple CPUs, as well as other performance metrics, such as memory and swap.

screen_shot_2019-12-02_at_12_08_18_pm.png

Assuming your development machine is robust, it is easy to allocate and deallocate additional compute resources to Docker if required. Be careful not to allocate excessive resources to Docker, starving your host machine of available compute resources for other applications.
screen_shot_2019-12-03_at_9_18_50_am

Notebook Extensions

There are many ways to extend the Jupyter Docker Stacks. A popular option is jupyter-contrib-nbextensions. According to their website, the jupyter-contrib-nbextensions package contains a collection of community-contributed unofficial extensions that add functionality to the Jupyter notebook. These extensions are mostly written in JavaScript and will be loaded locally in your browser. Installed notebook extensions can be enabled, either by using built-in Jupyter commands, or more conveniently by using the jupyter_nbextensions_configurator server extension.

The project contains an alternate Docker stack file, stack-nbext.yml. The stack uses an alternative Docker image, garystafford/all-spark-notebook-nbext:latest, This Dockerfile, which builds it, uses the jupyter/all-spark-notebook:latest image as a base image. The alternate image adds in the jupyter-contrib-nbextensions and jupyter_nbextensions_configurator packages. Since Jupyter would need to be restarted after nbextensions is deployed, it cannot be done from within a running jupyter/all-spark-notebook container.

Using this alternate stack, below in our Jupyter container, we see the sizable list of extensions available. Some of my favorite extensions include ‘spellchecker’, ‘Codefolding’, ‘Code pretty’, ‘ExecutionTime’, ‘Table of Contents’, and ‘Toggle all line numbers’.

screen_shot_2019-12-05_at_7_53_17_pm.png

Below, we see five new extension icons that have been added to the menu bar of 04_notebook.ipynb. You can also observe the extensions have been applied to the notebook, including the table of contents, code-folding, execution time, and line numbering. The spellchecking and pretty code extensions were both also applied.

screen_shot_2019-12-05_at_7_47_12_pm

Conclusion

In this brief post, we have seen how easy it is to get started learning and performing data analytics using Jupyter notebooks, Python, Spark, and PySpark, all thanks to the Jupyter Docker Stacks. We could use this same stack to learn and perform machine learning using Scala and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as MySQL, MongoDB, RabbitMQ, Apache Kafka, and Apache Cassandra.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients.

, , , , , , , ,

1 Comment

Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 2

Introduction

In Part 1 of this two-part post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on Amazon Elastic MapReduce (EMR). Further, we configured Zeppelin integrations with AWS Glue Data CatalogAmazon Relational Database Service (RDS) for PostgreSQL, and Amazon Simple Cloud Storage Service (S3) Data Lake. We also covered how to obtain the project’s source code from the two GitHub repositories, zeppelin-emr-demo and zeppelin-emr-config. Below is a high-level architectural diagram of the infrastructure we constructed in Part 1 for this demonstration.

EMR-Zeppelin

Part 2

In Part 2 of this post, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using a series of four Zeppelin notebooks. Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.

Notebook 1

The first notebook uses a small 21k row kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, and the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.

Interpreters

When you open a notebook for the first time, you are given the choice of interpreters to bind and unbind to the notebook. The last interpreter in the list shown below, postgres, is the new PostgreSQL JDBC Zeppelin interpreter we created in Part 1 of this post. We will use this interpreter in Notebook 3.

screen-shot-2019-11-24-at-8_03_50-pm

Application Versions

The first two paragraphs of the notebook are used to confirm the version of Spark, Scala, OpenJDK, and Python we are using. Recall we updated the Spark and Python interpreters to use Python 3.

screen_shot_2019-11-26_at_6_58_33_pm

Helium Visualizations

If you recall from Part 1 of the post, we pre-installed several additional Helium Visualizations, including the Ultimate Pie Chart. Below, we see the use of the Spark SQL (%sql) interpreter to query a Spark DataFrame, return results, and visualize the data using the Ultimate Pie Chart.  In addition to the pie chart, we see the other pre-installed Helium visualizations proceeding the five default visualizations, in the menu bar. With Zeppelin, all we have to do is write Spark SQL queries against the Spark DataFrame created earlier in the notebook, and Zeppelin will handle the visualization. You have some basic controls over charts using the ‘settings’ option.

screen-shot-2019-11-24-at-8_06_56-pm

Building the Data Lake

Notebook 1 demonstrates how to read and write data to S3. We read and write the Bakery dataset to both CSV-format and Apache Parquet-format, using Spark (PySpark). We also write the results of Spark SQL queries, like the one above, in Parquet, to S3.

screen-shot-2019-11-24-at-8_20_18-pm

With Parquet, data may be split into multiple files, as shown in the S3 bucket directory below. Parquet is much faster to read into a Spark DataFrame than CSV. Spark provides support for both reading and writing Parquet files. We will write all of our data to Parquet in S3, making future re-use of the data much more efficient than downloading data from the Internet, like GroupLens or kaggle, or consuming CSV from S3.

screen-shot-2019-11-24-at-8_22_46-pm

Preview S3 Data

In addition to using the Zeppelin notebook, we can preview data right in the S3 bucket web interface using the Amazon S3 Select feature. This query in place feature is helpful to quickly understand the structure and content of new data files with which you want to interact within Zeppelin.

screen-shot-2019-11-24-at-8_23_33-pm

screen-shot-2019-11-24-at-8_23_40-pm

screen_shot_2019-11-28_at_7_41_49_pm.png

Saving Changes to GitHub

In Part 1, we configured Zeppelin to read and write the notebooks from your own copy of the GitHub notebook repository. Using the ‘version control’ menu item, changes made to the notebooks can be committed directly to GitHub.

screen-shot-2019-11-24-at-8_16_36-pm

screen-shot-2019-11-24-at-8_38_19-pm

In GitHub, note the committer is the zeppelin user.

screen_shot_2019-11-26_at_7_48_42_pm.png

Notebook 2

The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- verses multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.

screen-shot-2019-11-24-at-8_41_31-pm

Multi-Node EMR Cluster

If you recall from Part 1, we waited to create this cluster due to the compute costs of running the cluster’s large EC2 instances. You should understand the cost of these resources before proceeding, and that you ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.

Normalized Instance Hours
Understanding the costs of EMR requires understanding the concept of normalized instance hours. Cluster displayed in the EMR AWS Console contains two columns, ‘Elapsed time’ and ‘Normalized instance hours’. The ‘Elapsed time’ column reflects the actual wall-clock time the cluster was used. The ‘Normalized instance hours’ column indicates the approximate number of compute hours the cluster has used, rounded up to the nearest hour.

screen_shot_2019-11-28_at_6_09_38_pm

Normalized instance hours calculations are based on a normalization factor. The normalization factor ranges from 1 for a small instance, up to 64 for an 8xlarge. Based on the type and quantity of instances in our multi-node cluster, we would use approximately 56 compute hours (aka normalized instance hours) for every one hour of wall-clock time our EMR cluster is running. Note the multi-node cluster used in our demo, highlighted in yellow above. The cluster ran for two hours, which equated to 112 normalized instance hours.

Screen Shot 2019-12-16 at 9.48.59 PM.png

Create the Multi-Node Cluster

Create the multi-node EMR cluster using CloudFormation. Change the following nine variable values, then run the emr cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name"
GITHUB_REPO="your-new-project-name"
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
CORE_INSTANCE_TYPE="m5.2xlarge" # optional
CORE_INSTANCE_COUNT=3 # optional

aws cloudformation create-stack \
    --stack-name zeppelin-emr-prod-stack \
    --template-body file://cloudformation/emr_cluster.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceType,ParameterValue=${CORE_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceCount,ParameterValue=${CORE_INSTANCE_COUNT} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}

Use the Amazon EMR web interface to confirm the success of the CloudFormation stack. The fully-provisioned cluster should be in the ‘Waiting’ state when ready.

screen_shot_2019-11-26_at_4_58_05_pm

Configuring the EMR Cluster

Refer to Part 1 for the configuration steps necessary to prepare the EMR cluster and Zeppelin before continuing. Repeat all the steps used for the single-node cluster.

Monitoring with Ganglia

In Part 1, we installed Ganglia as part of creating the EMR cluster. Ganglia, according to its website, is a scalable distributed monitoring system for high-performance computing systems such as clusters and grids. Ganglia can be used to evaluate the performance of the single-node and multi-node EMR clusters. With Ganglia, we can easily view cluster and individual instance CPU, memory, and network I/O performance.

screen-shot-2019-11-24-at-8_46_46-pm
Ganglia Example: Cluster CPU

screen-shot-2019-11-24-at-8_48_44-pm
Ganglia Example: Cluster Memory

screen_shot_2019-11-26_at_5_18_51_pm
Ganglia Example: Cluster Network I/O

YARN Resource Manager

The YARN Resource Manager Web UI is also available on our EMR cluster. Using the Resource Manager, we can view the compute resource load on the cluster, as well as the individual EMR Core nodes. Below, we see that the multi-node cluster has 24 vCPUs and 72 GiB of memory available, split evenly across the three Core cluster nodes.

You might recall, the m5.2xlarge EC2 instance type, used for the three Core nodes, each contains 8 vCPUs and 32 GiB of memory. However, by default, although all 8 vCPUs are available for computation per node, only 24 GiB of the node’s 32 GiB of memory are available for computation. EMR ensures a portion of the memory on each node is reserved for other system processes. The maximum available memory is controlled by the YARN memory configuration option, yarn.scheduler.maximum-allocation-mb.

screen_shot_2019-11-26_at_5_15_00_pm

The YARN Resource Manager preview above shows the load on the Code nodes as Notebook 2 is executing the Spark SQL queries on the large MovieLens with 27MM ratings. Note that only 4 of the 24 vCPUs (16.6%) are in use, but that 70.25 of the 72 GiB (97.6%) of available memory is being used. According to Spark, because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth. In this case, memory appears to be the most constrained resource. Using memory-optimized instances, such as r4 or r5 instance types, might be more effective for the core nodes than the m5 instance types.

MovieLens Datasets

By changing one variable in the notebook, we can work with the latest, smaller GroupLens MovieLens dataset containing approximately 100k rows (ml-latest-small) or the larger dataset, containing approximately 27M rows (ml-latest). For this demo, try both datasets on both the single-node and multi-node clusters. Compare the Spark SQL paragraph execution times for each of the four variations, including single-node with the small dataset, single-node with the large dataset, multi-node with the small dataset, and multi-node with the large dataset. Observe how fast the SQL queries are executed on the single-node versus multi-node cluster. Try switching to a different Core node instance type, such as r5.2xlarge. Try creating a cluster with additional Core nodes. How is the compute time effected?

screen_shot_2019-11-26_at_5_02_34_pm

Terminate the multi-node EMR cluster to save yourself the expense before continuing to Notebook 3.

aws cloudformation delete-stack \
    --stack-name=zeppelin-emr-prod-stack

Notebook 3

The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.

screen_shot_2019-11-27_at_11_44_44_pm

Glue Crawlers

Before continuing with Notebook 3, run the two Glue Crawlers using the AWS CLI.

aws glue start-crawler --name bakery-transactions-crawler
aws glue start-crawler --name movie-ratings-crawler

The two Crawlers will create a total of seven tables in the Glue Data Catalog database.

screen_shot_2019-11-27_at_8_50_09_pm

If we examine the Glue Data Catalog database, we should now observe several tables, one for each dataset found in the S3 bucket. The location of each dataset is shown in the ‘Location’ column of the tables view.

screen-shot-2019-11-24-at-9_14_19-pm

From the Zeppelin notebook, we can even use Spark SQL to query the AWS Glue Data Catalog, itself, for its databases and the tables within them.

screen-shot-2019-11-24-at-9_12_52-pm

According to Amazon, the Glue Data Catalog tables and databases are containers for the metadata definitions that define a schema for underlying source data. Using Zeppelin’s SQL interpreter, we can query the Data Catalog database and return the underlying source data. The SQL query example, below, demonstrates how we can perform a join across two tables in the data catalog database, representing two different data sources, and return results.

screen-shot-2019-11-24-at-9_09_26-pm

Notebook 4

The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.

screen-shot-2019-11-27-at-11_26_34-am.png

First, we create a new schema and four related tables for the RDS PostgreSQL movie ratings database, using the Psycopg 2 PostgreSQL adapter for Python and the SQL file we copied to S3 in Part 1.

screen_shot_2019-11-27_at_11_09_42_am

The RDS database’s schema, shown below, approximates the schema of the four CSV files from the GroupLens MovieLens rating dataset we used in Notebook 2.

screen_shot_2019-11-22_at_12_55_25_pm

Since the schema of the PostgreSQL database matches the MovieLens dataset files, we can import the data from the CVS files, downloaded from GroupLens, directly into the RDS database, again using the Psycopg PostgreSQL adapter for Python.

screen_shot_2019-11-27_at_11_09_52_am

According to the Spark documentation, Spark SQL also includes a data source that can read data from other databases using JDBC. Using Spark’s JDBC capability and the PostgreSQL JDBC Driver we installed in Part 1, we can perform Spark SQL queries against the RDS database using PySpark (%spark.pyspark). Below, we see a paragraph example of reading the RDS database’s movies table, using Spark.

screen_shot_2019-11-27_at_11_10_01_am

As a third method of querying the RDS database, we can use the custom Zeppelin PostgreSQL JDBC interpreter (%postgres) we created in Part 1. Although the default driver of the JDBC interpreter is set as PostgreSQL, and the associated JAR is included with Zeppelin, we overrode that older JAR, with the latest PostgreSQL JDBC Driver JAR.

Using the %postgres interpreter, we query the RDS database’s public schema, and return the four database tables we created earlier in the notebook.

screen_shot_2019-11-27_at_11_10_26_am

Again, below, using the %postgres interpreter in the notebook’s paragraph, we query the RDS database and return data, which we then visualize using Zeppelin’s bar chart. Finally, note the use of Zeppelin Dynamic Forms in this example. Dynamic Forms allows Zeppelin to dynamically creates input forms, whose input values are then available to use programmatically. Here, we use two form input values to control the data returned from our query and the resulting visualization.

screen_shot_2019-11-27_at_11_10_54_am

Conclusion

In this two-part post, we learned how effectively Apache Zeppelin integrates with Amazon EMR. We also learned how to extend Zeppelin’s capabilities, using  AWS Glue, Amazon RDS, and Amazon S3 as a Data Lake. Beyond what was covered in this post, there are dozens of more Zeppelin and EMR features, as well as dozens of more AWS services that integrate with Zeppelin and EMR, for you to discover.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , ,

1 Comment

Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 1

Introduction

There is little question big data analyticsdata scienceartificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last 3–5 years. Behind the hype cycles and marketing buzz, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity, commercial enterprises, academic institutions, and the public sector have all rushed to develop hardware and software solutions to decrease the barrier to entry and increase the velocity of ML and Data Scientists and Engineers.

screen_shot_2019-11-17_at_7_24_10_am1Data Science: 5-Year Search Trend (courtesy Google Trends)

screen_shot_2019-11-17_at_7_24_10_am2Machine Learning: 5-Year Search Trend (courtesy Google Trends)

Technologies

All three major cloud providers, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud, have rapidly maturing big data analytics, data science, and AI and ML services. AWS, for example, introduced Amazon Elastic MapReduce (EMR) in 2009, primarily as an Apache Hadoop-based big data processing service. Since then, according to Amazon, EMR has evolved into a service that uses Apache SparkApache Hadoop, and several other leading open-source frameworks to quickly and cost-effectively process and analyze vast amounts of data. More recently, in late 2017, Amazon released SageMaker, a service that provides the ability to build, train, and deploy machine learning models quickly and securely.

Simultaneously, organizations are building solutions that integrate and enhance these Cloud-based big data analytics, data science, AI, and ML services. One such example is Apache Zeppelin. Similar to the immensely popular Project Jupyter and the newly open-sourced Netflix’s Polynote, Apache Zeppelin is a web-based, polyglot, computational notebook. Zeppelin enables data-driven, interactive data analytics and document collaboration using a number of interpreters such as Scala (with Apache Spark), Python (with Apache Spark), Spark SQL, JDBC, Markdown, Shell and so on. Zeppelin is one of the core applications supported natively by Amazon EMR.

screen_shot_2019-11-15_at_11_59_16_pm

In the following two-part post, we will explore the use of Apache Zeppelin on EMR for data science and data analytics using a series of Zeppelin notebooks. The notebooks feature the use of AWS Glue, the fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. The notebooks also feature the use of Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Simple Cloud Storage Service (S3). Amazon S3 will serve as a Data Lake to store our unstructured data. Given the current choice of Zeppelin’s more than twenty different interpreters, we will use Python3 and Apache Spark, specifically Spark SQL and PySpark, for all notebooks.

zeppelin_header

We will build an economical single-node EMR cluster for data exploration, as well as a larger multi-node EMR cluster for analyzing large data sets. Amazon S3 will be used to store input and output data, while intermediate results are stored in the Hadoop Distributed File System (HDFS) on the EMR cluster. Amazon provides a good overview of EMR architecture. Below is a high-level architectural diagram of the infrastructure we will construct during Part 1 for this demonstration.

EMR-Zeppelin.png

Notebook Features

Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin, including Moonsoo Lee, Zepl CTO and creator for Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.

Notebook 1

The first notebook uses a small 21k row kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, and the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.

screen_shot_2019-11-27_at_11_13_53_am

Notebook 2

The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- verses multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.

screen_shot_2019-11-27_at_11_14_01_am

Notebook 3

The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.

screen_shot_2019-11-27_at_11_44_44_pm.png

Notebook 4

The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.

screen-shot-2019-11-27-at-11_26_34-am

Demonstration

In Part 1 of the post, as a DataOps Engineer, we will create and configure the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3-based data lake. In Part 2 of this post, as a Data Scientist, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the Zeppelin notebooks.

Source Code

The demonstration’s source code is contained in two public GitHub repositories. The first repository, zeppelin-emr-demo, includes the four Zeppelin notebooks, organized according to the conventions of Zeppelin’s pluggable notebook storage mechanisms.

.
├── 2ERVVKTCG
│   └── note.json
├── 2ERYY923A
│   └── note.json
├── 2ESH8DGFS
│   └── note.json
├── 2EUZKQXX7
│   └── note.json
├── LICENSE
└── README.md

Zeppelin GitHub Storage

During the demonstration, changes made to your copy of the Zeppelin notebooks running on EMR will be automatically pushed back to GitHub when a commit occurs. To accomplish this, instead of just cloning a local copy of my zeppelin-emr-demo project repository, you will want your own copy, within your personal GitHub account. You could folk my zeppelin-emr-demo GitHub repository or copy a clone into your own GitHub repository.

To make a copy of the project in your own GitHub account, first, create a new empty repository on GitHub, for example, ‘my-zeppelin-emr-demo-copy’. Then, execute the following commands from your terminal, to clone the original project repository to your local environment, and finally, push it to your GitHub account.

# change me
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo-copy

# shallow clone into new directory
git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo.git \
    ${GITHUB_REPO}

# re-initialize repository
cd ${GITHUB_REPO}
rm -rf .git
git init

# re-commit code
git add -A
git commit -m "Initial commit of my copy of zeppelin-emr-demo"

# push to your repo
git remote add origin \
    https://github.com/$GITHUB_ACCOUNT/$GITHUB_REPO.git
git push -u origin master

GitHub Personal Access Token

To automatically push changes to your GitHub repository when a commit occurs, Zeppelin will need a GitHub personal access token. Create a personal access token with the scope shown below. Be sure to keep the token secret. Make sure you do not accidentally check your token value into your source code on GitHub. To minimize the risk, change or delete the token after completing the demo.

screen_shot_2019-11-18_at_8_44_27_pm

The second repository, zeppelin-emr-config, contains the necessary bootstrap files, CloudFormation templates, and PostgreSQL DDL (Data Definition Language) SQL script.

.
├── LICENSE
├── README.md
├── bootstrap
│   ├── bootstrap.sh
│   ├── emr-config.json
│   ├── helium.json
├── cloudformation
│   ├── crawler.yml
│   ├── emr_single_node.yml
│   ├── emr_cluster.yml
│   └── rds_postgres.yml
└── sql
    └── ratings.sql

Use the following AWS CLI command to clone the GitHub repository to your local environment.

git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo-setup.git

Requirements

To follow along with the demonstration, you will need an AWS Account, an existing Amazon S3 bucket to store EMR configuration and data, and an EC2 key pair. You will also need a current version of the AWS CLI installed in your work environment. Due to the particular EMR features, we will be using, I recommend using the us-east-1 AWS Region to create the demonstration’s resources.

# create secure emr config and data bucket
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

aws s3api create-bucket \
    --bucket ${ZEPPELIN_DEMO_BUCKET}
aws s3api put-public-access-block \
    --bucket ${ZEPPELIN_DEMO_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

Copy Configuration Files to S3

To start, we need to copy three configuration files, bootstrap.sh, helium.json, and ratings.sql, from the zeppelin-emr-demo-setup project directory to our S3 bucket. Change the ZEPPELIN_DEMO_BUCKET variable value, then run the following s3 cp API command, using the AWS CLI. The three files will be copied to a bootstrap directory within your S3 bucket.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
aws s3 cp bootstrap/bootstrap.sh s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp bootstrap/helium.json s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp sql/ratings.sql s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/

Below, sample output from copying local files to S3.

screen-shot-2019-11-21-at-2_03_49-pm

Create AWS Resources

We will start by creating most of the required AWS resources for the demonstration using three CloudFormation templates. We will create a single-node Amazon EMR cluster, an Amazon RDS PostgresSQL database, an AWS Glue Data Catalog database, two AWS Glue Crawlers, and a Glue IAM Role. We will wait to create the multi-node EMR cluster due to the compute costs of running large EC2 instances in the cluster. You should understand the cost of these resources before proceeding, and that you ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.

Single-Node EMR Cluster

We will start by creating the single-node Amazon EMR cluster, consisting of just one master node with no core or task nodes (a cluster of one). All operations will take place on the master node.

Default EMR Resources

The following EMR instructions assume you have already created at least one EMR cluster in the past, in your current AWS Region, using the EMR web interface with the ‘Create Cluster – Quick Options’ option. Creating a cluster this way creates several additional AWS resources, such as the EMR_EC2_DefaultRole EC2 instance profile, the default EMR_DefaultRole EMR IAM Role, and the default EMR S3 log bucket.

screen_shot_2019-11-17_at_5_46_56_pm.png

If you haven’t created any EMR clusters using the EMR ‘Create Cluster – Quick Options’ in the past, don’t worry, you can also create the required resources with a few quick AWS CLI commands. Change the following LOG_BUCKET variable value, then run the aws emr and aws s3api API commands, using the AWS CLI. The LOG_BUCKET variable value follows the convention of aws-logs-awsaccount-region. For example, aws-logs-012345678901-us-east-1.

# create emr roles
aws emr create-default-roles

# create log secure bucket
# change me
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"

aws s3api create-bucket --bucket ${LOG_BUCKET}
aws s3api put-public-access-block --bucket ${LOG_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

The new EMR IAM Roles can be viewed in the IAM Roles web interface.

screen-shot-2019-11-21-at-2_05_35-pm

Often, I see tutorials that reference these default EMR resources from the AWS CLI or CloudFormation, without any understanding or explanation of how they are created.

EMR Bootstrap Script

As part of creating our EMR cluster, the CloudFormation template, emr_single_node.yml, will call the bootstrap script we copied to S3, earlier, bootstrap.sh. The bootstrap script pre-installs required Python and Linux software packages, and the PostgreSQL driver JAR. The bootstrap script also clones your copy of the zeppelin-emr-demo GitHub repository.

#!/bin/bash
set -ex

if [[ $# -ne 2 ]] ; then
    echo "Script requires two arguments"
    exit 1
fi

GITHUB_ACCOUNT=$1
GITHUB_REPO=$2

# install extra python packages
sudo python3 -m pip install psycopg2-binary boto3

# install extra linux packages
yes | sudo yum install git htop

# clone github repo
cd /tmp
git clone "https://github.com/${GITHUB_ACCOUNT}/${GITHUB_REPO}.git"

# install extra jars
POSTGRES_JAR="postgresql-42.2.8.jar"
wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
sudo chown -R hadoop:hadoop ${POSTGRES_JAR}
mkdir -p /home/hadoop/extrajars/
cp ${POSTGRES_JAR} /home/hadoop/extrajars/

EMR Application Configuration

The EMR CloudFormation template will also modify the EMR cluster’s Spark and Zeppelin application configurations. Amongst other configuration properties, the template sets the default Python version to Python3, instructs Zeppelin to use the cloned GitHub notebook directory path, and adds the PostgreSQL Driver JAR to the JVM ClassPath. Below we can see the configuration properties applied to an existing EMR cluster.

screen-shot-2019-11-21-at-2_24_43-pm
EMR Application Versions

As of the date of this post, EMR is at version 5.28.0. Below, as shown in the EMR web interface, are the current (21) applications and frameworks available for installation on EMR.

emr-28.png

For this demo, we will install Apache Spark v2.4.4, Ganglia v3.7.2, and Zeppelin 0.8.2.

screen_shot_2019-11-17_at_8_32_17_pmApache Zeppelin: Web Interface

screen_shot_2019-11-13_at_5_40_12_pmApache Spark: DAG Visualization

screen_shot_2019-11-13_at_8_31_13_pmGanglia: Cluster CPU Monitoring

Create the EMR CloudFormation Stack

Change the following (7) variable values, then run the emr cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
 
aws cloudformation create-stack \
    --stack-name zeppelin-emr-dev-stack \
    --template-body file://cloudformation/emr_single_node.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}

You can use the Amazon EMR web interface to confirm the results of the CloudFormation stack. The cluster should be in the ‘Waiting’ state.

screen_shot_2019-11-15_at_7_42_09_pm

PostgreSQL on Amazon RDS

Next, create a simple, single-AZ, single-master, non-replicated Amazon RDS PostgreSQL database, using the included CloudFormation template, rds_postgres.yml. We will use this database in Notebook 4. For the demo, I have selected the current-generation general purpose db.m4.large EC2 instance type to run PostgreSQL. You can easily change the instance type to another RDS-supported instance type to suit your own needs.

Change the following (3) variable values, then run the cloudformation create-stack API command, using the AWS CLI.

# change me
DB_MASTER_USER="your-db-username" # i.e. masteruser
DB_MASTER_PASSWORD="your-db-password" # i.e. 5up3r53cr3tPa55w0rd
MASTER_INSTANCE_TYPE="db.m4.large" # optional
 
aws cloudformation create-stack \
    --stack-name zeppelin-rds-stack \
    --template-body file://cloudformation/rds_postgres.yml \
    --parameters ParameterKey=DBUser,ParameterValue=${DB_MASTER_USER} \
                 ParameterKey=DBPassword,ParameterValue=${DB_MASTER_PASSWORD} \
                 ParameterKey=DBInstanceClass,ParameterValue=${MASTER_INSTANCE_TYPE}

You can use the Amazon RDS web interface to confirm the results of the CloudFormation stack.

screen_shot_2019-11-17_at_8_06_44_pm.png

AWS Glue

Next, create the AWS Glue Data Catalog database, the Apache Hive-compatible metastore for Spark SQL, two AWS Glue Crawlers, and a Glue IAM Role (ZeppelinDemoCrawlerRole), using the included CloudFormation template, crawler.yml. The AWS Glue Data Catalog database will be used in Notebook 3.

Change the following variable value, then run the cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
aws cloudformation create-stack \
    --stack-name zeppelin-crawlers-stack \
    --template-body file://cloudformation/crawler.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
    --capabilities CAPABILITY_NAMED_IAM

You can use the AWS Glue web interface to confirm the results of the CloudFormation stack. Note the Data Catalog database and the two Glue Crawlers. We will not run the two crawlers until Part 2 of the post, so no tables will exist in the Data Catalog database, yet.

screen_shot_2019-11-27_at_8_50_09_pmscreen_shot_2019-11-27_at_8_56_47_pm

At this point in the demonstration, you should have successfully created a single-node Amazon EMR cluster, an Amazon RDS PostgresSQL database, and several AWS Glue resources, all using CloudFormation templates.

screen_shot_2019-11-21_at_4_19_01_pm

Post-EMR Creation Configuration

RDS Security

For the new EMR cluster to communicate with the RDS PostgreSQL database, we need to ensure that port 5432 is open from the RDS database’s VPC security group, which is the default VPC security group, to the security groups of the EMR nodes. Obtain the Group ID of the ElasticMapReduce-master and ElasticMapReduce-slave Security Groups from the EMR web interface.

screen_shot_2019-11-20_at_11_51_10_am

Access the Security Group for the RDS database using the RDS web interface. Change the inbound rule for port 5432 to include both Security Group IDs.

screen_shot_2019-11-20_at_11_52_23_am

SSH to EMR Master Node

In addition to the bootstrap script and configurations, we already applied to the EMR cluster, we need to make several post-EMR creation configuration changes to the EMR cluster for our demonstration. These changes will require SSH’ing to the EMR cluster. Using the master node’s public DNS address and SSH command provided in the EMR web console, SSH into the master node.

screen_shot_2019-11-15_at_7_42_09_pm_v3

If you cannot access the node using SSH, check that port 22 is open on the associated EMR master node IAM Security Group (ElasticMapReduce-master) to your IP address or address range.

screen_shot_2019-11-15_at_7_42_09_pm_v2

screen_shot_2019-11-21_at_4_51_01_pm.png

Git Permissions

We need to change permissions on the git repository we installed during the EMR bootstrapping phase. Typically, with an EC2 instance, you perform operations as the ec2-user user. With Amazon EMR, you often perform actions as the hadoop user. With Zeppelin on EMR, the notebooks perform operations, including interacting with the git repository as the zeppelin user. As a result of the bootstrap.sh script, the contents of the git repository directory, /tmp/zeppelin-emr-demo/, are owned by the hadoop user and group by default.

screen_shot_2019-11-17_at_8_01_24_pm

We will change their owner to the zeppelin user and group. We could not perform this step as part of the bootstrap script since the the zeppelin user and group did not exist at the time the script was executed.

cd /tmp/zeppelin-emr-demo/
sudo chown -R zeppelin:zeppelin .

The results should look similar to the following output.

screen_shot_2019-11-17_at_8_02_16_pm

Pre-Install Visualization Packages

Next, we will pre-install several Apache Zeppelin Visualization packages. According to the Zeppelin website, an Apache Zeppelin Visualization is a pluggable package that can be loaded/unloaded on runtime through the Helium framework in Zeppelin. We can use them just like any other built-in visualization in the notebook. A Visualization is a javascript npm package. For example, here is a link to the ultimate-pie-chart on the public npm registry.

We can pre-load plugins by replacing the /usr/lib/zeppelin/conf/helium.json file with the version of helium.json we copied to S3, earlier, and restarting Zeppelin. If you have a lot of Visualizations or package types or use any DataOps automation to create EMR clusters, this approach is much more efficient and repeatable than manually loading plugins using the Zeppelin UI, each time you create a new EMR cluster. Below, the helium.json file, which pre-loads (8) Visualization packages.

{
    "enabled": {
        "ultimate-pie-chart": "ultimate-pie-chart@0.0.2",
        "ultimate-column-chart": "ultimate-column-chart@0.0.2",
        "ultimate-scatter-chart": "ultimate-scatter-chart@0.0.2",
        "ultimate-range-chart": "ultimate-range-chart@0.0.2",
        "ultimate-area-chart": "ultimate-area-chart@0.0.1",
        "ultimate-line-chart": "ultimate-line-chart@0.0.1",
        "zeppelin-bubblechart": "zeppelin-bubblechart@0.0.4",
        "zeppelin-highcharts-scatterplot": "zeppelin-highcharts-scatterplot@0.0.2"
    },
    "packageConfig": {},
    "bundleDisplayOrder": [
        "ultimate-pie-chart",
        "ultimate-column-chart",
        "ultimate-scatter-chart",
        "ultimate-range-chart",
        "ultimate-area-chart",
        "ultimate-line-chart",
        "zeppelin-bubblechart",
        "zeppelin-highcharts-scatterplot"
    ]
}

Run the following commands to load the plugins and adjust the permissions on the file.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
sudo aws s3 cp s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/helium.json \
    /usr/lib/zeppelin/conf/helium.json
sudo chown zeppelin:zeppelin /usr/lib/zeppelin/conf/helium.json

Create New JDBC Interpreter

Lastly, we need to create a new Zeppelin JDBC Interpreter to connect to our RDS database. By default, Zeppelin has several interpreters installed. You can review a list of available interpreters using the following command.

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --list

screen_shot_2019-11-18_at_6_29_40_am

The new JDBC interpreter will allow us to connect to our RDS PostgreSQL database, using Java Database Connectivity (JDBC). First, ensure all available interpreters are installed, including the current Zeppelin JDBC driver (org.apache.zeppelin:zeppelin-jdbc:0.8.0) to /usr/lib/zeppelin/interpreter/jdbc.

Creating a new interpreter is a two-part process. In this stage, we install the required interpreter files on the master node using the following command. Then later, in the Zeppelin web interface, we will configure the new PostgreSQL JDBC interpreter. Note we must provide a unique name for the interpreter (I chose ‘postgres’ in this case), which we will refer to in part two of the interpreter creation process.

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --all
 
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh \
    --name "postgres" \
    --artifact org.apache.zeppelin:zeppelin-jdbc:0.8.0

To complete the post-EMR creation configuration on the master node, we must restart Zeppelin for our changes to take effect.

sudo stop zeppelin && sudo start zeppelin

In my experience, it could take 2–3 minutes for the Zeppelin UI to become fully responsive after a restart.

screen_shot_2019-11-18_at_10_01_54_pm

Zeppelin Web Interface Access

With all the EMR application configuration complete, we will access the Zeppelin web interface running on the master node. Use the Zeppelin connection information provided in the EMR web interface to setup SSH tunneling to the Zeppelin web interface, running on the master node. Using this method, we can also access the Spark History Server, Ganglia, and Hadoop Resource Manager web interfaces; all links are provided from EMR.

screen_shot_2019-11-15_at_7_42_09_pm

To set up a web connection to the applications installed on the EMR cluster, I am using FoxyProxy as a proxy management tool with Google Chrome.

screen_shot_2019-11-17_at_8_22_09_pm.png

If everything is working so far, you should see the Zeppelin web interface with all four Zeppelin notebooks available from the cloned GitHub repository. You will be logged in as the anonymous user. Zeppelin offers authentication for accessing notebooks on the EMR cluster. For brevity, we will not cover setting up authentication in Zeppelin, using Shiro Authentication.

screen_shot_2019-11-17_at_8_32_17_pm.png

To confirm the path to the local, cloned copy of the GitHub notebook repository, is correct, check the Notebook Repos interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen. The value should match the ZEPPELIN_NOTEBOOK_DIR configuration property value in the emr_single_node.yml CloudFormation template we executed earlier.

screen_shot_2019-11-18_at_10_04_23_pm

To confirm the Helium Visualizations were pre-installed correctly, using the helium.json file, open the Helium interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.

screen_shot_2019-11-15_at_7_45_28_pmNote the enabled visualizations. And, it is easy to enable additional plugins through the web interface.screen_shot_2019-11-15_at_7_45_33_pm

New PostgreSQL JDBC Interpreter

If you recall, earlier, we install the required interpreter files on the master node using the following command using the bootstrap script. We will now complete the process of configuring the new PostgreSQL JDBC interpreter. Open the Interpreter interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.

The title of the new interpreter must match the name we used to install the interpreter files, ‘postgres’. The interpreter group will be ‘jdbc’. There are, minimally, three properties we need to configure for your specific RDS database instance, including default.url, default.user, and default.password. These should match the values you used to create your RDS instance, earlier. Make sure to includes the database name in the default.url. An example is shown below.

default.url: jdbc:postgresql://zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com:5432/ratings
default.user: masteruser
default.password: 5up3r53cr3tPa55w0rd

We also need to provide a path to the PostgreSQL driver JAR dependency. This path is the location where we placed the JAR using the bootstrap.sh script, earlier, /home/hadoop/extrajars/postgresql-42.2.8.jar. Save the new interpreter and make sure it starts successfully (shows a green icon).

screen_shot_2019-11-17_at_9_03_02_pm.png

screen_shot_2019-11-18_at_6_42_13_am

Switch Interpreters to Python 3

The last thing we need to do is change the Spark and Python interpreters to use Python 3 instead of the default Python 2. On the same screen you used to create a new interpreter, modify the Spark and Python interpreters. First, for the Python interpreter, change the zeppelin.python property to python3.

screen_shot_2019-11-18_at_3_30_39_pm

Next, for the Spark interpreter, change the zeppelin.pyspark.python property to python3.

screen_shot_2019-11-18_at_3_32_34_pm

Part 2

In Part 1 of this post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3 data lake. In Part 2 of this post, we will explore some of Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the notebooks.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , ,

Leave a comment

Event-driven, Serverless Architectures with AWS Lambda, SQS, DynamoDB, and API Gateway

Introduction

In this post, we will explore modern application development using an event-driven, serverless architecture on AWS. To demonstrate this architecture, we will integrate several fully-managed services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. The result will be an application composed of small, easily deployable, loosely coupled, independently scalable, serverless components.

What is ‘Event-Driven’?

According to Otavio Ferreira, Manager, Amazon SNS, and James Hood, Senior Software Development Engineer, in their AWS Compute Blog, Enriching Event-Driven Architectures with AWS Event Fork Pipelines, “Many customers are choosing to build event-driven applications in which subscriber services automatically perform work in response to events triggered by publisher services. This architectural pattern can make services more reusable, interoperable, and scalable.” This description of an event-driven architecture perfectly captures the essence of the following post. All interactions between application components in this post will be as a direct result of triggering an event.

What is ‘Serverless’?

Mistakingly, many of us think of serverless as just functions (aka Function-as-a-Service or FaaS). When it comes to functions on AWS, Lambda is just one of many fully-managed services that make up the AWS Serverless Computing platform. So, what is ‘serverless’? According to AWS, “Serverless applications don’t require provisioning, maintaining, and administering servers for backend components such as compute, databases, storage, stream processing, message queueing, and more.

As a Developer, one of my favorite features of serverless is the cost, or lack thereof. With serverless on AWS, you pay for consistent throughput or execution duration rather than by server unit, and, at least on AWS, you don’t pay for idle resources. This is not always true of ‘serverless’ offerings on other leading Cloud platforms. Remember, if you’re paying for it but not using it, it’s not serverless.

If you’re paying for it but not using it, it’s not serverless.

Demonstration

To demonstrate an event-driven, serverless architecture, we will build, package, and deploy an application capable of extracting messages from CSV files placed in S3, transforming those messages, queueing them to SQS, and finally, writing the messages to DynamoDB, using Lambda functions throughout. We will also expose a RESTful API, via API Gateway, to perform CRUD-like operations on those messages in DynamoDB.

AWS Technologies

In this demonstration, we will use several AWS serverless services, including the following.

Each Lambda will use function-specific execution roles, part of AWS Identity and Access Management (IAM). We will log the event details and monitor services using Amazon CloudWatch.

To codify, build, package, deploy, and manage the Lambda functions and other AWS resources in a fully automated fashion, we will also use the following AWS services:

Architecture

The high-level architecture for the platform provisioned and deployed in this post is illustrated in the diagram below. There are two separate workflows. In the first workflow (top), data is extracted from CSV files placed in S3, transformed, queued to SQS, and written to DynamoDB, using Python-based Lambda functions throughout. In the second workflow (bottom), data is manipulated in DynamoDB through interactions with a RESTful API, exposed via an API Gateway, and backed by Node.js-based Lambda functions.

new-01-sqs-dynamodb

Using the vast array of current AWS services, there are several ways we could extract, transform, and load data from static files into DynamoDB. The demonstration’s event-driven, serverless architecture represents just one possible approach.

Source Code

All source code for this post is available on GitHub in a single public repository, serverless-sqs-dynamo-demo. To clone the GitHub repository, execute the following command.

git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/serverless-sqs-dynamo-demo.git

The project files relevant to this demonstration are organized as follows.

.
├── README.md
├── lambda_apigtw_to_dynamodb
│   ├── app.js
│   ├── events
│   ├── node_modules
│   ├── package.json
│   └── tests
├── lambda_s3_to_sqs
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── lambda_sqs_to_dynamodb
│   ├── __init__.py
│   ├── app.py
│   ├── requirements.txt
│   └── tests
├── requirements.txt
├── template.yaml
└── sample_data
    ├── data.csv
    ├── data_bad_msg.csv
    └── data_good_msg.csv

Some source code samples in this post are GitHub Gists, which may not display correctly on all social media browsers, such as LinkedIn.

Prerequisites

The demonstration assumes you already have an AWS account. You will need the latest copy of the AWS CLI, SAM CLI, and Python 3 installed on your development machine.

Additionally, you will need two existing S3 buckets. One bucket will be used to store the packaged project files for deployment. The second bucket is where we will place CSV data files, which, in turn, will trigger events that invoke multiple Lambda functions.

Deploying the Project

Before diving into the code, we will deploy the project to AWS. Conveniently, the entire project’s resources are codified in an AWS SAM template. We are using the AWS Serverless Application Model (SAM). AWS SAM is a model used to define serverless applications on AWS. According to the official SAM GitHub project documentation, AWS SAM is based on AWS CloudFormation. A serverless application is defined in a CloudFormation template and deployed as a CloudFormation stack.

Template Parameter

CloudFormation will create and uniquely name the SQS queues and the DynamoDB table. However, to avoid circular references, a common issue when creating resources associated with S3 event notifications, it is easier to use a pre-existing bucket. To start, you will need to change the SAM template’s DataBucketName parameter’s default value to your own S3 bucket name. Again, this bucket is where we will eventually push the CSV data files. Alternately, override the default values using the sam build command, next.

Parameters:
  DataBucketName:
    Type: String
    Description: S3 bucket where CSV files are processed
    Default: your-data-bucket-name

SAM CLI Commands

With the DataBucketName parameter set, proceed to validate, build, package, and deploy the project using the SAM CLI and the commands below. In addition to the sam validate command, I also like to use the aws cloudformation validate-template command to validate templates and catch any potential, additional errors.

Note the S3_BUCKET_BUILD variable, below, refers to the name of the S3 bucket SAM will use package and deploy the project from, as opposed to the S3 bucket, which the CSV data files will be placed into (gist).

After validating the template, SAM will build and package each individual Lambda function and its associated dependencies. Below, we see each individual Lambda function being packaged with a copy of its dependencies.

screen_shot_2019-09-30_at_8_42_41_pm

Once packaged, SAM will deploy the project and create the AWS resources as a CloudFormation stack.

screen_shot_2019-09-30_at_8_43_11_pm

Once the stack creation is complete, use the CloudFormation management console to review the AWS resources created by SAM. There are approximately 14 resources defined in the SAM template, which result in 33 individual resources deployed as part of the CloudFormation stack.

screen_shot_2019-09-30_at_8_44_46_pm

Note the stack’s output values. You will need these values to interact with the deployed platform, later in the demonstration.

screen_shot_2019-09-30_at_8_45_13_pm

Test the Deployed Application

Once the CloudFormation stack has deployed without error, copying a CSV file to the S3 bucket is the quickest way to confirm everything is working. The project includes test data files with 20 rows of test message data. Below is a sample of the CSV file, which is included in the project. The data was collected from IoT devices that measured response time from wired versus wireless devices on a LAN network; the message details are immaterial to this demonstration (gist).

Run the following commands to copy the test data file to your S3 bucket.

S3_DATA_BUCKET=your_data_bucket_name
aws s3 cp sample_data/data.csv s3://$S3_DATA_BUCKET

Visit the DynamoDB management console. You should observe a new DynamoDB table.

screen_shot_2019-09-30_at_8_47_10_pm

Within the new DynamoDB table, you should observe twenty items, corresponding to each of the twenty rows in the CSV file, uploaded to S3.

screen_shot_2019-09-30_at_8_51_07_pm

Drill into an individual item within the table and review its attributes. They should match the rows in the CSV file.

screen_shot_2019-09-30_at_8_51_54_pm

Both the Python- and Node.js-based Lambda functions have their default logging levels set to debug. The debug-level output from each Lambda function is streamed to individual Amazon CloudWatch Log Groups. We can use the CloudWatch logs to troubleshoot any issues with the deployed application. Below we see an example of CloudWatch log entries for the request and response payloads generated from GetMessageFunction Lambda function, which is querying DynamoDB for a single Item.

screen_shot_2019-10-03_at_9_16_22_pm

Event-Driven Patterns

There are three distinct and discrete event-driven dataflows within the demonstration’s architecture

  1. S3 Event Source for Lambda (S3 to SQS)
  2. SQS Event Source for Lambda (SQS to DynamoDB)
  3. API Gateway Event Source for Lambda (API Gateway to DynamoDB)

Let’s examine each event-driven dataflow and the Lambda code associated with that part of the architecture.

S3 Event Source for Lambda

Whenever a file is copied into the target S3 bucket, an S3 Event Notification triggers an asynchronous invocation of a Lambda. According to AWS, when you invoke a function asynchronously, the Lambda sends the event to the SQS queue. A separate process reads events from the queue and executes your Lambda function.

new-02-sqs-dynamodb

The Lambda’s function handler, written in Python, reads the CSV file, whose filename is contained in the event. The Lambda extracts the rows in the CSV file, transforms the data, and pushes each message to the SQS queue (gist).

Below is an example of a message body, part an SQS message, extracted from a single row of the CSV file, and sent by the Lambda to the SQS queue. The timestamp has been converted to separate date and time fields by the Lambda. The DynamoDB table is part of the SQS message body. The key/value pairs in the Item JSON object reflect the schema of the DynamoDB table (gist).

SQS Event Source for Lambda

According to AWS, SQS offers two types of message queues, Standard and FIFO (First-In-First-Out). An SQS FIFO queue is designed to guarantee that messages are processed exactly once, in the exact order that they are sent. A Standard SQS queue offers maximum throughput, best-effort ordering, and at-least-once delivery.

Examining the SQS management console, you should observe that the CloudFormation stack creates two SQS Standard queues—a primary queue and a Dead Letter Queue (DLQ). According to AWS, Amazon SQS supports dead-letter queues, which other queues (source queues) can target for messages that cannot be processed (consumed) successfully.

screen_shot_2019-09-30_at_8_55_33_pm

Examining the SQS Lambda Triggers tab, you should observe the Lambda, which will be triggered by the SQS events.

screen_shot_2019-09-30_at_8_56_25_pm

When a message is pushed into the SQS queue by the previous process, an SQS event is fired, which synchronously triggers an invocation of the Lambda using the SQS Event Source for Lambda functionality. When a function is invoked synchronously, Lambda runs the function and waits for a response.

new-03-sqs-dynamodb

In the demonstration, the Lambda’s function handler, also written in Python, pulls the message off of the SQS queue and writes the message (DynamoDB put) to the DynamoDB table. Although writing is the primary use case in this demonstration, an event could also trigger a get, scan, update, or delete command to be executed on the DynamoDB table (gist).

API Gateway Event Source for Lambda

Examining the API Gateway management console, you should observe that CloudFormation created a new Edge-optimized API. The API contains several resources and their associated HTTP methods.

screen_shot_2019-09-30_at_9_02_52_pm

Each API resource is associated with a deployed Lambda function. Switching to the Lambda console, you should observe a total of seven new Lambda functions. There are five Lambda functions related to the API, in addition to the Lambda called by the S3 event notifications and the Lambda called by the SQS event notifications.

screen_shot_2019-09-30_at_8_46_24_pm

Examining one of the Lambda functions associated with the API Gateway, we should observe that the API Gateway trigger for the Lambda (lower left and bottom).

screen_shot_2019-09-30_at_8_59_39_pm

When an end-user makes an HTTP(S) request via the RESTful API exposed by the API Gateway, an event is fired, which synchronously invokes a Lambda using the API Gateway Event Source for Lambda functionality. The event contains details about the HTTP request that is received. The event triggers any one of five different Lambda functions, depending on the HTTP request method.

new-04-sqs-dynamodb

The Lambda code, written in Node.js, contains five function handlers. Each handler corresponds to an HTTP method, including GET (DynamoDB get) POST (put), PUT (update), DELETE (delete), and SCAN (scan). Below is an example of the getMessage handler function. The function accepts two inputs. First, a path parameter, the date, which is the primary partition key for the DynamoDB table. Second, a query parameter, the time, which is the primary sort key for the DynamoDB table. Both the primary partition key and sort key must be passed to DynamoDB to retrieve the requested record (gist).

Test the API

To test the Lambda functions, called by our API, we can use the sam local invoke command, part of the SAM CLI. Using this command, we can test the local Lambda functions, without them being deployed to AWS. The command allows us to trigger events, which the Lambda functions will handle. This is useful as we continue to develop, test, and re-deploy the Lambda functions to our Development, Staging, and Production environments.

The local Node.js-based, API-related Lambda functions, just like their deployed copies, will execute commands against the actual DynamoDB on AWS. The Github project contains a set of five sample events, corresponding to the five Lambda functions, which in turn are associated with five different HTTP methods and API resources. For example, the event_getMessage.json event is associated with the GET HTTP method and calls the /message/{date}?time={time} resource endpoint, to return a single item. This event, shown below, triggers the GetMessageFunction Lambda (gist).

We can trigger all the events from using the CLI. The local Lambda expects the DynamoDB table name to exist as an environment variable. Make sure you set it locally, first, before executing the sam local invoke commands (gist).

If the events were successfully handled by the local Lambda functions, in the terminal, you should see the same HTTP response status codes you would expect from calling the RESTful resources via the API Gateway. Below, for example, we see the POST event being handled by the PostMessageFunction Lambda, adding a record to the DynamoDB table, and returning a successful status of 201 Created.

screen_shot_2019-10-04_at_2_50_41_pm.png

Testing the Deployed API

To test the actual deployed API, we can call one of the API’s resources using an HTTP client, such as Postman. To locate the URL used to invoke the API resource, look at the ‘Prod’ Stage for the new API. This can be found in the Stages tab of the API Gateway console. For example, note the Invoke URL for the POST HTTP method of the /message resource, shown below.

screen_shot_2019-10-04_at_3_02_21_pm

Below, we see an example of using Postman to make an HTTP GET request the /message/{date}?time={time} resource. We pass the required query and path parameters for date and for time. The request should receive a single item in response from DynamoDB, via the API Gateway and the associated Lambda. Here, the request was successful, and the Lambda function returns a 200 OK status.

screen_shot_2019-09-30_at_9_20_45_pm

Similarly, below, we see an example of calling the same /message endpoint using the HTTP POST method. In the body of the POST request, we pass the DynamoDB table name and the Item object. Again, the POST is successful, and the Lambda function returns a 201 Created status.

screen_shot_2019-10-03_at_10_05_31_pm

Cleaning Up

To complete the demonstration and remove the AWS resources, run the following commands. It is necessary to delete all objects from the S3 data bucket, first, before deleting the CloudFormation stack. Else, the stack deletion will fail.

S3_DATA_BUCKET=your_data_bucket_name
STACK_NAME=your_stack_name

aws s3 rm s3://$S3_DATA_BUCKET/data.csv # and any other objects

aws cloudformation delete-stack \
  --stack-name $STACK_NAME

Conclusion

In this post, we explored a simple example of building a modern application using an event-driven serverless architecture on AWS. We used several services, all part of the AWS Serverless Computing platform, including Lambda, API Gateway, SQS, S3, and DynamoDB. In addition to these, AWS has additional serverless services, which could enhance this demonstration, in particular, Amazon Kinesis, AWS Step Functions, Amazon SNS, and AWS AppSync.

In a future post, we will look at how to further test the individual components within this demonstration’s application stack, and how to automate its deployment using DevOps and CI/CD principals on AWS.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , ,

Leave a comment