Posts Tagged Big Data
Running Spark Jobs on Amazon EMR with Apache Airflow: Using the new Amazon Managed Workflows for Apache Airflow (Amazon MWAA) Service on AWS
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on December 24, 2020
Introduction
In the first post of this series, we explored several ways to run PySpark applications on Amazon EMR using AWS services, including AWS CloudFormation, AWS Step Functions, and the AWS SDK for Python. This second post in the series will examine running Spark jobs on Amazon EMR using the recently announced Amazon Managed Workflows for Apache Airflow (Amazon MWAA) service.
Amazon EMR
According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including the use of EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.
AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.
Amazon MWAA
Apache Airflow is a popular open-source platform designed to schedule and monitor workflows. According to Wikipedia, Airflow was created at Airbnb in 2014 to manage the company’s increasingly complex workflows. From the beginning, the project was made open source, becoming an Apache Incubator project in 2016 and a top-level Apache Software Foundation project (TLP) in 2019.
Many organizations build, manage, and maintain Apache Airflow on AWS using compute services such as Amazon EC2 or Amazon EKS. With the announcement of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) in November 2020, AWS customers can now focus on developing workflow automation while leaving the management of Airflow to AWS. Amazon MWAA can be used as an alternative to AWS Step Functions for workflow automation on AWS.

Apache recently announced the release of Airflow 2.0.0 on December 17, 2020. The latest 1.x version of Airflow is 1.10.14, released December 12, 2020. However, at the time of this post, Amazon MWAA was running Airflow 1.10.12, released August 25, 2020. Ensure that when you are developing workflows for Amazon MWAA, you are using the correct Apache Airflow 1.10.12 documentation.
The Amazon MWAA service is available using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI.
Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. Given its integration capabilities, Airflow has extensive support for AWS, including Amazon EMR, Amazon S3, AWS Batch, Amazon RedShift, Amazon DynamoDB, AWS Lambda, Amazon Kinesis, and Amazon SageMaker. Outside of support for Amazon S3, most AWS integrations can be found in the Hooks, Secrets, Sensors, and Operators of the Airflow codebase's contrib section.
Getting Started
Source Code
Using the git clone command below, download a copy of this post's GitHub repository to your local environment.

git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/aws-airflow-demo.git
Preliminary Steps
This post assumes the reader has completed the demonstration in the previous post, Running PySpark Applications on Amazon EMR: Methods for Interacting with PySpark on Amazon Elastic MapReduce. This post will re-use many of the last post's AWS resources, including the EMR VPC, Subnets, Security Groups, AWS Glue Data Catalog, Amazon S3 buckets, EMR Roles, EC2 key pair, AWS Systems Manager Parameter Store parameters, PySpark applications, and Kaggle datasets.
Configuring Amazon MWAA
The easiest way to create a new MWAA Environment is through the AWS Management Console. I strongly suggest that you review the pricing for Amazon MWAA before continuing. The service can be quite costly to operate, even when idle, with the smallest Environment class potentially running into the hundreds of dollars per month.

Using the Console, create a new Amazon MWAA Environment. The Amazon MWAA interface will walk you through the creation process. Note the current 'Airflow version', 1.10.12.

Amazon MWAA requires an Amazon S3 bucket to store Airflow assets. Create a new Amazon S3 bucket. According to the documentation, the bucket name must start with the prefix airflow-. You must also enable Bucket Versioning on the bucket. Specify a dags folder within the bucket to store Airflow's Directed Acyclic Graphs (DAGs). You can leave the next two options blank since we have no additional Airflow plugins or additional Python packages to install.

With Amazon MWAA, your data is secure by default as workloads run within their own Amazon Virtual Private Cloud (Amazon VPC). As part of the MWAA Environment creation process, you are given the option to have AWS create an MWAA VPC CloudFormation stack.

For this demonstration, choose to have MWAA create a new VPC and associated networking resources.

The MWAA CloudFormation stack contains approximately 22 AWS resources, including a VPC, a pair of public and private subnets, route tables, an Internet Gateway, two NAT Gateways, and associated Elastic IPs (EIP). See the MWAA documentation for more details.


As part of the Amazon MWAA Networking configuration, you must decide if you want web access to Airflow to be public or private. The details of the network configuration can be found in the MWAA documentation. I am choosing public webserver access for this demonstration, but the recommended choice is private for greater security. With the public option, AWS still requires IAM authentication to sign in to the AWS Management Console in order to access the Airflow UI.
You must select an existing VPC Security Group or have MWAA create a new one. For this demonstration, choose to have MWAA create a Security Group for you.
Lastly, select an appropriately-sized Environment class for Airflow based on the scale of your needs. The mw1.small class will be sufficient for this demonstration.

Finally, for Permissions, you must select an existing Airflow execution service role or create a new role. For this demonstration, create a new Airflow service role. We will later add additional permissions.

Airflow Execution Role
As part of this demonstration, we will be using Airflow to run Spark jobs on EMR (EMR Steps). To allow Airflow to interact with EMR, we must increase the new Airflow execution role's default permissions. Additional permissions include allowing the new Airflow role to assume the EMR roles using iam:PassRole. For this demonstration, we will include the two default EMR Service and JobFlow roles, EMR_DefaultRole and EMR_EC2_DefaultRole, as well as the corresponding custom EMR roles created in the previous post, EMR_DemoRole and EMR_EC2_DemoRole. The Airflow service role also requires three specific EMR permissions, as shown below. Later in the post, Airflow will also read files from S3, which requires the s3:GetObject permission.
Create a new policy by importing the project's JSON file, iam_policy/airflow_emr_policy.json, and attach the new policy to the Airflow service role. Be sure to update the AWS Account ID in the file with your own Account ID.
The Airflow service role, created by MWAA, is shown below with the new policy attached.

Final Architecture
Below is the final high-level architecture for the post's demonstration. The diagram shows the approximate route of a DAG Run request, in red. The diagram includes an optional S3 Gateway VPC endpoint, not detailed in the post, but recommended for additional security. According to AWS, a VPC endpoint enables you to privately connect your VPC to supported AWS services and VPC endpoint services powered by AWS PrivateLink without requiring an internet gateway. In this case, it provides a private connection between the MWAA VPC and Amazon S3. It is also possible to create an EMR Interface VPC Endpoint to securely route traffic directly to EMR from MWAA, instead of connecting over the Internet.
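For reference, an S3 Gateway VPC Endpoint for the MWAA VPC can be created with a single AWS CLI command similar to the hedged example below; the VPC ID, route table IDs, and Region are placeholders you would replace with your own values.

# Create an S3 Gateway VPC Endpoint for the MWAA VPC (placeholder IDs)
aws ec2 create-vpc-endpoint \
    --vpc-endpoint-type Gateway \
    --vpc-id <mwaa_vpc_id> \
    --service-name com.amazonaws.us-east-1.s3 \
    --route-table-ids <mwaa_private_route_table_ids>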

Amazon MWAA Environment
The new MWAA Environment will include a link to the Airflow UI.

Airflow UI
Using the supplied link, you should be able to access the Airflow UI using your web browser.

Our First DAG
The Amazon MWAA documentation includes an example DAG, which contains one of several sample programs, SparkPi, that ships with Spark. I have created a similar DAG that is included in the GitHub project, dags/emr_steps_demo.py. The DAG will create a minimally-sized, single-node EMR cluster with no Core or Task nodes. The DAG will then use that cluster to submit the calculate_pi job to Spark. Once the job is complete, the DAG will terminate the EMR cluster.
Upload the DAG to the Airflow S3 bucket's dags directory. Substitute your Airflow S3 bucket name in the AWS CLI command below, then run it from the project's root.

aws s3 cp dags/spark_pi_example.py \
    s3://<your_airflow_bucket_name>/dags/
The DAG, spark_pi_example, should automatically appear in the Airflow UI. Click on 'Trigger DAG' to create a new EMR cluster and start the Spark job.

The DAG has no optional configuration to input as JSON. Select ‘Trigger’ to submit the job, as shown below.

The DAG should complete all three tasks successfully, as shown in the DAG’s ‘Graph View’ tab below.

Switching to the EMR Console, you should see the single-node EMR cluster being created.

On the ‘Steps’ tab, you should see that the ‘calculate_pi’ Spark job has been submitted and is waiting for the cluster to be ready to be run.

Triggering DAGs Programmatically
As noted earlier, the Amazon MWAA service is accessible using the AWS Management Console, as well as the Amazon MWAA API using the latest versions of the AWS SDK and AWS CLI. To automate the DAG Run, we could use the AWS CLI and invoke the Airflow CLI via an endpoint on the Apache Airflow Webserver. The Amazon MWAA documentation and Airflow's CLI documentation explain how.
Below is an example of triggering the spark_pi_example DAG programmatically using Airflow's trigger_dag CLI command. You will need to replace the WEB_SERVER_HOSTNAME variable with your own Airflow Web Server's hostname. The ENVIROMENT_NAME variable assumes only one MWAA environment is returned by jq.
Analytics Job with Airflow
Next, we will submit an actual analytics job to EMR. If you recall from the previous post, we had four different analytics PySpark applications, which performed analyses on the three Kaggle datasets. For the next DAG, we will run a Spark job that executes the bakery_sales_ssm.py PySpark application. This job should already exist in the processed data S3 bucket.
The DAG, dags/bakery_sales.py, creates an EMR cluster identical to the EMR cluster created with the run_job_flow.py Python script in the previous post. All EMR configuration options available when using AWS Step Functions are available with Airflow's airflow.contrib.operators and airflow.contrib.sensors packages for EMR.
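For reference, the Airflow 1.10.x EMR operators and step sensor typically used in this style of DAG are imported from the contrib packages shown below (a sketch for orientation; these paths moved to provider packages in Airflow 2.0).

# Airflow 1.10.x contrib imports for the EMR operators and sensor
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor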
Airflow leverages Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. The Bakery Sales DAG contains eleven Jinja template variables. Eight variables will be configured in the Airflow UI by importing a JSON file into the 'Admin' ⇨ 'Variables' tab. These template variables are prefixed with var.value in the DAG. The other three variables will be passed as a DAG Run configuration as a JSON blob, similar to the previous DAG example. These template variables are prefixed with dag_run.conf.
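As a hypothetical illustration of the two prefixes (the variable names are taken from this post; Jinja expressions are only rendered inside templated operator fields):

# Values resolved from Airflow Variables ('Admin' ⇨ 'Variables')
work_bucket = '{{ var.value.work_bucket }}'
emr_ec2_key_pair = '{{ var.value.emr_ec2_key_pair }}'

# Values resolved from the JSON blob passed as the DAG Run configuration
notification_email = '{{ dag_run.conf["airflow_email"] }}'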
Import Variables into Airflow UI
First, to import the required variables, change the values in the project's airflow_variables/admin_variables_bakery.json file. You will need to update the values for bootstrap_bucket, emr_ec2_key_pair, logs_bucket, and work_bucket. The three S3 buckets should all exist from the previous post.
Next, import the variables file from the ‘Admin’ ⇨ ‘Variables’ tab of the Airflow UI.

Upload the DAG, dags/bakery_sales.py, to the Airflow S3 bucket, similar to the first DAG.

aws s3 cp dags/bakery_sales.py \
    s3://<your_airflow_bucket_name>/dags/
The second DAG, bakery_sales, should automatically appear in the Airflow UI. Click on 'Trigger DAG' to create a new EMR cluster and start the Spark job.

Input the three required parameters in the 'Trigger DAG' interface, used to pass the DAG Run configuration, and select 'Trigger'. A sample of the JSON blob can be found in the project, airflow_variables/dag_run.conf_bakery.json.
{ "airflow_email": "analytics_team@example.com", "email_on_failure": false, "email_on_retry": false }
This is just for demonstration purposes. To send and receive emails, you will need to configure Airflow.

Switching to the EMR Console, you should see the ‘Bakery Sales’ Spark job in the ‘Steps’ tab.

Multi-Step DAG
In our last example, we will use a single DAG to run four Spark jobs in parallel. The Spark job arguments (the EmrAddStepsOperator steps parameter) will be loaded from an external JSON file residing in Amazon S3, instead of being defined in the DAG, as in the previous two DAG examples. Additionally, the EMR cluster specifications (the EmrCreateJobFlowOperator job_flow_overrides parameter) will also be loaded from an external JSON file. Using this method, we decouple the EMR provisioning and job details from the DAG. DataOps or DevOps Engineers might manage the EMR cluster specifications as code, while Data Analysts manage the Spark job arguments separately. A third team might manage the DAG itself.
We still maintain the variables in the JSON files. The DAG will read the JSON file-based configuration into the tasks as JSON blobs, then replace the Jinja template variables (expressions) in the DAG with variable values defined in Airflow or input as parameters when the DAG is triggered.
Below we see a snippet of two of the four spark-submit job definitions (steps), which have been moved to a separate JSON file, emr_steps/emr_steps.json.
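The project file itself is not reproduced here, but EMR Step definitions for spark-submit jobs generally take the shape sketched below; the bucket path, spark-submit arguments, and the second application's name are illustrative placeholders rather than the project's actual values.

[
  {
    "Name": "Bakery Sales",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "s3a://{{ var.value.work_bucket }}/analyze/bakery_sales_ssm.py"
      ]
    }
  },
  {
    "Name": "Second PySpark Application (placeholder)",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "s3a://{{ var.value.work_bucket }}/analyze/<second_application>.py"
      ]
    }
  }
]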
Below are the EMR cluster specifications (job_flow_overrides), which have been moved to a separate JSON file, job_flow_overrides/job_flow_overrides.json.
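Again, the project's file is not shown here; the sketch below only illustrates the general structure of a job_flow_overrides document, re-using role and variable names mentioned earlier in the post, with instance types, counts, and other values chosen purely for illustration.

{
  "Name": "demo-cluster-airflow",
  "ReleaseLabel": "emr-6.2.0",
  "LogUri": "s3n://{{ var.value.logs_bucket }}",
  "Applications": [{ "Name": "Spark" }],
  "Instances": {
    "InstanceGroups": [
      { "Name": "Master nodes", "Market": "ON_DEMAND", "InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1 },
      { "Name": "Core nodes", "Market": "ON_DEMAND", "InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2 }
    ],
    "Ec2KeyName": "{{ var.value.emr_ec2_key_pair }}",
    "KeepJobFlowAliveWhenNoSteps": false
  },
  "StepConcurrencyLevel": 5,
  "JobFlowRole": "EMR_EC2_DemoRole",
  "ServiceRole": "EMR_DemoRole"
}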
Decoupling the configurations reduces the DAG from well over 200 lines of code to less than 75 lines. Note lines 56 and 63 of the DAG below. Instead of referencing a local object variable, the parameters now reference the function, get_objects(key, bucket_name), which loads the JSON.
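The helper itself is not shown above; a minimal sketch of a function with that signature, reading a JSON document from S3 with boto3, might look like this (the example bucket name is hypothetical).

import json

import boto3


def get_objects(key, bucket_name):
    """Read a JSON document from S3 and return it as a Python object."""
    s3 = boto3.resource('s3')
    body = s3.Object(bucket_name, key).get()['Body'].read().decode('utf-8')
    return json.loads(body)


# Example usage with a hypothetical bucket name:
# steps = get_objects('emr_steps/emr_steps.json', 'emr-demo-work-123412341234-us-east-1')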
This time, we need to upload three files to S3, the DAG to the Airflow S3 bucket, and the two JSON files to the EMR Work S3 bucket. Change the bucket names to match your environment, then run the three AWS CLI commands shown below.
aws s3 cp emr_steps/emr_steps.json \
    s3://emr-demo-work-123412341234-us-east-1/emr_steps/

aws s3 cp job_flow_overrides/job_flow_overrides.json \
    s3://emr-demo-work-123412341234-us-east-1/job_flow_overrides/

aws s3 cp dags/multiple_steps.py \
    s3://airflow-123412341234-us-east-1/dags/
The third DAG, multiple_steps, should automatically appear in the Airflow UI. Click on 'Trigger DAG' to create a new EMR cluster and start the Spark jobs. The three required input parameters in the 'Trigger DAG' interface are identical to the previous bakery_sales DAG. A sample of that JSON blob can be found in the project at airflow_variables/dag_run.conf_bakery.json.

Below we see that the EMR cluster has completed the four Spark jobs (EMR Steps) and has auto-terminated. Note that all four jobs were started at the exact same time. If you recall from the previous post, this is possible because we preset the ‘Concurrency’ level to 5.

Triggering DAGs Programmatically
AWS CLI
Similar to the previous example, below we can trigger the multiple_steps DAG programmatically using Airflow's trigger_dag CLI command. Note the addition of the --conf named argument, which passes the configuration, containing three key/value pairs, to the trigger command as a JSON blob.
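A hedged sketch of that command, assuming the same WEB_SERVER_HOSTNAME and CLI_TOKEN setup as the earlier example, is shown below.

curl --request POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
    --header "Authorization: Bearer ${CLI_TOKEN}" \
    --header "Content-Type: text/plain" \
    --data-raw "trigger_dag multiple_steps --conf '{\"airflow_email\": \"analytics_team@example.com\", \"email_on_failure\": false, \"email_on_retry\": false}'"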
AWS SDK
Airflow DAGs can also be triggered using the AWS SDK. For example, with boto3 for Python, we could use a script similar to the following to remotely trigger a DAG.
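The post's script is not embedded here; a minimal sketch using boto3 and the requests library, with a hypothetical Environment name, might look like the following.

import json

import boto3
import requests

environment_name = 'MyAirflowEnvironment'  # hypothetical MWAA Environment name
dag_name = 'multiple_steps'
config = {
    'airflow_email': 'analytics_team@example.com',
    'email_on_failure': False,
    'email_on_retry': False,
}

# Request a short-lived CLI token and the web server hostname from MWAA
mwaa = boto3.client('mwaa')
token = mwaa.create_cli_token(Name=environment_name)

# POST the Airflow CLI 'trigger_dag' command to the MWAA web server's CLI endpoint
response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        'Authorization': f"Bearer {token['CliToken']}",
        'Content-Type': 'text/plain',
    },
    data=f"trigger_dag {dag_name} --conf '{json.dumps(config)}'",
)

print(response.status_code, response.text)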
Cleaning Up
Once you are done with the MWAA Environment, be sure to delete it as soon as possible to save additional costs. Also, delete the MWAA-VPC CloudFormation stack. These resources, like the two NAT Gateways, will also continue to generate additional costs.
aws mwaa delete-environment --name <your_mwaa_environment_name>
aws cloudformation delete-stack --stack-name MWAA-VPC
Conclusion
In this second post in the series, we explored using the newly released Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to run PySpark applications on Amazon Elastic MapReduce (Amazon EMR). In future posts, we will explore the use of Jupyter and Zeppelin notebooks for data science, scientific computing, and machine learning on EMR.
If you are interested in learning more about configuring Amazon MWAA and Airflow, see my recent post, Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options.
This blog represents my own viewpoints and not of my employer, Amazon Web Services. All product names, logos, and brands are the property of their respective owners.
Executing Amazon Athena Queries from JetBrains PyCharm
Posted by Gary A. Stafford in AWS, Big Data, Cloud, Python, Software Development, SQL on January 8, 2020
Amazon Athena
According to Amazon, Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Amazon Athena supports and works with a variety of popular data file formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet.
The underlying technology behind Amazon Athena is Presto, the popular, open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172. Athena is ideal for quick, ad-hoc querying, but it can also handle complex analysis, including large joins, window functions, and arrays. In addition to Presto, Athena also uses Apache Hive to define tables.
Athena Query Editor
In the previous post, Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight, we used the Athena Query Editor to construct and test SQL queries against semi-structured data in an S3-based Data Lake. The Athena Query Editor has many of the basic features Data Engineers and Analysts expect, including SQL syntax highlighting, code auto-completion, and query formatting. Queries can be run directly from the Editor, saved for future reference, and query results downloaded. The Editor can convert SELECT queries to CREATE TABLE AS (CTAS) and CREATE VIEW AS statements. Access to AWS Glue data sources is also available from within the Editor.
Full-Featured IDE
Although the Athena Query Editor is fairly functional, many Engineers perform a majority of their software development work in a fuller-featured IDE. The choice of IDE may depend on one’s predominant programming language. According to the PYPL Index, the ten most popular, current IDEs are:
- Microsoft Visual Studio
- Android Studio
- Eclipse
- Visual Studio Code
- Apache NetBeans
- JetBrains PyCharm
- JetBrains IntelliJ
- Apple Xcode
- Sublime Text
- Atom
Within the domains of data science, big data analytics, and data analysis, languages such as SQL, Python, Java, Scala, and R are common. Although I work in a variety of IDEs, my go-to choices are JetBrains PyCharm for Python (including for PySpark and Jupyter Notebook development) and JetBrains IntelliJ for Java and Scala (including Apache Spark development). Both these IDEs also support many common SQL-based technologies, out-of-the-box, and are easily extendable to add new technologies.
Athena Integration with PyCharm
Utilizing the extensibility of the JetBrains suite of professional development IDEs, it is simple to add Amazon Athena to the list of available database drivers and make JDBC (Java Database Connectivity) connections to Athena instances on AWS.
Downloading the Athena JDBC Driver
To start, download the Athena JDBC Driver from Amazon. There are two versions, based on your choice of Java JDK. Considering Java 8 was released six years ago (March 2014), most users will likely want the AthenaJDBC42-2.0.9.jar, which is compatible with JDBC 4.2 and JDK 8.0 or later.
Installation Guide
AWS also supplies a JDBC Driver Installation and Configuration Guide. The guide, as well as the Athena JDBC and ODBC Drivers, is produced by Simba Technologies (acquired by Magnitude Software). Instructions for creating an Athena Driver start on page 23.
Creating a New Athena Driver
From PyCharm’s Database Tool Window, open the Drivers dialog box and select the downloaded Athena JDBC Driver JAR. Select com.simba.athena.jdbc.Driver in the Class dropdown. Name the Driver ‘Amazon Athena.’
You can configure the Athena Driver further, using the Options and Advanced tabs.
Creating a New Athena Data Source
From PyCharm’s Database Tool Window, select the Data Source dialog box to create a new connection to your Athena instance. Choose ‘Amazon Athena’ from the list of available Database Drivers.
You will need four items to create an Athena Data Source:
- Your IAM User Access Key ID
- Your IAM User Secret Access Key
- The AWS Region of your Athena instance (e.g., us-east-1)
- An existing S3 bucket location to store query results
The Athena connection URL is a combination of the AWS Region and the S3 bucket, items 3 and 4, above. The format of the Athena connection URL is as follows.
jdbc:awsathena://AwsRegion=your-region;S3OutputLocation=s3://your-bucket-name/query-results-path
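For example, a connection URL for a hypothetical query-results bucket in the us-east-1 Region would look similar to the following.

jdbc:awsathena://AwsRegion=us-east-1;S3OutputLocation=s3://athena-query-results-123456789012-us-east-1/pycharm/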
Give the new Athena Data Source a logical Name, input the User (Access Key ID), Password (Secret Access Key), and the Athena URL. To test the Athena Data Source, use the ‘Test Connection’ button.
You can create multiple Athena Data Sources using the Athena Driver. For example, you may have separate Development, Test, and Production instances of Athena, each in a different AWS Account.
Data Access
Once a successful connection has been made, switching to the Schemas tab, you should see a list of available AWS Glue Data Catalog databases. Below, we see the AWS Glue Catalog, which we created in the prior post. This Glue Data Catalog database contains ten metadata tables, each corresponding to a semi-structured, file-based data source in an S3-based data lake.
In the example below, I have chosen to limit the new Athena Data Source to a single Data Catalog database, to which the Data Source’s IAM User has access. Applying the core AWS security principle of granting least privilege, IAM Users should only have the permissions required to perform a specific set of approved tasks. This principle applies to the Glue Data Catalog databases, metadata tables, and the underlying S3 data sources.
Querying Athena from PyCharm
From within PyCharm’s Database Tool Window, you should now see a list of the metadata tables defined in your AWS Glue Data Catalog database(s), as well as the individual columns within each table.
Similar to the Athena Query Editor, you can write SQL queries against the database tables in PyCharm. Like the Athena Query Editor, PyCharm has standard features such as SQL syntax highlighting, code auto-completion, and query formatting. Right-click on the Athena Data Source and choose New, then Console, to start.
Be mindful when writing queries and searching the Internet for SQL references that the Athena query engine is based on Presto 0.172. The current version of Presto, 0.234, is more than 50 releases ahead of the current Athena version. Both Athena and Presto functionality continue to change and diverge. There are also additional considerations and limitations for SQL queries in Athena to be aware of.
Whereas the Athena Query Editor is limited to only one query per query tab, in PyCharm, we can write and run multiple SQL queries in the same console window and have multiple console sessions opened to Athena at the same time.
By default, PyCharm’s query results are limited to the first ten rows of data. The number of rows displayed, as well as many other preferences, can be changed in PyCharm’s Database Preferences dialog box.
Saving Queries and Exporting Results
In PyCharm, Athena queries can be saved as part of your PyCharm projects, as .sql files. Whereas the Athena Query Editor is limited to CSV, in PyCharm, query results can be exported in a variety of standard data file formats.
Athena Query History
All Athena queries run from PyCharm are recorded in the History tab of the Athena Console. Although PyCharm shows query run times, the Athena History tab also displays the amount of data scanned. Knowing the query run time and volume of data scanned is useful when performance tuning queries.
Other IDEs
The technique shown for JetBrains PyCharm can also be applied to other JetBrains products, including GoLand, DataGrip, PhpStorm, and IntelliJ (shown below).
This blog represents my own viewpoints and not of my employer, Amazon Web Services.
Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Cloud, Python, Serverless, Software Development on January 5, 2020
Introduction
According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusion, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.
Data Analysis—discovering useful information, informing conclusion, and supporting decision-making. –Wikipedia
In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.
Demonstration
In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.
Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.
Preview of post’s data in Amazon QuickSight.
The original concept for the Smart Hub was developed as part of a multi-day training and hackathon I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.
From left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.
This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.
High-level AWS architecture diagram of the demonstration.
Featured Technologies
The following AWS services and open-source technologies are featured prominently in this post.
Amazon S3-based Data Lake
An Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.
AWS Glue
AWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.
AWS Glue Data Catalog
The AWS Glue Data Catalog is an Apache Hive Metastore-compatible, central repository to store structural and operational metadata for data assets. For a given data set, you can store the table definition and physical location, add business-relevant attributes, and track how the data has changed over time.
AWS Glue Crawler
An AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.
AWS Glue ETL Job
An AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.
Amazon Athena
Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.
Amazon QuickSight
Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.
AWS Lambda
AWS Lambda automatically runs code without requiring you to provision or manage servers. AWS Lambda automatically scales applications by running code in response to triggers. Lambda code runs in parallel. With AWS Lambda, you are charged for every 100ms your code executes and the number of times your code is triggered. You pay only for the compute time you consume.
Smart Hub Data
Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data. Those data categories include Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.
To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.
Smart Hub Electrical Usage Data
The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kWh) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days’ worth of usage data for the demonstration, for a total of 5,760 data points. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).
{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04167}} | |
{"loc_id":"b6a8d42425fde548","ts":1576915500,"data":{"s_01":0,"s_02":0.00552,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04147}} | |
{"loc_id":"b6a8d42425fde548","ts":1576915800,"data":{"s_01":0.29267,"s_02":0.00642,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04207}} | |
{"loc_id":"b6a8d42425fde548","ts":1576916100,"data":{"s_01":0.29207,"s_02":0.00592,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04137}} | |
{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04157}} | |
{"loc_id":"b6a8d42425fde548","ts":1576916700,"data":{"s_01":0,"s_02":0.00562,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04197}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917000,"data":{"s_01":0,"s_02":0.00512,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04257}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917300,"data":{"s_01":0,"s_02":0.00522,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04177}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917600,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04267}} | |
{"loc_id":"b6a8d42425fde548","ts":1576917900,"data":{"s_01":0,"s_02":0.00612,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04237}} |
Note the electrical usage data contains nested data. The electrical usage for each of the ten sensors is contained in a nested JSON object within each time-series entry. The object contains ten numeric values of type double.
{
  "loc_id": "b6a8d42425fde548",
  "ts": 1576916400,
  "data": {
    "s_01": 0.29217,
    "s_02": 0.00622,
    "s_03": 0,
    "s_04": 0,
    "s_05": 0,
    "s_06": 0,
    "s_07": 0,
    "s_08": 0,
    "s_09": 0,
    "s_10": 0.04157
  }
}
Real data is often complex and deeply nested. Later in the post, we will see that AWS Glue can map many common data types, including nested data objects, as illustrated below.
Smart Hub Sensor Mappings
The Smart Hub sensor mappings data maps a sensor column in the usage data (e.g., ‘s_01’) to the corresponding actual device (e.g., ‘Central Air Conditioner’). The data contains the device location, wattage, and the last time the record was modified. The data is also stored in JSON Lines format. The sensor mappings data will be partitioned in the Amazon S3-based data lake by the state of the residence (e.g., ‘state=or’ for Oregon).
{"loc_id":"b6a8d42425fde548","id":"s_01","description":"Central Air Conditioner","location":"N/A","watts":3500,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_02","description":"Ceiling Fan","location":"Master Bedroom","watts":65,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_03","description":"Clothes Dryer","location":"Basement","watts":5000,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_04","description":"Clothes Washer","location":"Basement","watts":1800,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_05","description":"Dishwasher","location":"Kitchen","watts":900,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_06","description":"Flat Screen TV","location":"Living Room","watts":120,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_07","description":"Microwave Oven","location":"Kitchen","watts":1000,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_08","description":"Coffee Maker","location":"Kitchen","watts":900,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_09","description":"Hair Dryer","location":"Master Bathroom","watts":2000,"last_modified":1559347200} | |
{"loc_id":"b6a8d42425fde548","id":"s_10","description":"Refrigerator","location":"Kitchen","watts":500,"last_modified":1559347200} |
Smart Hub Locations
The Smart Hub locations data contains the geospatial coordinates, home address, and timezone for each residential Smart Hub. The data is stored in CSV format. The data for the four cities included in this demonstration originated from OpenAddresses, ‘the free and open global address collection.’ There are approximately 4k location records. The location data will be partitioned in the Amazon S3-based data lake by the state of the residence where the Smart Hub is installed (e.g., ‘state=or’ for Oregon).
lon,lat,number,street,unit,city,district,region,postcode,id,hash,tz
-122.8077278,45.4715614,6635,SW JUNIPER TER,,,,,97008,,b6a8d42425fde548,America/Los_Angeles
-122.8356634,45.4385864,11225,SW PINTAIL LOOP,,,,,97007,,08ae3df798df8b90,America/Los_Angeles
-122.8252379,45.4481709,9930,SW WRANGLER PL,,,,,97008,,1c7e1f7df752663e,America/Los_Angeles
-122.8354211,45.4535977,9174,SW PLATINUM PL,,,,,97007,,b364854408ee431e,America/Los_Angeles
-122.8315771,45.4949449,15040,SW MILLIKAN WAY,# 233,,,,97003,,0e97796ba31ba3b4,America/Los_Angeles
-122.7950339,45.4470259,10006,SW CONESTOGA DR,# 113,,,,97008,,2b5307be5bfeb026,America/Los_Angeles
-122.8072836,45.4908594,12600,SW CRESCENT ST,# 126,,,,97005,,4d74167f00f63f50,America/Los_Angeles
-122.8211801,45.4689303,7100,SW 140TH PL,,,,,97008,,c5568631f0b9de9c,America/Los_Angeles
-122.831154,45.4317057,15050,SW MALLARD DR,# 101,,,,97007,,dbd1321080ce9682,America/Los_Angeles
-122.8162856,45.4442878,10460,SW 136TH PL,,,,,97008,,008faab8a9a3e519,America/Los_Angeles
Electrical Rates
Lastly, the electrical rate data contains the cost of electricity. In this demonstration, the assumption is that the rate varies by state, by month, and by the hour of the day. The data is stored in XML, a data export format still common to older, legacy systems. The electrical rate data will not be partitioned in the Amazon S3-based data lake.
<?xml version="1.0" encoding="UTF-8"?> | |
<root> | |
<row> | |
<state>or</state> | |
<year>2019</year> | |
<month>12</month> | |
<from>19:00:00</from> | |
<to>19:59:59</to> | |
<type>peak</type> | |
<rate>12.623</rate> | |
</row> | |
<row> | |
<state>or</state> | |
<year>2019</year> | |
<month>12</month> | |
<from>20:00:00</from> | |
<to>20:59:59</to> | |
<type>partial-peak</type> | |
<rate>7.232</rate> | |
</row> | |
<row> | |
<state>or</state> | |
<year>2019</year> | |
<month>12</month> | |
<from>21:00:00</from> | |
<to>21:59:59</to> | |
<type>partial-peak</type> | |
<rate>7.232</rate> | |
</row> | |
<row> | |
<state>or</state> | |
<year>2019</year> | |
<month>12</month> | |
<from>22:00:00</from> | |
<to>22:59:59</to> | |
<type>off-peak</type> | |
<rate>4.209</rate> | |
</row> | |
</root> |
Data Analysis Process
Due to the number of steps involved in the data analysis process in the demonstration, I have divided the process into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).
Full data analysis workflow diagram (click to enlarge…)
Raw Data Ingestion
In the Raw Data Ingestion stage, semi-structured CSV-, XML-, and JSON-format data files are copied to a secure Amazon Simple Storage Service (S3) bucket. Within the bucket, data files are organized into folders based on their physical data structure (schema). Due to the potentially unlimited number of data files, files are further organized (partitioned) into subfolders. Organizational strategies for data files are based on date, time, geographic location, customer id, or other common data characteristics.
This collection of semi-structured data files, S3 buckets, and partitions form what is referred to as a Data Lake. According to AWS, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.
A series of AWS Glue Crawlers process the raw CSV-, XML-, and JSON-format files, extracting metadata, and creating table definitions in the AWS Glue Data Catalog. According to AWS, an AWS Glue Data Catalog contains metadata tables, where each table specifies a single data store.
Data Transformation
In the Data Transformation stage, the raw data in the previous stage is transformed. Data transformation may include both modifying the data and changing the data format. Data modifications include data cleansing, re-casting data types, changing date formats, field-level computations, and field concatenation.
The data is then converted from CSV-, XML-, and JSON-format to Apache Parquet format and written back to the Amazon S3-based data lake. Apache Parquet is a compressed, efficient columnar storage format. Amazon Athena, like many Cloud-based services, charges you by the amount of data scanned per query. Hence, using data partitioning, bucketing, compression, and columnar storage formats, like Parquet, will reduce query cost.
Lastly, the transformed Parquet-format data is cataloged to new tables, alongside the raw CSV, XML, and JSON data, in the Glue Data Catalog.
Data Enrichment
According to ScienceDirect, data enrichment or augmentation is the process of enhancing existing information by supplementing missing or incomplete data. Typically, data enrichment is achieved by using external data sources, but that is not always the case.
Data Enrichment—the process of enhancing existing information by supplementing missing or incomplete data. –ScienceDirect
In the Data Enrichment stage, the Parquet-format Smart Hub usage data is augmented with related data from the three other data sources: sensor mappings, locations, and electrical rates. The customer’s Smart Hub usage data is enriched with the customer’s device types, the customer’s timezone, and customer’s electricity cost per monitored period based on the customer’s geographic location and time of day.
Once the data is enriched, it is converted to Parquet and optimized for query performance, stored in the data lake, and cataloged. At this point, the original CSV-, XML-, and JSON-format raw data files, the transformed Parquet-format data files, and the Parquet-format enriched data files are all stored in the Amazon S3-based data lake and cataloged in the Glue Data Catalog.
Data Visualization
In the final Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for visualization and Business Intelligence, which integrate with Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo. In this demonstration, we will focus on Amazon QuickSight.
Getting Started
Requirements
To follow along with the demonstration, you will need an AWS Account and a current version of the AWS CLI. To get the most from the demonstration, you should also have Python 3 and jq installed in your work environment.
Source Code
All source code for this post can be found on GitHub. Use the following command to clone a copy of the project.
git clone \
    --branch master --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/athena-glue-quicksight-demo.git
Source code samples in this post are displayed as GitHub Gists, which will not display correctly on some mobile and social media browsers.
TL;DR?
Just want to jump in without reading the instructions? All the AWS CLI commands, found within the post, are consolidated in the GitHub project’s README file.
CloudFormation Stack
To start, create the ‘smart-hub-athena-glue-stack’ CloudFormation stack using the smart-hub-athena-glue.yml template. The template will create (3) Amazon S3 buckets, (1) AWS Glue Data Catalog Database, (5) Data Catalog Database Tables, (6) AWS Glue Crawlers, (1) AWS Glue ETL Job, and (1) IAM Service Role for AWS Glue.
Make sure to change the DATA_BUCKET, SCRIPT_BUCKET, and LOG_BUCKET variables, first, to your own unique S3 bucket names. I always suggest using the standard AWS 3-part convention of 1) descriptive name, 2) AWS Account ID or Account Alias, and 3) AWS Region to name your buckets (e.g., ‘smart-hub-data-123456789012-us-east-1’).
# *** CHANGE ME ***
BUCKET_SUFFIX="123456789012-us-east-1"
DATA_BUCKET="smart-hub-data-${BUCKET_SUFFIX}"
SCRIPT_BUCKET="smart-hub-scripts-${BUCKET_SUFFIX}"
LOG_BUCKET="smart-hub-logs-${BUCKET_SUFFIX}"

aws cloudformation create-stack \
    --stack-name smart-hub-athena-glue-stack \
    --template-body file://cloudformation/smart-hub-athena-glue.yml \
    --parameters ParameterKey=DataBucketName,ParameterValue=${DATA_BUCKET} \
        ParameterKey=ScriptBucketName,ParameterValue=${SCRIPT_BUCKET} \
        ParameterKey=LogBucketName,ParameterValue=${LOG_BUCKET} \
    --capabilities CAPABILITY_NAMED_IAM
Raw Data Files
Next, copy the raw CSV-, XML-, and JSON-format data files from the local project to the DATA_BUCKET S3 bucket (steps 1a-1b in the workflow diagram). These files represent the beginnings of the S3-based data lake. Each category of data uses a different strategy for organizing and separating the files. Note the use of Apache Hive-style partitions (e.g., /smart_hub_data_json/dt=2019-12-21). As discussed earlier, the assumption is that the actual, large volume of data in the data lake would necessitate using partitioning to improve query performance.
# location data
aws s3 cp data/locations/denver_co_1576656000.csv \
    s3://${DATA_BUCKET}/smart_hub_locations_csv/state=co/
aws s3 cp data/locations/palo_alto_ca_1576742400.csv \
    s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ca/
aws s3 cp data/locations/portland_metro_or_1576742400.csv \
    s3://${DATA_BUCKET}/smart_hub_locations_csv/state=or/
aws s3 cp data/locations/stamford_ct_1576569600.csv \
    s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ct/

# sensor mapping data
aws s3 cp data/mappings/ \
    s3://${DATA_BUCKET}/sensor_mappings_json/state=or/ \
    --recursive

# electrical usage data
aws s3 cp data/usage/2019-12-21/ \
    s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-21/ \
    --recursive
aws s3 cp data/usage/2019-12-22/ \
    s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-22/ \
    --recursive

# electricity rates data
aws s3 cp data/rates/ \
    s3://${DATA_BUCKET}/electricity_rates_xml/ \
    --recursive
Confirm the contents of the DATA_BUCKET S3 bucket with the following command.
aws s3 ls s3://${DATA_BUCKET}/ \
    --recursive --human-readable --summarize
There should be a total of (14) raw data files in the DATA_BUCKET S3 bucket.
2020-01-04 14:39:51   20.0 KiB electricity_rates_xml/2019_12_1575270000.xml
2020-01-04 14:39:46    1.3 KiB sensor_mappings_json/state=or/08ae3df798df8b90_1550908800.json
2020-01-04 14:39:46    1.3 KiB sensor_mappings_json/state=or/1c7e1f7df752663e_1559347200.json
2020-01-04 14:39:46    1.3 KiB sensor_mappings_json/state=or/b6a8d42425fde548_1568314800.json
2020-01-04 14:39:47   44.9 KiB smart_hub_data_json/dt=2019-12-21/08ae3df798df8b90_1576915200.json
2020-01-04 14:39:47   44.9 KiB smart_hub_data_json/dt=2019-12-21/1c7e1f7df752663e_1576915200.json
2020-01-04 14:39:47   44.9 KiB smart_hub_data_json/dt=2019-12-21/b6a8d42425fde548_1576915200.json
2020-01-04 14:39:49   44.6 KiB smart_hub_data_json/dt=2019-12-22/08ae3df798df8b90_15770016000.json
2020-01-04 14:39:49   44.6 KiB smart_hub_data_json/dt=2019-12-22/1c7e1f7df752663e_1577001600.json
2020-01-04 14:39:49   44.6 KiB smart_hub_data_json/dt=2019-12-22/b6a8d42425fde548_15770016001.json
2020-01-04 14:39:39   89.7 KiB smart_hub_locations_csv/state=ca/palo_alto_ca_1576742400.csv
2020-01-04 14:39:37   84.2 KiB smart_hub_locations_csv/state=co/denver_co_1576656000.csv
2020-01-04 14:39:44   78.6 KiB smart_hub_locations_csv/state=ct/stamford_ct_1576569600.csv
2020-01-04 14:39:42   91.6 KiB smart_hub_locations_csv/state=or/portland_metro_or_1576742400.csv

Total Objects: 14
   Total Size: 636.7 KiB
Lambda Functions
Next, package the (5) Python 3.8-based AWS Lambda functions for deployment.
pushd lambdas/athena-json-to-parquet-data || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-csv-to-parquet-locations || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-json-to-parquet-mappings || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-complex-etl-query || exit
zip -r package.zip index.py
popd || exit

pushd lambdas/athena-parquet-to-parquet-elt-data || exit
zip -r package.zip index.py
popd || exit
Copy the five Lambda packages to the SCRIPT_BUCKET S3 bucket. The ZIP archive Lambda packages are accessed by the second CloudFormation stack, smart-hub-serverless. This CloudFormation stack, which creates the Lambda functions, will fail to deploy if the packages are not found in the SCRIPT_BUCKET S3 bucket.
I have chosen to place the packages in a different S3 bucket than the raw data files. In a real production environment, these two types of files would be separated, minimally, into separate buckets for security. Remember, only data should go into the data lake.
aws s3 cp lambdas/athena-json-to-parquet-data/package.zip \
    s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_data/
aws s3 cp lambdas/athena-csv-to-parquet-locations/package.zip \
    s3://${SCRIPT_BUCKET}/lambdas/athena_csv_to_parquet_locations/
aws s3 cp lambdas/athena-json-to-parquet-mappings/package.zip \
    s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_mappings/
aws s3 cp lambdas/athena-complex-etl-query/package.zip \
    s3://${SCRIPT_BUCKET}/lambdas/athena_complex_etl_query/
aws s3 cp lambdas/athena-parquet-to-parquet-elt-data/package.zip \
    s3://${SCRIPT_BUCKET}/lambdas/athena_parquet_to_parquet_elt_data/
Create the second ‘smart-hub-lambda-stack’ CloudFormation stack using the smart-hub-lambda.yml CloudFormation template. The template will create (5) AWS Lambda functions and (1) Lambda execution IAM Service Role.
aws cloudformation create-stack \
    --stack-name smart-hub-lambda-stack \
    --template-body file://cloudformation/smart-hub-lambda.yml \
    --capabilities CAPABILITY_NAMED_IAM
At this point, we have deployed all of the AWS resources required for the demonstration using CloudFormation. We have also copied all of the raw CSV-, XML-, and JSON-format data files to the Amazon S3-based data lake.
AWS Glue Crawlers
If you recall, we created five tables in the Glue Data Catalog database as part of the CloudFormation stack: one table for each of the four raw data types and one table to hold temporary ELT data later in the demonstration. To confirm the five tables were created in the Glue Data Catalog database, use the Glue Data Catalog Console, or run the following AWS CLI / jq command.
aws glue get-tables \
  --database-name smart_hub_data_catalog \
  | jq -r '.TableList[].Name'
The five data catalog tables should be as follows.
electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
smart_hub_data_json
smart_hub_locations_csv
We also created six Glue Crawlers as part of the CloudFormation template. Four of these Crawlers are responsible for cataloging the raw CSV-, XML-, and JSON-format data from S3 into the corresponding, existing Glue Data Catalog database tables. The Crawlers will detect any new partitions and add those to the tables as well. Each Crawler corresponds to one of the four raw data types. Crawlers can be scheduled to run periodically, cataloging new data and updating data partitions. Crawlers can also create new Data Catalog database tables. We will use Crawlers to create new tables later in the post.
Run the four Glue Crawlers using the AWS CLI (step 1c in workflow diagram).
aws glue start-crawler --name smart-hub-locations-csv
aws glue start-crawler --name smart-hub-sensor-mappings-json
aws glue start-crawler --name smart-hub-data-json
aws glue start-crawler --name smart-hub-rates-xml
You can check the Glue Crawler Console to ensure the four Crawlers finished successfully.
Alternatively, use another AWS CLI / jq command.
aws glue get-crawler-metrics \
  | jq -r '.CrawlerMetricsList[] | "\(.CrawlerName): \(.StillEstimating), \(.TimeLeftSeconds)"' \
  | grep "^smart-hub-[A-Za-z-]*"
When complete, all Crawlers should be in a state of ‘Still Estimating = false’ and ‘TimeLeftSeconds = 0’. In my experience, the Crawlers can take up to a minute to start after the estimation stage, and a minute to stop when complete.
smart-hub-data-json: true, 0
smart-hub-etl-tmp-output-parquet: false, 0
smart-hub-locations-csv: false, 15
smart-hub-rates-parquet: false, 0
smart-hub-rates-xml: false, 15
smart-hub-sensor-mappings-json: false, 15
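If you prefer to script the wait rather than watch the console, a minimal boto3 polling sketch (not part of the original project) could look like the following; a Crawler returns to the READY state when it has finished.

import time
import boto3

glue_client = boto3.client('glue')

crawlers = [
    'smart-hub-locations-csv',
    'smart-hub-sensor-mappings-json',
    'smart-hub-data-json',
    'smart-hub-rates-xml'
]

# poll each Crawler until it returns to the READY state
for crawler in crawlers:
    while glue_client.get_crawler(Name=crawler)['Crawler']['State'] != 'READY':
        time.sleep(15)
    print(f"crawler '{crawler}' is finished")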
Successfully running the four Crawlers completes the Raw Data Ingestion stage of the demonstration.
Converting to Parquet with CTAS
With the Raw Data Ingestion stage completed, we will now transform the raw Smart Hub usage data, sensor mapping data, and locations data into Parquet-format using three AWS Lambda functions. Each Lambda subsequently calls Athena, which executes a CREATE TABLE AS SELECT SQL statement (aka CTAS). Each Lambda executes a similar command, varying only by data source, data destination, and partitioning scheme. Below is an example of the command used for the Smart Hub electrical usage data, taken from the Python-based Lambda, athena-json-to-parquet-data/index.py.
query = \
    "CREATE TABLE IF NOT EXISTS " + data_catalog + "." + output_directory + " " \
    "WITH ( " \
    "  format = 'PARQUET', " \
    "  parquet_compression = 'SNAPPY', " \
    "  partitioned_by = ARRAY['dt'], " \
    "  external_location = 's3://" + data_bucket + "/" + output_directory + "' " \
    ") AS " \
    "SELECT * " \
    "FROM " + data_catalog + "." + input_directory + ";"
This compact, yet powerful CTAS statement converts a copy of the raw JSON- and CSV-format data files into Parquet-format, and partitions and stores the resulting files back into the S3-based data lake. Additionally, the CTAS SQL statement catalogs the Parquet-format data files as new tables in the Glue Data Catalog database. Unfortunately, this method will not work for the XML-format raw data files, which we will tackle next.
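The remainder of each of these Lambda functions submits the query to Athena. Although their full source is not reproduced here, the call follows the same boto3 start_query_execution pattern used by the enrichment Lambda later in this post; a minimal sketch, with the environment variable names assumed to match the project's conventions:

import os
import boto3

athena_client = boto3.client('athena')

def submit_ctas_query(query: str) -> dict:
    # submit the CTAS statement asynchronously; Athena writes the Parquet files
    # to the external_location and registers the new table in the Data Catalog
    return athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': os.getenv('DATA_CATALOG')},
        ResultConfiguration={'OutputLocation': 's3://' + os.getenv('DATA_BUCKET') + '/tmp/'},
        WorkGroup='primary'
    )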
The five deployed Lambda functions should be visible from the Lambda Console’s Functions tab.
Invoke the three Lambda functions using the AWS CLI (part of step 2a in workflow diagram).
aws lambda invoke \
  --function-name athena-json-to-parquet-data \
  response.json

aws lambda invoke \
  --function-name athena-csv-to-parquet-locations \
  response.json

aws lambda invoke \
  --function-name athena-json-to-parquet-mappings \
  response.json
Here is an example of the same CTAS command, shown above for the Smart Hub electrical usage data, as it was executed successfully by Athena.
CREATE TABLE IF NOT EXISTS smart_hub_data_catalog.smart_hub_data_parquet
WITH (format = 'PARQUET',
      parquet_compression = 'SNAPPY',
      partitioned_by = ARRAY['dt'],
      external_location = 's3://smart-hub-data-demo-account-1-us-east-1/smart_hub_data_parquet')
AS
SELECT *
FROM smart_hub_data_catalog.smart_hub_data_json
We can view any Athena SQL query from the Athena Console’s History tab. Clicking on a query (in pink) will copy it to the Query Editor tab and execute it. Below, we see the three SQL statements executed by the Lambda functions.
AWS Glue ETL Job for XML
If you recall, the electrical rate data is in XML format. The Lambda functions we just executed converted the CSV and JSON data to Parquet using Athena. Currently, unlike CSV, JSON, ORC, Parquet, and Avro, Athena does not support the older XML data format. For the XML data files, we will use an AWS Glue ETL Job to convert the XML data to Parquet. The Glue ETL Job is written in Python and uses Apache Spark, along with several AWS Glue PySpark extensions. For this job, I used an existing script created in the Glue ETL Jobs Console as a base, then modified the script to meet my needs.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    's3_output_path',
    'source_glue_database',
    'source_glue_table'
])

s3_output_path = args['s3_output_path']
source_glue_database = args['source_glue_database']
source_glue_table = args['source_glue_table']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext. \
    create_dynamic_frame. \
    from_catalog(database=source_glue_database,
                 table_name=source_glue_table,
                 transformation_ctx="datasource0")

applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("from", "string", "from", "string"),
              ("to", "string", "to", "string"),
              ("type", "string", "type", "string"),
              ("rate", "double", "rate", "double"),
              ("year", "int", "year", "int"),
              ("month", "int", "month", "int"),
              ("state", "string", "state", "string")],
    transformation_ctx="applymapping1")

resolvechoice2 = ResolveChoice.apply(
    frame=applymapping1,
    choice="make_struct",
    transformation_ctx="resolvechoice2")

dropnullfields3 = DropNullFields.apply(
    frame=resolvechoice2,
    transformation_ctx="dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(
    frame=dropnullfields3,
    connection_type="s3",
    connection_options={
        "path": s3_output_path,
        "partitionKeys": ["state"]
    },
    format="parquet",
    transformation_ctx="datasink4")

job.commit()
The three Python command-line arguments the script expects (lines 10–12, above) are defined in the CloudFormation template, smart-hub-athena-glue.yml. Below, we see them on lines 10–12 of the CloudFormation snippet. They are injected automatically when the job is run and can be overridden from the command line when starting the job.
GlueJobRatesToParquet:
  Type: AWS::Glue::Job
  Properties:
    GlueVersion: 1.0
    Command:
      Name: glueetl
      PythonVersion: 3
      ScriptLocation: !Sub "s3://${ScriptBucketName}/glue_scripts/rates_xml_to_parquet.py"
    DefaultArguments: {
      "--s3_output_path": !Sub "s3://${DataBucketName}/electricity_rates_parquet",
      "--source_glue_database": !Ref GlueDatabase,
      "--source_glue_table": "electricity_rates_xml",
      "--job-bookmark-option": "job-bookmark-enable",
      "--enable-spark-ui": "true",
      "--spark-event-logs-path": !Sub "s3://${LogBucketName}/glue-etl-jobs/"
    }
    Description: "Convert electrical rates XML data to Parquet"
    ExecutionProperty:
      MaxConcurrentRuns: 2
    MaxRetries: 0
    Name: rates-xml-to-parquet
    Role: !GetAtt "CrawlerRole.Arn"
  DependsOn:
    - CrawlerRole
    - GlueDatabase
    - DataBucket
    - ScriptBucket
    - LogBucket
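Since the DefaultArguments can be overridden when the job is started, here is a hedged sketch of doing so from Python with boto3; the output path shown is a placeholder, not a value from the project.

import boto3

glue_client = boto3.client('glue')

# override the job's default --s3_output_path argument for this run only
response = glue_client.start_job_run(
    JobName='rates-xml-to-parquet',
    Arguments={'--s3_output_path': 's3://your-data-bucket/electricity_rates_parquet'}
)
print(response['JobRunId'])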
First, copy the Glue ETL Job Python script to the SCRIPT_BUCKET
S3 bucket.
aws s3 cp glue-scripts/rates_xml_to_parquet.py \
  s3://${SCRIPT_BUCKET}/glue_scripts/
Next, start the Glue ETL Job (part of step 2a in workflow diagram). Although the conversion itself is a relatively simple set of tasks, creating the Apache Spark environment to execute those tasks takes several minutes. Whereas the Glue Crawlers took about two minutes on average, in my experience, the Glue ETL Job can take 10–15 minutes, of which only about 1–2 minutes is actual execution time. In my opinion, waiting up to 15 minutes is too long to be viable for ad hoc jobs against smaller datasets; Glue ETL Jobs are definitely targeted at big data.
aws glue start-job-run --job-name rates-xml-to-parquet
To check on the status of the job, use the Glue ETL Jobs Console, or use the AWS CLI.
# get status of most recent job (the one that is running)
aws glue get-job-run \
  --job-name rates-xml-to-parquet \
  --run-id "$(aws glue get-job-runs \
    --job-name rates-xml-to-parquet \
    | jq -r '.JobRuns[0].Id')"
When complete, you should see results similar to the following. Note the ‘JobRunState’ is ‘SUCCEEDED.’ This particular job ran for a total of 14.92 minutes, while the actual execution time was 2.25 minutes.
{
    "JobRun": {
        "Id": "jr_f7186b26bf042ea7773ad08704d012d05299f080e7ac9b696ca8dd575f79506b",
        "Attempt": 0,
        "JobName": "rates-xml-to-parquet",
        "StartedOn": 1578022390.301,
        "LastModifiedOn": 1578023285.632,
        "CompletedOn": 1578023285.632,
        "JobRunState": "SUCCEEDED",
        "PredecessorRuns": [],
        "AllocatedCapacity": 10,
        "ExecutionTime": 135,
        "Timeout": 2880,
        "MaxCapacity": 10.0,
        "LogGroupName": "/aws-glue/jobs",
        "GlueVersion": "1.0"
    }
}
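If you would rather wait for the job programmatically than re-run the CLI command, a small boto3 polling loop (an illustration, not part of the project) accomplishes the same thing.

import time
import boto3

glue_client = boto3.client('glue')
job_name = 'rates-xml-to-parquet'

# find the most recent run, then poll until it reaches a terminal state
run_id = glue_client.get_job_runs(JobName=job_name)['JobRuns'][0]['Id']

while True:
    state = glue_client.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['JobRunState']
    if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT'):
        print(f"job run finished with state '{state}'")
        break
    time.sleep(30)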
The job’s progress and the results are also visible in the AWS Glue Console’s ETL Jobs tab.
Detailed Apache Spark logs are also available in the CloudWatch Management Console, which is accessible directly from the Logs link in the AWS Glue Console’s ETL Jobs tab.
The last step in the Data Transformation stage is to catalog the Parquet-format electrical rates data, created with the previous Glue ETL Job, using yet another Glue Crawler (part of step 2b in workflow diagram). Start the following Glue Crawler to catalog the Parquet-format electrical rates data.
aws glue start-crawler --name smart-hub-rates-parquet
This concludes the Data Transformation stage. The raw and transformed data is in the data lake, and the following nine tables should exist in the Glue Data Catalog.
electricity_rates_parquet
electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet
If we examine the tables, we should observe that the data partitions we used to organize the data files in the Amazon S3-based data lake are contained in the table metadata. Below, we see the four partitions, based on state, of the Parquet-format locations data.
Data Enrichment
To begin the Data Enrichment stage, we will invoke the AWS Lambda, athena-complex-etl-query/index.py. This Lambda accepts three input parameters, passed in the Lambda handler’s event parameter: the Smart Hub ID, the start date for the data requested, and the end date for the data requested. The scenario for the demonstration is that a customer, using the electrical provider’s application, has requested data for their location (loc_id) over a particular range of days (start date and end date), to visualize and analyze.
The Lambda executes a series of Athena INSERT INTO SQL statements, one statement for each of the possible Smart Hub connected electrical sensors, s_01 through s_10, for which there are values in the Smart Hub electrical usage data. Amazon released the ability for Athena to INSERT INTO a table using the results of a SELECT query in September 2019, an essential addition to Athena. New Athena features are listed in the release notes.
Here, the SELECT query is actually a series of chained subqueries, using Presto SQL’s WITH clause capability. The queries join the Parquet-format Smart Hub electrical usage data sources in the S3-based data lake with the other three Parquet-format, S3-based data sources: sensor mappings, locations, and electrical rates. The Parquet-format data is written as individual files to S3 and inserted into the existing ‘etl_tmp_output_parquet’ Glue Data Catalog database table. Compared to traditional relational database-based queries, the capabilities of Glue and Athena to enable complex SQL queries across multiple semi-structured data files, stored in S3, are truly amazing!
Below, we see the complete Lambda function, including the parameterized SQL statement it constructs for each sensor.
import boto3
import os
import logging
import json
from typing import Dict

# environment variables
data_catalog = os.getenv('DATA_CATALOG')
data_bucket = os.getenv('DATA_BUCKET')

# variables
output_directory = 'etl_tmp_output_parquet'

# uses list comprehension to generate the equivalent of:
# ['s_01', 's_02', …, 's_09', 's_10']
sensors = [f's_{i:02d}' for i in range(1, 11)]

# logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# athena client
athena_client = boto3.client('athena')


def handler(event, context):
    args = {
        "loc_id": event['loc_id'],
        "date_from": event['date_from'],
        "date_to": event['date_to']
    }

    athena_query(args)

    return {
        'statusCode': 200,
        'body': json.dumps("function 'athena-complex-etl-query' complete")
    }


def athena_query(args: Dict[str, str]):
    for sensor in sensors:
        query = \
            "INSERT INTO " + data_catalog + "." + output_directory + " " \
            "WITH " \
            " t1 AS " \
            " (SELECT d.loc_id, d.ts, d.data." + sensor + " AS kwh, l.state, l.tz " \
            " FROM smart_hub_data_catalog.smart_hub_data_parquet d " \
            " LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l " \
            " ON d.loc_id = l.hash " \
            " WHERE d.loc_id = '" + args['loc_id'] + "' " \
            " AND d.dt BETWEEN cast('" + args['date_from'] + \
            "' AS date) AND cast('" + args['date_to'] + "' AS date)), " \
            " t2 AS " \
            " (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts, " \
            " date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period, " \
            " m.description AS device, m.location, t1.loc_id, t1.state, t1.tz, t1.kwh " \
            " FROM t1 LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m " \
            " ON t1.loc_id = m.loc_id " \
            " WHERE t1.loc_id = '" + args['loc_id'] + "' " \
            " AND m.state = t1.state " \
            " AND m.description = (SELECT m2.description " \
            " FROM smart_hub_data_catalog.sensor_mappings_parquet m2 " \
            " WHERE m2.loc_id = '" + args['loc_id'] + "' AND m2.id = '" + sensor + "')), " \
            " t3 AS " \
            " (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state " \
            " FROM smart_hub_data_catalog.electricity_rates_parquet r " \
            " WHERE r.year BETWEEN cast(date_format(cast('" + args['date_from'] + \
            "' AS date), '%Y') AS integer) AND cast(date_format(cast('" + args['date_to'] + \
            "' AS date), '%Y') AS integer)) " \
            "SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts, " \
            " t2.device, t2.location, t3.type, t2.kwh, t3.rate AS cents_per_kwh, " \
            " round(t2.kwh * t3.rate, 4) AS cost, t2.state, t2.loc_id " \
            "FROM t2 LEFT OUTER JOIN t3 " \
            " ON t2.rate_period = t3.rate_period " \
            "WHERE t3.state = t2.state " \
            "ORDER BY t2.ts, t2.device;"

        logger.info(query)

        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': data_catalog
            },
            ResultConfiguration={
                'OutputLocation': 's3://' + data_bucket + '/tmp/' + output_directory
            },
            WorkGroup='primary'
        )

        logger.info(response)
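Note that start_query_execution is asynchronous; the Lambda submits each INSERT INTO statement and returns without waiting for the queries to finish. If you need to block until a given query completes, a small polling helper such as the following sketch (not part of the project) could be added.

import time
import boto3

athena_client = boto3.client('athena')

def wait_for_query(query_execution_id: str) -> str:
    # poll Athena until the query reaches a terminal state
    while True:
        state = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return state
        time.sleep(2)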
Below is an example of one of the final queries, for the s_10 sensor, as executed by Athena. All the input parameter values, Python variables, and environment variables have been resolved into the query.
INSERT INTO smart_hub_data_catalog.etl_tmp_output_parquet
WITH t1 AS (SELECT d.loc_id, d.ts, d.data.s_10 AS kwh, l.state, l.tz
            FROM smart_hub_data_catalog.smart_hub_data_parquet d
                LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l ON d.loc_id = l.hash
            WHERE d.loc_id = 'b6a8d42425fde548'
              AND d.dt BETWEEN cast('2019-12-21' AS date) AND cast('2019-12-22' AS date)),
     t2 AS (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts,
                   date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period,
                   m.description AS device,
                   m.location,
                   t1.loc_id,
                   t1.state,
                   t1.tz,
                   t1.kwh
            FROM t1
                LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m ON t1.loc_id = m.loc_id
            WHERE t1.loc_id = 'b6a8d42425fde548'
              AND m.state = t1.state
              AND m.description = (SELECT m2.description
                                   FROM smart_hub_data_catalog.sensor_mappings_parquet m2
                                   WHERE m2.loc_id = 'b6a8d42425fde548'
                                     AND m2.id = 's_10')),
     t3 AS (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state
            FROM smart_hub_data_catalog.electricity_rates_parquet r
            WHERE r.year BETWEEN cast(date_format(cast('2019-12-21' AS date), '%Y') AS integer)
                AND cast(date_format(cast('2019-12-22' AS date), '%Y') AS integer))
SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts,
       t2.device,
       t2.location,
       t3.type,
       t2.kwh,
       t3.rate AS cents_per_kwh,
       round(t2.kwh * t3.rate, 4) AS cost,
       t2.state,
       t2.loc_id
FROM t2
    LEFT OUTER JOIN t3 ON t2.rate_period = t3.rate_period
WHERE t3.state = t2.state
ORDER BY t2.ts, t2.device;
Along with enriching the data, the query performs additional data transformation using the other data sources. For example, the Unix timestamp is converted to a localized timestamp containing the date and time, according to the customer’s location (line 7, above). Transforming dates and times is a frequent, often painful, data analysis task. Another example of data enrichment is the augmentation of the data with a new, computed column. The column’s values are calculated using the values of two other columns (line 33, above).
Invoke the Lambda with the following three parameters in the payload (step 3a in workflow diagram).
aws lambda invoke \
  --function-name athena-complex-etl-query \
  --payload "{ \"loc_id\": \"b6a8d42425fde548\",
  \"date_from\": \"2019-12-21\", \"date_to\": \"2019-12-22\"}" \
  response.json
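If you prefer Python over the AWS CLI, the equivalent invocation with boto3 is a short script; a minimal sketch:

import json
import boto3

lambda_client = boto3.client('lambda')

payload = {
    'loc_id': 'b6a8d42425fde548',
    'date_from': '2019-12-21',
    'date_to': '2019-12-22'
}

# synchronous invocation; the handler reads these keys from its event parameter
response = lambda_client.invoke(
    FunctionName='athena-complex-etl-query',
    Payload=json.dumps(payload)
)
print(json.loads(response['Payload'].read()))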
The ten INSERT INTO SQL statements’ result statuses (one per device sensor) are visible from the Athena Console’s History tab.
Each Athena query execution saves that query’s results to the S3-based data lake as individual, uncompressed Parquet-format data files. The data is partitioned in the Amazon S3-based data lake by the Smart Meter location ID (e.g., ‘loc_id=b6a8d42425fde548’).
Below is a snippet of the enriched data for a customer’s clothes washer (sensor ‘s_04’). Note the timestamp is now an actual date and time in the local timezone of the customer (e.g., ‘2019-12-21 20:10:00.000’). The sensor ID (‘s_04’) is replaced with the actual device name (‘Clothes Washer’). The location of the device (‘Basement’) and the type of electrical usage period (e.g., ‘peak’ or ‘partial-peak’) have been added. Finally, the cost column has been computed.
ts | device | location | type | kwh | cents_per_kwh | cost | state | loc_id
---|---|---|---|---|---|---|---|---
2019-12-21 19:40:00.000 | Clothes Washer | Basement | peak | 0.0 | 12.623 | 0.0 | or | b6a8d42425fde548
2019-12-21 19:45:00.000 | Clothes Washer | Basement | peak | 0.0 | 12.623 | 0.0 | or | b6a8d42425fde548
2019-12-21 19:50:00.000 | Clothes Washer | Basement | peak | 0.1501 | 12.623 | 1.8947 | or | b6a8d42425fde548
2019-12-21 19:55:00.000 | Clothes Washer | Basement | peak | 0.1497 | 12.623 | 1.8897 | or | b6a8d42425fde548
2019-12-21 20:00:00.000 | Clothes Washer | Basement | partial-peak | 0.1501 | 7.232 | 1.0855 | or | b6a8d42425fde548
2019-12-21 20:05:00.000 | Clothes Washer | Basement | partial-peak | 0.2248 | 7.232 | 1.6258 | or | b6a8d42425fde548
2019-12-21 20:10:00.000 | Clothes Washer | Basement | partial-peak | 0.2247 | 7.232 | 1.625 | or | b6a8d42425fde548
2019-12-21 20:15:00.000 | Clothes Washer | Basement | partial-peak | 0.2248 | 7.232 | 1.6258 | or | b6a8d42425fde548
2019-12-21 20:20:00.000 | Clothes Washer | Basement | partial-peak | 0.2253 | 7.232 | 1.6294 | or | b6a8d42425fde548
2019-12-21 20:25:00.000 | Clothes Washer | Basement | partial-peak | 0.151 | 7.232 | 1.092 | or | b6a8d42425fde548
To catalog the enriched data just written to the data lake, including its new partitions, we first need to run another Crawler (step 3d in workflow diagram).
aws glue start-crawler --name smart-hub-etl-tmp-output-parquet
Optimizing Enriched Data
The previous step created enriched Parquet-format data. However, this data is not as optimized for query efficiency as it should be. Using the Athena INSERT INTO WITH SQL statement allowed the data to be partitioned. However, that method does not allow the Parquet data to be easily combined into larger files and compressed. To perform both of these optimizations, we will use one last Lambda, athena-parquet-to-parquet-elt-data/index.py. The Lambda will create a new location in the Amazon S3-based data lake, containing all the enriched data in a single file, compressed using Snappy compression.
aws lambda invoke \
  --function-name athena-parquet-to-parquet-elt-data \
  response.json
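The post does not reproduce this Lambda’s source. Conceptually, it can issue another CTAS statement that rewrites the partitioned, uncompressed Parquet data into a single, Snappy-compressed copy. Below is a hedged sketch of such a query; the use of bucketing to force a single output file is an assumption, not taken from the project.

import os

data_catalog = os.getenv('DATA_CATALOG')
data_bucket = os.getenv('DATA_BUCKET')

# bucketing into a single bucket yields one combined, Snappy-compressed file
query = \
    "CREATE TABLE IF NOT EXISTS " + data_catalog + ".etl_output_parquet " \
    "WITH ( " \
    "  format = 'PARQUET', " \
    "  parquet_compression = 'SNAPPY', " \
    "  bucketed_by = ARRAY['loc_id'], " \
    "  bucket_count = 1, " \
    "  external_location = 's3://" + data_bucket + "/etl_output_parquet' " \
    ") AS " \
    "SELECT * " \
    "FROM " + data_catalog + ".etl_tmp_output_parquet;"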
The resulting Parquet file is visible in the S3 Management Console.
The final step in the Data Enrichment stage is to catalog the optimized Parquet-format enriched ETL data. To catalog the data, run the following Glue Crawler (step 3i in workflow diagram).
aws glue start-crawler --name smart-hub-etl-output-parquet
Final Data Lake and Data Catalog
We should now have the following ten top-level folders of partitioned data in the S3-based data lake. The ‘tmp’ folder may be ignored.
aws s3 ls s3://${DATA_BUCKET}/
PRE electricity_rates_parquet/
PRE electricity_rates_xml/
PRE etl_output_parquet/
PRE etl_tmp_output_parquet/
PRE sensor_mappings_json/
PRE sensor_mappings_parquet/
PRE smart_hub_data_json/
PRE smart_hub_data_parquet/
PRE smart_hub_locations_csv/
PRE smart_hub_locations_parquet/
Similarly, we should now have the following ten corresponding tables in the Glue Data Catalog. Use the AWS Glue Console to confirm the tables exist.
Alternatively, use the following AWS CLI / jq command to list the table names.
aws glue get-tables \
  --database-name smart_hub_data_catalog \
  | jq -r '.TableList[].Name'
electricity_rates_parquet
electricity_rates_xml
etl_output_parquet
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet
‘Unknown’ Bug
You may have noticed that the four tables created with the AWS Lambda functions, using the CTAS SQL statement, erroneously have a ‘Classification’ of ‘Unknown’ as opposed to ‘parquet’. I am not sure why; I believe it is a possible bug with the CTAS feature. It seems to have no adverse impact on the tables’ functionality. However, to fix the issue, run the following set of commands. This aws glue update-table hack will switch each table’s ‘Classification’ to ‘parquet’.
database=smart_hub_data_catalog
tables=(smart_hub_locations_parquet sensor_mappings_parquet smart_hub_data_parquet etl_output_parquet)

for table in "${tables[@]}"; do
    fixed_table=$(aws glue get-table \
        --database-name "${database}" \
        --name "${table}" \
        | jq '.Table.Parameters.classification = "parquet" | del(.Table.DatabaseName) | del(.Table.CreateTime) | del(.Table.UpdateTime) | del(.Table.CreatedBy) | del(.Table.IsRegisteredWithLakeFormation)')
    fixed_table=$(echo "${fixed_table}" | jq .Table)

    aws glue update-table \
        --database-name "${database}" \
        --table-input "${fixed_table}"

    echo "table '${table}' classification changed to 'parquet'"
done
The results of the fix may be seen from the AWS Glue Console. All ten tables are now classified correctly.
Explore the Data
Before starting to visualize and analyze the data with Amazon QuickSight, try executing a few Athena queries against the tables in the Glue Data Catalog database, using the Athena Query Editor. Working in the Editor is the best way to understand the data, learn Athena, and debug SQL statements and queries. The Athena Query Editor has convenient developer features like SQL auto-complete and query formatting capabilities.
Be mindful when writing queries and searching the Internet for SQL references that the Athena query engine is based on Presto 0.172. The current version of Presto, 0.229, is more than 50 releases ahead of the current Athena version. The functionality of both Athena and Presto has changed and diverged. There are additional considerations and limitations for SQL queries in Athena to be aware of.
Here are a few simple, ad-hoc queries to run in the Athena Query Editor.
-- preview the final etl data
SELECT *
FROM smart_hub_data_catalog.etl_output_parquet
LIMIT 10;

-- total cost in $'s for each device, at location 'b6a8d42425fde548'
-- from high to low, on December 21, 2019
SELECT device,
       concat('$', cast(cast(sum(cost) / 100 AS decimal(10, 2)) AS varchar)) AS total_cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
  AND date(cast(ts AS timestamp)) = date '2019-12-21'
GROUP BY device
ORDER BY total_cost DESC;

-- count of smart hub residential locations in Oregon and California,
-- grouped by zip code, sorted by count
SELECT DISTINCT postcode, upper(state), count(postcode) AS smart_hub_count
FROM smart_hub_data_catalog.smart_hub_locations_parquet
WHERE state IN ('or', 'ca')
  AND length(cast(postcode AS varchar)) >= 5
GROUP BY state, postcode
ORDER BY smart_hub_count DESC, postcode;

-- electrical usage for the clothes washer
-- over a 30-minute period, on December 21, 2019
SELECT ts, device, location, type, cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
  AND device = 'Clothes Washer'
  AND cast(ts AS timestamp)
    BETWEEN timestamp '2019-12-21 08:45:00'
    AND timestamp '2019-12-21 09:15:00'
ORDER BY ts;
Cleaning Up
You may choose to save the AWS resources created in part one of this demonstration, to be used in part two. Since you are not actively running queries against the data, ongoing AWS costs will be minimal. If you eventually choose to clean up the AWS resources created in part one of this demonstration, execute the following AWS CLI commands. To avoid failures, make sure each command completes before running the subsequent command. You will need to confirm the CloudFormation stacks are deleted using the AWS CloudFormation Console or the AWS CLI. These commands will not remove Amazon QuickSight data sets, analyses, and dashboards created in part two. However, deleting the AWS Glue Data Catalog and the underlying data sources will impact the ability to visualize the data in QuickSight.
# delete s3 contents first
aws s3 rm s3://${DATA_BUCKET} --recursive
aws s3 rm s3://${SCRIPT_BUCKET} --recursive
aws s3 rm s3://${LOG_BUCKET} --recursive

# then, delete lambda cfn stack
aws cloudformation delete-stack --stack-name smart-hub-lambda-stack

# finally, delete athena-glue-s3 stack
aws cloudformation delete-stack --stack-name smart-hub-athena-glue-stack
Part Two
In part one, starting with raw, semi-structured data in multiple formats, we learned how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker
Posted by Gary A. Stafford in Bash Scripting, Big Data, Build Automation, DevOps, Python, Software Development on December 6, 2019
There is little question, big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the marketing hype, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity and potential benefits, commercial enterprises, academic institutions, and the public sector are rushing to develop hardware and software solutions to lower the barriers to entry and increase the velocity of ML and Data Scientists and Engineers.
(courtesy Google Trends and Plotly)
Many open-source software projects are also lowering the barriers to entry into these technologies. An excellent example of one such open-source project working on this challenge is Project Jupyter. Similar to Apache Zeppelin and Netflix’s newly open-sourced Polynote, Jupyter Notebooks enable data-driven, interactive, and collaborative data analytics.
This post will demonstrate the creation of a containerized data analytics environment using Jupyter Docker Stacks. The particular environment will be suited for learning and developing applications for Apache Spark using the Python, Scala, and R programming languages. We will focus on Python and Spark, using PySpark.
Featured Technologies
The following technologies are featured prominently in this post.
Jupyter Notebooks
According to Project Jupyter, the Jupyter Notebook, formerly known as the IPython Notebook, is an open-source web application that allows users to create and share documents that contain live code, equations, visualizations, and narrative text. Uses include data cleansing and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more. The word, Jupyter, is a loose acronym for Julia, Python, and R, but today, Jupyter supports many programming languages.
Interest in Jupyter Notebooks has grown dramatically over the last 3–5 years, fueled in part by the major Cloud providers, AWS, Google Cloud, and Azure. Amazon Sagemaker, Amazon EMR (Elastic MapReduce), Google Cloud Dataproc, Google Colab (Collaboratory), and Microsoft Azure Notebooks all have direct integrations with Jupyter notebooks for big data analytics and machine learning.
(courtesy Google Trends and Plotly)
Jupyter Docker Stacks
To enable quick and easy access to Jupyter Notebooks, Project Jupyter has created Jupyter Docker Stacks. The stacks are ready-to-run Docker images containing Jupyter applications, along with accompanying technologies. Currently, the Jupyter Docker Stacks focus on a variety of specializations, including the r-notebook, scipy-notebook, tensorflow-notebook, datascience-notebook, pyspark-notebook, and the subject of this post, the all-spark-notebook. The stacks include a wide variety of well-known packages to extend their functionality, such as scikit-learn, pandas, Matplotlib, Bokeh, NumPy, and Facets.
Apache Spark
According to Apache, Spark is a unified analytics engine for large-scale data processing. Starting as a research project at the UC Berkeley AMPLab in 2009, Spark was open-sourced in early 2010 and moved to the Apache Software Foundation in 2013. Reviewing the postings on any major career site will confirm that Spark is widely used by well-known modern enterprises, such as Netflix, Adobe, Capital One, Lockheed Martin, JetBlue Airways, Visa, and Databricks. At the time of this post, LinkedIn, alone, had approximately 3,500 listings for jobs that reference the use of Apache Spark, just in the United States.
With speeds up to 100 times faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine. Spark’s polyglot programming model allows users to write applications quickly in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). You can run Spark using its standalone cluster mode, Apache Hadoop YARN, Mesos, or Kubernetes.
PySpark
The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API and uses Py4J. According to Apache, Py4J, a bridge between Python and Java, enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine (JVM). Data is processed in Python and cached and shuffled in the JVM.
Docker
According to Docker, their technology gives developers and IT the freedom to build, manage, and secure business-critical applications without the fear of technology or infrastructure lock-in. For this post, I am using the current stable version of Docker Desktop Community for macOS, as of March 2020.
Docker Swarm
Current versions of Docker include both a Kubernetes and Swarm orchestrator for deploying and managing containers. We will choose Swarm for this demonstration. According to Docker, the cluster management and orchestration features embedded in the Docker Engine are built using swarmkit. Swarmkit is a separate project that implements Docker’s orchestration layer and is used directly within Docker.
PostgreSQL
PostgreSQL is a powerful, open-source, object-relational database system. According to their website, PostgreSQL comes with many features aimed to help developers build applications, administrators to protect data integrity and build fault-tolerant environments, and help manage data no matter how big or small the dataset.
Demonstration
In this demonstration, we will explore the capabilities of the Spark Jupyter Docker Stack to provide an effective data analytics development environment. We will explore a few everyday uses, including executing Python scripts, submitting PySpark jobs, and working with Jupyter Notebooks, and reading and writing data to and from different file formats and a database. We will be using the latest jupyter/all-spark-notebook
Docker Image. This image includes Python, R, and Scala support for Apache Spark, using Apache Toree.
Architecture
As shown below, we will deploy a Docker stack to a single-node Docker swarm. The stack consists of a Jupyter All-Spark-Notebook, PostgreSQL (Alpine Linux version 12), and Adminer container. The Docker stack will have two local directories bind-mounted into the containers. Files from our GitHub project will be shared with the Jupyter application container through a bind-mounted directory. Our PostgreSQL data will also be persisted through a bind-mounted directory. This allows us to persist data external to the ephemeral containers. If the containers are restarted or recreated, the data is preserved locally.
Source Code
All source code for this post can be found on GitHub. Use the following command to clone the project. Note this post uses the v2
branch.
git clone \
  --branch v2 --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/pyspark-setup-demo.git
Source code samples are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers.
Deploy Docker Stack
To start, create the $HOME/data/postgres
directory to store PostgreSQL data files.
mkdir -p ~/data/postgres
This directory will be bind-mounted into the PostgreSQL container on line 41 of the stack.yml file, $HOME/data/postgres:/var/lib/postgresql/data
. The HOME
environment variable assumes you are working on Linux or macOS and is equivalent to HOMEPATH
on Windows.
The Jupyter container’s working directory is set on line 15 of the stack.yml file, working_dir: /home/$USER/work
. The local bind-mounted working directory is $PWD/work
. This path is bind-mounted to the working directory in the Jupyter container, on line 29 of the Docker stack file, $PWD/work:/home/$USER/work
. The PWD
environment variable assumes you are working on Linux or macOS (CD
on Windows).
By default, the user within the Jupyter container is jovyan
. We will override that user with our own local host’s user account, as shown on line 21 of the Docker stack file, NB_USER: $USER
. We will use the host’s USER
environment variable value (equivalent to USERNAME
on Windows). There are additional options for configuring the Jupyter container. Several of those options are used on lines 17–22 of the Docker stack file (gist).
# docker stack deploy -c stack.yml jupyter
# optional pgadmin container

version: "3.7"
networks:
  demo-net:
services:
  spark:
    image: jupyter/all-spark-notebook:latest
    ports:
      - "8888:8888/tcp"
      - "4040:4040/tcp"
    networks:
      - demo-net
    working_dir: /home/$USER/work
    environment:
      CHOWN_HOME: "yes"
      GRANT_SUDO: "yes"
      NB_UID: 1000
      NB_GID: 100
      NB_USER: $USER
      NB_GROUP: staff
    user: root
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    volumes:
      - $PWD/work:/home/$USER/work
  postgres:
    image: postgres:12-alpine
    environment:
      POSTGRES_USERNAME: postgres
      POSTGRES_PASSWORD: postgres1234
      POSTGRES_DB: bakery
    ports:
      - "5432:5432/tcp"
    networks:
      - demo-net
    volumes:
      - $HOME/data/postgres:/var/lib/postgresql/data
    deploy:
      restart_policy:
        condition: on-failure
  adminer:
    image: adminer:latest
    ports:
      - "8080:8080/tcp"
    networks:
      - demo-net
    deploy:
      restart_policy:
        condition: on-failure
#  pgadmin:
#    image: dpage/pgadmin4:latest
#    environment:
#      PGADMIN_DEFAULT_EMAIL: user@domain.com
#      PGADMIN_DEFAULT_PASSWORD: 5up3rS3cr3t!
#    ports:
#      - "8180:80/tcp"
#    networks:
#      - demo-net
#    deploy:
#      restart_policy:
#        condition: on-failure
The jupyter/all-spark-notebook
Docker image is large, approximately 5 GB. Depending on your Internet connection, if this is the first time you have pulled this image, the stack may take several minutes to enter a running state. Although not required, I usually pull new Docker images in advance.
docker pull jupyter/all-spark-notebook:latest
docker pull postgres:12-alpine
docker pull adminer:latest
Assuming you have a recent version of Docker installed on your local development machine and running in swarm mode, standing up the stack is as easy as running the following docker command from the root directory of the project.
docker stack deploy -c stack.yml jupyter
The Docker stack consists of a new overlay network, jupyter_demo-net
, and three containers. To confirm the stack deployed successfully, run the following docker command.
docker stack ps jupyter --no-trunc
To access the Jupyter Notebook application, you need to obtain the Jupyter URL and access token. The Jupyter URL and the access token are output to the Jupyter container log, which can be accessed with the following command.
docker logs $(docker ps | grep jupyter_spark | awk '{print $NF}')
You should observe log output similar to the following. Retrieve the complete URL, for example, http://127.0.0.1:8888/?token=f78cbe...
, to access the Jupyter web-based user interface.
From the Jupyter dashboard landing page, you should see all the files in the project’s work/
directory. Note the types of files you can create from the dashboard, including Python 3, R, and Scala (using Apache Toree or spylon-kernel) notebooks, and text. You can also open a Jupyter terminal or create a new Folder from the drop-down menu. At the time of this post (March 2020), the latest jupyter/all-spark-notebook
Docker Image runs Spark 2.4.5, Scala 2.11.12, Python 3.7.6, and OpenJDK 64-Bit Server VM, Java 1.8.0 Update 242.
Bootstrap Environment
Included in the project is a bootstrap script, bootstrap_jupyter.sh. The script will install the required Python packages using pip, the Python package installer, and a requirements.txt file. The bootstrap script also installs the latest PostgreSQL driver JAR, configures Apache Log4j to reduce log verbosity when submitting Spark jobs, and installs htop. Although these tasks could also be done from a Jupyter terminal or from within a Jupyter notebook, using a bootstrap script ensures you will have a consistent work environment every time you spin up the Jupyter Docker stack. Add or remove items from the bootstrap script as necessary.
#!/bin/bash

set -ex

# update/upgrade and install htop
sudo apt-get update -y && sudo apt-get upgrade -y
sudo apt-get install htop

# install required python packages
python3 -m pip install --user --upgrade pip
python3 -m pip install -r requirements.txt --upgrade

# download latest postgres driver jar
POSTGRES_JAR="postgresql-42.2.17.jar"
if [ -f "$POSTGRES_JAR" ]; then
    echo "$POSTGRES_JAR exist"
else
    wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
fi

# spark-submit logging level from INFO to WARN
sudo cp log4j.properties /usr/local/spark/conf/log4j.properties
That’s it, our new Jupyter environment is ready to start exploring.
Running Python Scripts
One of the simplest tasks we could perform in our new Jupyter environment is running Python scripts. Instead of worrying about installing and maintaining the correct versions of Python and multiple Python packages on your own development machine, we can run Python scripts from within the Jupyter container. At the time of this post update, the latest jupyter/all-spark-notebook
Docker image runs Python 3.7.3 and Conda 4.7.12. Let’s start with a simple Python script, 01_simple_script.py.
#!/usr/bin/python3

import random

technologies = [
    'PySpark', 'Python', 'Spark', 'Scala', 'Java', 'Project Jupyter', 'R'
]
print("Technologies: %s\n" % technologies)

technologies.sort()
print("Sorted: %s\n" % technologies)

print("I'm interested in learning about %s." % random.choice(technologies))
From a Jupyter terminal window, use the following command to run the script.
python3 01_simple_script.py
You should observe the following output.
Kaggle Datasets
To explore more features of Jupyter and PySpark, we will use a publicly available dataset from Kaggle. Kaggle is an excellent open resource for datasets used for big-data and ML projects. Their tagline is ‘Kaggle is the place to do data science projects’. For this demonstration, we will use the Transactions from a bakery dataset from Kaggle. The dataset is available as a single CSV-format file. A copy is also included in the project.
The ‘Transactions from a bakery’ dataset contains 21,294 rows with 4 columns of data. Although certainly not big data, the dataset is large enough to test out the Spark Jupyter Docker Stack functionality. The data consists of 9,531 customer transactions for 21,294 bakery items between 2016-10-30 and 2017-04-09 (gist).
Date | Time | Transaction | Item
---|---|---|---
2016-10-30 | 09:58:11 | 1 | Bread
2016-10-30 | 10:05:34 | 2 | Scandinavian
2016-10-30 | 10:05:34 | 2 | Scandinavian
2016-10-30 | 10:07:57 | 3 | Hot chocolate
2016-10-30 | 10:07:57 | 3 | Jam
2016-10-30 | 10:07:57 | 3 | Cookies
2016-10-30 | 10:08:41 | 4 | Muffin
2016-10-30 | 10:13:03 | 5 | Coffee
2016-10-30 | 10:13:03 | 5 | Pastry
2016-10-30 | 10:13:03 | 5 | Bread
Submitting Spark Jobs
We are not limited to Jupyter notebooks to interact with Spark. We can also submit scripts directly to Spark from the Jupyter terminal. This is typically how Spark is used in Production for performing analysis on large datasets, often on a regular schedule, using tools such as Apache Airflow. With Spark, you load data from one or more data sources. After performing operations and transformations on the data, the data is persisted to a datastore, such as a file or a database, or conveyed to another system for further processing.
The project includes a simple Python PySpark ETL script, 02_pyspark_job.py. The ETL script loads the original Kaggle Bakery dataset from the CSV file into memory, into a Spark DataFrame. The script then performs a simple Spark SQL query, calculating the total quantity of each type of bakery item sold, sorted in descending order. Finally, the script writes the results of the query to a new CSV file, output/items-sold.csv
.
#!/usr/bin/python3

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('spark-demo') \
    .getOrCreate()

df_bakery = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('delimiter', ',') \
    .option('inferSchema', 'true') \
    .load('BreadBasket_DMS.csv')

df_sorted = df_bakery.cube('item').count() \
    .filter('item NOT LIKE \'NONE\'') \
    .filter('item NOT LIKE \'Adjustment\'') \
    .orderBy(['count', 'item'], ascending=[False, True])

df_sorted.show(10, False)

df_sorted.coalesce(1) \
    .write.format('csv') \
    .option('header', 'true') \
    .save('output/items-sold.csv', mode='overwrite')
Run the script directly from a Jupyter terminal using the following command.
python3 02_pyspark_job.py
An example of the output of the Spark job is shown below.
Typically, you would submit the Spark job using the spark-submit
command. Use a Jupyter terminal to run the following command.
$SPARK_HOME/bin/spark-submit 02_pyspark_job.py
Below, we see the output from the spark-submit
command. Printing the results in the output is merely for the purposes of the demo. Typically, Spark jobs are submitted non-interactively, and the results are persisted directly to a datastore or conveyed to another system.
Using the following commands, we can view the resulting CSV file created by the Spark job.
ls -alh output/items-sold.csv/
head -5 output/items-sold.csv/*.csv
An example of the files created by the Spark job is shown below. We should have discovered that coffee is the most commonly sold bakery item with 5,471 sales, followed by bread with 3,325 sales.
Interacting with Databases
To demonstrate the flexibility of Jupyter to work with databases, PostgreSQL is part of the Docker Stack. We can read and write data from the Jupyter container to the PostgreSQL instance running in a separate container. To begin, we will run a SQL script, written in Python, to create our database schema and some test data in a new database table. To do so, we will use psycopg2, the PostgreSQL database adapter package for Python, which we previously installed into our Jupyter container using the bootstrap script. The Python script below, 03_load_sql.py, will execute a set of SQL statements contained in a SQL file, bakery.sql, against the PostgreSQL container instance.
#!/usr/bin/python3

import psycopg2

# connect to database
connect_str = 'host=postgres port=5432 dbname=bakery user=postgres password=postgres1234'
conn = psycopg2.connect(connect_str)
conn.autocommit = True
cursor = conn.cursor()

# execute sql script
sql_file = open('bakery.sql', 'r')
sqlFile = sql_file.read()
sql_file.close()
sqlCommands = sqlFile.split(';')

for command in sqlCommands:
    print(command)
    if command.strip() != '':
        cursor.execute(command)

# import data from csv file
with open('BreadBasket_DMS.csv', 'r') as f:
    next(f)  # Skip the header row.
    cursor.copy_from(
        f,
        'transactions',
        sep=',',
        columns=('date', 'time', 'transaction', 'item')
    )
conn.commit()

# confirm by selecting record
command = 'SELECT COUNT(*) FROM public.transactions;'
cursor.execute(command)
recs = cursor.fetchall()
print('Row count: %d' % recs[0])
The SQL file, bakery.sql, is shown below.
DROP TABLE IF EXISTS "transactions";
DROP SEQUENCE IF EXISTS transactions_id_seq;
CREATE SEQUENCE transactions_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1;

CREATE TABLE "public"."transactions"
(
    "id" integer DEFAULT nextval('transactions_id_seq') NOT NULL,
    "date" character varying(10) NOT NULL,
    "time" character varying(8) NOT NULL,
    "transaction" integer NOT NULL,
    "item" character varying(50) NOT NULL
) WITH (oids = false);
To execute the script, run the following command.
python3 03_load_sql.py
This should result in the following output, if successful.
Adminer
To confirm the SQL script’s success, use Adminer. Adminer (formerly phpMinAdmin) is a full-featured database management tool written in PHP. Adminer natively recognizes PostgreSQL, MySQL, SQLite, and MongoDB, among other database engines. The current version is 4.7.6 (March 2020).
Adminer should be available on localhost port 8080. The password credentials, shown below, are located in the stack.yml file. The server name, postgres, is the name of the PostgreSQL Docker container. This is the domain name the Jupyter container will use to communicate with the PostgreSQL container.
Connecting to the new bakery database with Adminer, we should see the transactions table.
The table should contain 21,293 rows, each with 5 columns of data.
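If you prefer to confirm the results from a Jupyter cell rather than Adminer, a minimal psycopg2 sketch along the following lines could be used; it is not part of the project and simply reuses the connection string from 03_load_sql.py.

#!/usr/bin/python3
# Hypothetical sketch: confirm the row count and top-selling items from a notebook cell.
import psycopg2

connect_str = 'host=postgres port=5432 dbname=bakery user=postgres password=postgres1234'

with psycopg2.connect(connect_str) as conn:
    with conn.cursor() as cursor:
        # total number of rows loaded from the CSV file
        cursor.execute('SELECT COUNT(*) FROM public.transactions;')
        print('Row count: %d' % cursor.fetchone())

        # top five items by number of transactions
        cursor.execute("""
            SELECT item, COUNT(*) AS total
            FROM public.transactions
            WHERE item NOT IN ('NONE', 'Adjustment')
            GROUP BY item
            ORDER BY total DESC
            LIMIT 5;
        """)
        for row in cursor.fetchall():
            print(row)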
pgAdmin
Another excellent choice for interacting with PostgreSQL, in particular, is pgAdmin 4. It is my favorite tool for the administration of PostgreSQL. Although limited to PostgreSQL, the user interface and administrative capabilities of pgAdmin are superior to those of Adminer, in my opinion. For brevity, I chose not to include pgAdmin in this post. The Docker stack also contains a pgAdmin container, which has been commented out. To use pgAdmin, just uncomment the container and re-run the Docker stack deploy command. pgAdmin should then be available on localhost port 81. The pgAdmin login credentials are in the Docker stack file.
Developing Jupyter Notebooks
The real power of the Jupyter Docker Stacks containers is Jupyter Notebooks. According to the Jupyter Project, the notebook extends the console-based approach to interactive computing in a qualitatively new direction, providing a web-based application suitable for capturing the whole computation process, including developing, documenting, and executing code, as well as communicating the results. Notebook documents contain the inputs and outputs of an interactive session as well as additional text that accompanies the code but is not meant for execution.
To explore the capabilities of Jupyter notebooks, the project includes two simple Jupyter notebooks. The first notebook, 04_notebook.ipynb, demonstrates typical PySpark functions, such as loading data from a CSV file and from the PostgreSQL database, performing basic data analysis with Spark SQL including the use of PySpark user-defined functions (UDF), graphing the data using BokehJS, and finally, saving data back to the database, as well as to the fast and efficient Apache Parquet file format. Below we see several notebook cells demonstrating these features.
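For reference, a condensed, hypothetical sketch of the notebook's core pattern, reading from PostgreSQL over JDBC, applying a PySpark UDF, and persisting the results to Parquet, might look like the following. The JDBC options are assumptions based on the Docker stack's PostgreSQL settings, and the PostgreSQL JDBC driver must be available on the Spark classpath.

# Hypothetical sketch of the 04_notebook.ipynb pattern; the actual notebook differs.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('notebook-sketch').getOrCreate()

# Read the transactions table from the PostgreSQL container over JDBC
# (assumes the org.postgresql JDBC driver jar is on the Spark classpath)
df_trans = spark.read \
    .format('jdbc') \
    .option('url', 'jdbc:postgresql://postgres:5432/bakery') \
    .option('dbtable', 'public.transactions') \
    .option('user', 'postgres') \
    .option('password', 'postgres1234') \
    .option('driver', 'org.postgresql.Driver') \
    .load()

# Example user-defined function (UDF): bucket transactions by time of day
time_of_day = udf(lambda t: 'morning' if t < '12:00:00' else 'afternoon', StringType())
df_trans = df_trans.withColumn('time_of_day', time_of_day('time'))

# Persist the enriched data to the efficient Apache Parquet format
df_trans.write.parquet('output/transactions', mode='overwrite')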
IDE Integration
Recall, the working directory, containing the GitHub source code for the project, is bind-mounted to the Jupyter container. Therefore, you can also edit any of the files, including notebooks, in your favorite IDE, such as JetBrains PyCharm and Microsoft Visual Studio Code. PyCharm has built-in language support for Jupyter Notebooks, as shown below.
As does Visual Studio Code using the Python extension.
Using Additional Packages
As mentioned in the Introduction, the Jupyter Docker Stacks come ready-to-run, with a wide variety of Python packages to extend their functionality. To demonstrate the use of these packages, the project contains a second Jupyter notebook document, 05_notebook.ipynb. This notebook uses SciPy, the well-known Python package for mathematics, science, and engineering, NumPy, the well-known Python package for scientific computing, and the Plotly Python Graphing Library. While NumPy and SciPy are included on the Jupyter Docker Image, the bootstrap script uses pip to install the required Plotly packages. Similar to Bokeh, shown in the previous notebook, we can use these libraries to create richly interactive data visualizations.
Plotly
To use Plotly from within the notebook, you will first need to sign up for a free account and obtain a username and API key. To ensure we do not accidentally save sensitive Plotly credentials in the notebook, we are using python-dotenv. This Python package reads key/value pairs from a .env file, making them available as environment variables to our notebook. Modify and run the following two commands from a Jupyter terminal to create the .env file and set your Plotly username and API key credentials. Note that the .env file is listed in the .gitignore file and will not be committed back to git, which could otherwise compromise the credentials.
echo "PLOTLY_USERNAME=your-username" >> .env echo "PLOTLY_API_KEY=your-api-key" >> .env
The notebook expects to find the two environment variables, which it uses to authenticate with Plotly.
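A minimal sketch of how a notebook cell might read those variables and register the credentials is shown below; it assumes the newer chart_studio package is being used for the Chart Studio API (older Plotly releases exposed the same function as plotly.tools.set_credentials_file).

# Hypothetical sketch: load Plotly credentials from the .env file for the session.
import os

import chart_studio.tools
from dotenv import load_dotenv

load_dotenv()  # reads key/value pairs from .env into environment variables

chart_studio.tools.set_credentials_file(
    username=os.environ['PLOTLY_USERNAME'],
    api_key=os.environ['PLOTLY_API_KEY']
)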
Shown below, we use Plotly to construct a bar chart of daily bakery items sold. The chart uses SciPy and NumPy to calculate a linear fit (regression) and plot a line of best fit for the bakery data, overlaying the vertical bars. The chart also uses SciPy's Savitzky-Golay filter to plot a second line, illustrating a smoothing of our bakery data.
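For readers unfamiliar with the two techniques, here is a small, self-contained sketch using hypothetical daily sales figures; the notebook's actual data and column names differ.

# Hypothetical sketch of the two curve-fitting techniques used in the chart.
import numpy as np
from scipy.signal import savgol_filter

daily_sales = np.array([110, 125, 98, 140, 160, 152, 171, 180, 165, 190], dtype=float)
days = np.arange(len(daily_sales))

# Line of best fit (simple linear regression)
slope, intercept = np.polyfit(days, daily_sales, 1)
best_fit = slope * days + intercept

# Savitzky-Golay filter: smooth the series with a 5-point window, 3rd-order polynomial
smoothed = savgol_filter(daily_sales, window_length=5, polyorder=3)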
Plotly also provides Chart Studio Online Chart Maker. Plotly describes Chart Studio as the world's most modern enterprise data visualization solution. We can enhance, stylize, and share our data visualizations using the free version of Chart Studio Cloud.
Jupyter Notebook Viewer
Notebooks can also be viewed using nbviewer, an open-source project under Project Jupyter. Thanks to Rackspace hosting, the nbviewer instance is a free service.
Using nbviewer, below, we see the output of a cell within the 04_notebook.ipynb notebook. View this notebook, here, using nbviewer.
Monitoring Spark Jobs
The Jupyter Docker container exposes Spark’s monitoring and instrumentation web user interface. We can review each completed Spark Job in detail.
We can review details of each stage of the Spark job, including a visualization of the DAG (Directed Acyclic Graph), which Spark constructs as part of the job execution plan, using the DAG Scheduler.
We can also review the task composition and timing of each event occurring as part of the stages of the Spark job.
We can also use the Spark interface to review and confirm the runtime environment configuration, including versions of Java, Scala, and Spark, as well as packages available on the Java classpath.
Local Spark Performance
Running Spark on a single node within the Jupyter Docker container on your local development system is not a substitute for a true Spark cluster: a Production-grade, multi-node Spark cluster running on bare metal or robust virtualized hardware, and managed with Apache Hadoop YARN, Apache Mesos, or Kubernetes. In my opinion, you should only adjust your Docker resource limits to support an acceptable level of Spark performance for running small exploratory workloads. You will not realistically replace the need to process big data and execute jobs requiring complex calculations on a Production-grade, multi-node Spark cluster.
We can use the following docker stats command to examine the container’s CPU and memory metrics.
docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"
Below, we see the stats from the Docker stack’s three containers showing little or no activity.
Compare those stats with the ones shown below, recorded while a notebook was reading and writing data, and executing Spark SQL queries. The CPU and memory output show spikes, but both appear to be within acceptable ranges.
Linux Process Monitors
Another option to examine container performance metrics is top, which is pre-installed in our Jupyter container. For example, execute the following top command from a Jupyter terminal, sorting processes by CPU usage.
top -o %CPU
We should observe the individual performance of each process running in the Jupyter container.
A step up from top is htop, an interactive process viewer for Unix. It was installed in the container by the bootstrap script. For example, we can execute the htop command from a Jupyter terminal, sorting processes by CPU % usage.
htop --sort-key PERCENT_CPU
With htop, observe the individual CPU activity. Here, the four CPUs at the top left of the htop window are the CPUs assigned to Docker. We get insight into the way Spark is using multiple CPUs, as well as other performance metrics, such as memory and swap.
Assuming your development machine is robust, it is easy to allocate and deallocate additional compute resources to Docker if required. Be careful not to allocate excessive resources to Docker, starving your host machine of available compute resources for other applications.
Notebook Extensions
There are many ways to extend the Jupyter Docker Stacks. A popular option is jupyter-contrib-nbextensions. According to their website, the jupyter-contrib-nbextensions package contains a collection of community-contributed unofficial extensions that add functionality to the Jupyter notebook. These extensions are mostly written in JavaScript and will be loaded locally in your browser. Installed notebook extensions can be enabled, either by using built-in Jupyter commands, or more conveniently by using the jupyter_nbextensions_configurator server extension.
The project contains an alternate Docker stack file, stack-nbext.yml. The stack uses an alternative Docker image, garystafford/all-spark-notebook-nbext:latest. The Dockerfile that builds it uses the jupyter/all-spark-notebook:latest image as its base image. The alternate image adds the jupyter-contrib-nbextensions and jupyter_nbextensions_configurator packages. Since Jupyter would need to be restarted after nbextensions is deployed, this cannot be done from within a running jupyter/all-spark-notebook container.
FROM jupyter/all-spark-notebook:latest

USER root

RUN pip install jupyter_contrib_nbextensions \
    && jupyter contrib nbextension install --system \
    && pip install jupyter_nbextensions_configurator \
    && jupyter nbextensions_configurator enable --system \
    && pip install yapf # for code pretty

USER $NB_UID
Using this alternate stack, below in our Jupyter container, we see the sizable list of extensions available. Some of my favorite extensions include ‘spellchecker’, ‘Codefolding’, ‘Code pretty’, ‘ExecutionTime’, ‘Table of Contents’, and ‘Toggle all line numbers’.
Below, we see five new extension icons that have been added to the menu bar of 04_notebook.ipynb. You can also observe the extensions have been applied to the notebook, including the table of contents, code-folding, execution time, and line numbering. The spellchecking and pretty code extensions were both also applied.
Conclusion
In this brief post, we have seen how easy it is to get started learning and performing data analytics using Jupyter notebooks, Python, Spark, and PySpark, all thanks to the Jupyter Docker Stacks. We could use this same stack to learn and perform machine learning using Scala and R. Extending the stack’s capabilities is as simple as swapping out this Jupyter image for another, with a different set of tools, as well as adding additional containers to the stack, such as MySQL, MongoDB, RabbitMQ, Apache Kafka, and Apache Cassandra.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients.
Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 2
Posted by Gary A. Stafford in AWS, Bash Scripting, Big Data, Build Automation, Cloud, DevOps, Python, Software Development on November 30, 2019
Introduction
In Part 1 of this two-part post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on Amazon Elastic MapReduce (EMR). Further, we configured Zeppelin integrations with AWS Glue Data Catalog, Amazon Relational Database Service (RDS) for PostgreSQL, and a data lake on Amazon Simple Storage Service (S3). We also covered how to obtain the project’s source code from the two GitHub repositories, zeppelin-emr-demo and zeppelin-emr-config. Below is a high-level architectural diagram of the infrastructure we constructed in Part 1 for this demonstration.
Part 2
In Part 2 of this post, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using a series of four Zeppelin notebooks. Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.
Notebook 1
The first notebook uses a small 21k-row Kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.
Interpreters
When you open a notebook for the first time, you are given the choice of interpreters to bind and unbind to the notebook. The last interpreter in the list shown below, postgres, is the new PostgreSQL JDBC Zeppelin interpreter we created in Part 1 of this post. We will use this interpreter in Notebook 3.
Application Versions
The first two paragraphs of the notebook are used to confirm the version of Spark, Scala, OpenJDK, and Python we are using. Recall we updated the Spark and Python interpreters to use Python 3.
Helium Visualizations
If you recall from Part 1 of the post, we pre-installed several additional Helium visualizations, including the Ultimate Pie Chart. Below, we see the use of the Spark SQL (%sql) interpreter to query a Spark DataFrame, return results, and visualize the data using the Ultimate Pie Chart. In addition to the pie chart, we see the other pre-installed Helium visualizations alongside the five default visualizations in the menu bar. With Zeppelin, all we have to do is write Spark SQL queries against the Spark DataFrame created earlier in the notebook, and Zeppelin will handle the visualization. You have some basic controls over charts using the ‘settings’ option.
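For context, a %sql paragraph queries a DataFrame that was registered as a temporary view in an earlier Python paragraph. A minimal sketch of that pattern is shown below; the view name is an assumption, and df_bakery refers to the Spark DataFrame created earlier in the notebook (Zeppelin's Spark interpreter provides the spark session).

# Hypothetical sketch: register the DataFrame created earlier as a temporary view,
# then query it with Spark SQL, as a %sql paragraph would.
df_bakery.createOrReplaceTempView('bakery_table')

spark.sql("""
    SELECT item, COUNT(*) AS count
    FROM bakery_table
    WHERE item NOT IN ('NONE', 'Adjustment')
    GROUP BY item
    ORDER BY count DESC
""").show(10, False)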
Building the Data Lake
Notebook 1 demonstrates how to read and write data to S3. We read and write the Bakery dataset in both CSV and Apache Parquet formats, using Spark (PySpark). We also write the results of Spark SQL queries, like the one above, in Parquet, to S3.
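A minimal PySpark sketch of this read-and-write pattern is shown below; the bucket name and key prefixes are placeholders, and the spark session is provided by Zeppelin's Spark interpreter.

# Hypothetical sketch of the data lake read/write pattern; paths are placeholders.
bucket = 'your-bucket-name'

# Read the raw CSV-format Bakery dataset from S3
df_bakery = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('s3://{}/bakery/BreadBasket_DMS.csv'.format(bucket))

# Write the same data back to S3 in Apache Parquet format for faster future reads
df_bakery.write.parquet('s3://{}/bakery/parquet'.format(bucket), mode='overwrite')

# Later paragraphs read the Parquet copy instead of re-reading the CSV
df_parquet = spark.read.parquet('s3://{}/bakery/parquet'.format(bucket))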
With Parquet, data may be split into multiple files, as shown in the S3 bucket directory below. Parquet is much faster to read into a Spark DataFrame than CSV. Spark provides support for both reading and writing Parquet files. We will write all of our data to Parquet in S3, making future re-use of the data much more efficient than downloading data from the Internet, like GroupLens or Kaggle, or consuming CSV from S3.
Preview S3 Data
In addition to using the Zeppelin notebook, we can preview data right in the S3 bucket web interface using the Amazon S3 Select feature. This query-in-place feature is helpful for quickly understanding the structure and content of new data files with which you want to interact within Zeppelin.
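The same query-in-place preview can also be performed programmatically with S3 Select, using boto3; in the sketch below, the bucket and key names are placeholders.

# Hypothetical sketch: preview the first few rows of a CSV object with Amazon S3 Select.
import boto3

s3 = boto3.client('s3')

response = s3.select_object_content(
    Bucket='your-bucket-name',
    Key='bakery/BreadBasket_DMS.csv',
    ExpressionType='SQL',
    Expression="SELECT * FROM s3object s LIMIT 5",
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE'}},
    OutputSerialization={'CSV': {}}
)

# The response payload is an event stream; print only the record events
for event in response['Payload']:
    if 'Records' in event:
        print(event['Records']['Payload'].decode('utf-8'))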
Saving Changes to GitHub
In Part 1, we configured Zeppelin to read and write the notebooks from your own copy of the GitHub notebook repository. Using the ‘version control’ menu item, changes made to the notebooks can be committed directly to GitHub.
In GitHub, note the committer is the zeppelin user.
Notebook 2
The second notebook demonstrates the use of single-node and multi-node Amazon EMR clusters for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- versus multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 instance types.
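The performance comparison amounts to timing the same Spark operations on each cluster. A simplified, hypothetical sketch of the approach is shown below; the S3 path to the MovieLens ratings file is a placeholder, and the spark session is provided by Zeppelin's Spark interpreter.

# Hypothetical sketch: time a simple aggregation over the MovieLens ratings dataset.
import time

start = time.time()

df_ratings = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('s3://your-bucket-name/ml-latest/ratings.csv')

# Count ratings per movie and show the ten most-rated movies
df_ratings \
    .groupBy('movieId') \
    .count() \
    .orderBy('count', ascending=False) \
    .show(10)

print('Elapsed time: %.2f seconds' % (time.time() - start))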
Multi-Node EMR Cluster
If you recall from Part 1, we waited to create this cluster due to the compute costs of running the cluster’s large EC2 instances. You should understand the cost of these resources before proceeding and ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.
Normalized Instance Hours
Understanding the costs of EMR requires understanding the concept of normalized instance hours. The cluster list displayed in the EMR AWS Console contains two columns, ‘Elapsed time’ and ‘Normalized instance hours’. The ‘Elapsed time’ column reflects the actual wall-clock time the cluster was used. The ‘Normalized instance hours’ column indicates the approximate number of compute hours the cluster has used, rounded up to the nearest hour.
Normalized instance hours calculations are based on a normalization factor. The normalization factor ranges from 1 for a small instance, up to 64 for an 8xlarge. Based on the type and quantity of instances in our multi-node cluster, we would use approximately 56 compute hours (aka normalized instance hours) for every one hour of wall-clock time our EMR cluster is running. Note the multi-node cluster used in our demo, highlighted in yellow above. The cluster ran for two hours, which equated to 112 normalized instance hours.
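As a back-of-the-envelope check, assuming normalization factors of 8 for an m5.xlarge and 16 for an m5.2xlarge, the demo cluster's numbers work out as follows.

# Back-of-the-envelope check of the normalized instance hours quoted above.
# Assumed normalization factors: m5.xlarge = 8, m5.2xlarge = 16.
master_factor = 8    # 1 x m5.xlarge master node
core_factor = 16     # m5.2xlarge core nodes
core_count = 3

factors_per_hour = (1 * master_factor) + (core_count * core_factor)
print(factors_per_hour)  # 56 normalized instance hours per wall-clock hour

wall_clock_hours = 2
print(factors_per_hour * wall_clock_hours)  # 112 normalized instance hours for the demo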
Create the Multi-Node Cluster
Create the multi-node EMR cluster using CloudFormation. Change the following nine variable values, then run the aws cloudformation create-stack API command, using the AWS CLI.
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name"
GITHUB_REPO="your-new-project-name"
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
CORE_INSTANCE_TYPE="m5.2xlarge" # optional
CORE_INSTANCE_COUNT=3 # optional

aws cloudformation create-stack \
    --stack-name zeppelin-emr-prod-stack \
    --template-body file://cloudformation/emr_cluster.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
        ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
        ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
        ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
        ParameterKey=CoreInstanceType,ParameterValue=${CORE_INSTANCE_TYPE} \
        ParameterKey=CoreInstanceCount,ParameterValue=${CORE_INSTANCE_COUNT} \
        ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
        ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
        ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}
Use the Amazon EMR web interface to confirm the success of the CloudFormation stack. The fully-provisioned cluster should be in the ‘Waiting’ state when ready.
Configuring the EMR Cluster
Refer to Part 1 for the configuration steps necessary to prepare the EMR cluster and Zeppelin before continuing. Repeat all the steps used for the single-node cluster.
Monitoring with Ganglia
In Part 1, we installed Ganglia as part of creating the EMR cluster. Ganglia, according to its website, is a scalable distributed monitoring system for high-performance computing systems such as clusters and grids. Ganglia can be used to evaluate the performance of the single-node and multi-node EMR clusters. With Ganglia, we can easily view cluster and individual instance CPU, memory, and network I/O performance.