Posts Tagged Data Catalog

Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 2

Introduction

In part one, we learned how to ingest, transform, and enrich raw, semi-structured data, in multiple formats, using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched data sources, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

athena-glue-architecture-v2High-level AWS architecture diagram of the demonstration.

Background

If you recall the demonstration from part one of the post, we had adopted the persona of a large, US-based electric energy provider. The energy provider had developed and sold its next-generation Smart Electrical Monitoring Hub (Smart Hub) to residential customers. Customers can analyze their electrical usage with a fine level of granularity, per device and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

Data Visualization and BI

The data analysis process in the demonstration was divided into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).

athena-glue-0.pngFull data analysis workflow diagram (click to enlarge…)

In the final, Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for data visualization and business intelligence, which integrate with Amazon Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, many available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo.

In this demonstration, we will focus on Amazon QuickSight. Amazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that include ML Insights. Dashboards can be accessed from any device, and embedded into your applications, portals, and websites. QuickSight serverlessly scales automatically from tens of users to tens of thousands without any infrastructure management.

Athena-Glue-4

Using QuickSight

QuickSight APIs

Amazon recently added a full set of aws quicksight APIs for interacting with QuickSight. For example, to preview the three QuickSight data sets created for this part of the demo, with the AWS CLI, we would use the list-data-sets comand.

aws quicksight list-data-sets –aws-account-id 123456789012

view raw
list-data-sets.sh
hosted with ❤ by GitHub

{
"Status": 200,
"DataSetSummaries": [
{
"Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/9eb88a69-20de-d8be-aefd-2c7ac4e23748",
"DataSetId": "9eb88a69-20de-d8be-aefd-2c7ac4e23748",
"Name": "etl_output_parquet",
"CreatedTime": 1578028774.897,
"LastUpdatedTime": 1578955245.02,
"ImportMode": "SPICE"
},
{
"Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/78e81193-189c-6dd0-864fb-a33244c9654",
"DataSetId": "78e81193-189c-6dd0-864fb-a33244c9654",
"Name": "electricity_rates_parquet",
"CreatedTime": 1578029224.996,
"LastUpdatedTime": 1578945179.472,
"ImportMode": "SPICE"
},
{
"Arn": "arn:aws:quicksight:us-east-1:123456789012:dataset/a474214d-c838-b384-bcca-ea1fcd2dd094",
"DataSetId": "a474214d-c838-b384-bcca-ea1fcd2dd094",
"Name": "smart_hub_locations_parquet",
"CreatedTime": 1578029124.565,
"LastUpdatedTime": 1578888788.135,
"ImportMode": "SPICE"
}
],
"RequestId": "2524e80c-7c67-7fbd-c3f1-b700c521badc"
}

view raw
list-data-sets.json
hosted with ❤ by GitHub

To examine details of a single data set, with the AWS CLI, we would use the describe-data-set command.

aws quicksight describe-data-set \
–aws-account-id 123456789012 \
–data-set-id 9eb88a69-20de-d8be-aefd-2c7ac4e23748

view raw
describe-data-set.sh
hosted with ❤ by GitHub

QuickSight Console

However, for this final part of the demonstration, we will be working from the Amazon QuickSight Console, as opposed to the AWS CLI, AWS CDK, or CloudFormation templates.

Signing Up for QuickSight

To use Amazon QuickSight, you must sign up for QuickSight.

screen_shot_2020-01-02_at_9_07_00_pm

There are two Editions of Amazon QuickSight, Standard and Enterprise. For this demonstration, the Standard Edition will suffice.

screen_shot_2020-01-02_at_9_07_40_pm

QuickSight Data Sets

Amazon QuickSight uses Data Sets as the basis for all data visualizations. According to AWS, QuickSight data sets can be created from a wide variety of data sources, including Amazon RDS, Amazon Aurora, Amazon Redshift, Amazon Athena, and Amazon S3. You can also upload Excel spreadsheets or flat files (CSV, TSV, CLF, ELF, and JSON), connect to on-premises databases like SQL Server, MySQL, and PostgreSQL and import data from SaaS applications like Salesforce. Below, we see a list of the latest data sources available in the QuickSight New Data Set Console.

screen_shot_2020-01-12_at_8_18_05_pm_v2

Demonstration Data Sets

For the demonstration, I have created three QuickSight data sets, all based on Amazon Athena. You have two options when using Amazon Athena as a data source. The first option is to select a table from an AWS Glue Data Catalog database, such as the database we created in part one of the post, ‘smart_hub_data_catalog.’ The second option is to create a custom SQL query, based on one or more tables in an AWS Glue Data Catalog database.

screen_shot_2020-01-13_at_9_05_49_pm.png

Of the three data sets created for part two of this demonstration, two data sets use tables directly from the Data Catalog, including ‘etl_output_parquet’ and ‘electricity_rates_parquet.’ The third data set uses a custom SQL query, based on the single Data Catalog table, ‘smart_hub_locations_parquet.’ All three tables used to create the data sets represent the enriched, highly efficient Parquet-format data sources in the S3-based Data Lake.

screen_shot_2020-01-13_at_9_16_54_pm.png

Data Set Features

There are a large number of features available when creating and configuring data sets. We cannot possibly cover all of them in this post. Let’s look at three features: geospatial field types, calculated fields, and custom SQL.

Geospatial Data Types

QuickSight can intelligently detect common types of geographic fields in a data source and assign QuickSight geographic data type, including Country, County, City, Postcode, and State. QuickSight can also detect geospatial data, including Latitude and Longitude. We will take advantage of this QuickSight feature for our three data set’s data sources, including the State, Postcode, Latitude, and Longitude field types.

screen_shot_2020-01-13_at_9_53_19_pm.png

Calculated Fields

A commonly-used QuickSight data set feature is the ‘Calculated field.’ For the ‘etl_output_parquet’ data set, I have created a new field (column), cost_dollar.

screen_shot_2020-01-13_at_4_35_20_pm

The cost field is the electrical cost of the device, over a five minute time interval, in cents (¢). The calculated cost_dollar field is the quotient of the cost field divided by 100. This value represents the electrical cost of the device, over a five minute time interval, in dollars ($). This is a straightforward example. However, a calculated field can be very complex, built from multiple arithmetic, comparison, and conditional functions, string calculations, and data set fields.

screen_shot_2020-01-13_at_9_35_14_pm

Data set calculated fields can also be created and edited from the QuickSight Analysis Console (discussed later).

screen_shot_2020-01-13_at_4_45_47_pm.png

Custom SQL

The third QuickSight data set is based on an Amazon Athena custom SQL query.

SELECT lon, lat, postcode, hash, tz, state
FROM smart_hub_data_catalog.smart_hub_locations_parquet;

Although you can write queries in the QuickSight Data Prep Console, I prefer to write custom Athena queries using the Athena Query Editor. Using the Editor, you can write, run, debug, and optimize queries to ensure they function correctly, first.

screen_shot_2020-01-13_at_12_50_14_pm

The Athena query can then be pasted into the Custom SQL window. Clicking ‘Finish’ in the window is the equivalent of ‘Run query’ in the Athena Query Editor Console. The query runs and returns data.

screen_shot_2020-01-12_at_8_12_44_pm

Similar to the Athena Query Editor, queries executed in the QuickSight Data Prep Console will show up in the Athena History tab, with a /* QuickSight */ comment prefix.

screen_shot_2020-01-14_at_9_10_16_pm

SPICE

You will notice the three QuickSight data sets are labeled, ‘SPICE.’ According to AWS, the acronym, SPICE, stands for ‘Super-fast, Parallel, In-memory, Calculation Engine.’ QuickSight’s in-memory calculation engine, SPICE, achieves blazing fast performance at scale. SPICE automatically replicates data for high availability allowing thousands of users to simultaneously perform fast, interactive analysis while shielding your underlying data infrastructure, saving you time and resources. With the Standard Edition of QuickSight, as the first Author, you get 1 GB of SPICE in-memory data for free.

QuickSight Analysis

The QuickSight Analysis Console is where Analyses are created. A specific QuickSight Analysis will contain a collection of data sets and data visualizations (visuals). Each visual is associated with a single data set.

Types of QuickSight Analysis visuals include: horizontal and vertical, single and stacked bar charts, line graphs, combination charts, area line charts, scatter plots, heat maps, pie and donut charts, tree maps, pivot tables, gauges, key performance indicators (KPI), geospatial diagrams, and word clouds. Individual visual titles, legends, axis, and other visual aspects can be easily modified. Visuals can contain drill-downs.

A data set’s fields can be modified from within the Analysis Console. Field types and formats, such as date, numeric, currency fields, can be customized for display. The Analysis can include a Title and subtitle. There are some customizable themes available to change the overall look of the Analysis.

screen_shot_2020-01-15_at_12_45_37_am.png

Analysis Filters

Data displayed in the visuals can be further shaped using a combination of Filters, Conditional formatting, and Parameters. Below, we see an example of a typical filter based on a range of dates and times. The data set contains two full days’ worth of data. Here, we are filtering the data to a 14-hour peak electrical usage period, between 8 AM and 10 PM on the same day, 12/21/2019.

screen_shot_2020-01-13_at_10_46_57_pm.png

Drill-Down, Drill-Up, Focus, and Exclude

According to AWS, all visual types except pivot tables offer the ability to create a hierarchy of fields for a visual element. The hierarchy lets you drill down or up to see data at different levels of the hierarchy. Focus allows you to concentrate on a single element within a hierarchy of fields. Exclude allows you to remove an element from a hierarchy of fields. Below, we see an example of all four of these features, available to apply to the ‘Central Air Conditioner’. Since the AC unit is the largest consumer of electricity on average per day, applying these filters to understand its impact on the overall electrical usage may be useful to an analysis. We can also drill down to minutes from hours or up to days from hours.

screen_shot_2020-01-15_at_12_56_54_am.png

Example QuickSight Analysis

A QuickSight Analysis is shared by the Analysis Author as a QuickSight Dashboard. Below, we see an example of a QuickSight Dashboard, built and shared for this demonstration. The ‘Residential Electrical Usage Analysis’ is built from the three data sets created earlier. From those data sets, we have constructed several visuals, including a geospatial diagram, donut chart, heat map, KPI, combination chart, stacked vertical bar chart, and line graph. Each visual’s title, layout, and field display has all customized. The data displayed in the visuals have been filtered differently, including by date and time, by customer id (loc_id), and by state. Conditional formatting is used to enhance the visual appearance of visuals, such as the ‘Total Electrical Cost’ KPI.

screen_shot_2020-01-13_at_7_57_47_pm_v4.png

Conclusion

In part one, we learned how to ingest, transform, and enrich raw, semi-structured data, in multiple formats, using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we used the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , ,

2 Comments

Getting Started with Data Analysis on AWS using AWS Glue, Amazon Athena, and QuickSight: Part 1

Introduction

According to Wikipedia, data analysis is “a process of inspecting, cleansing, transforming, and modeling data with the goal of discovering useful information, informing conclusion, and supporting decision-making.” In this two-part post, we will explore how to get started with data analysis on AWS, using the serverless capabilities of Amazon Athena, AWS Glue, Amazon QuickSight, Amazon S3, and AWS Lambda. We will learn how to use these complementary services to transform, enrich, analyze, and visualize semi-structured data.

Data Analysis—discovering useful information, informing conclusion, and supporting decision-making. –Wikipedia

In part one, we will begin with raw, semi-structured data in multiple formats. We will discover how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We will build an S3-based data lake, and learn how AWS leverages open-source technologies, such as Presto, Apache Hive, and Apache Parquet. In part two, we will learn how to further analyze and visualize the data using Amazon QuickSight. Here’s a quick preview of what we will build in part one of the post.

Demonstration

In this demonstration, we will adopt the persona of a large, US-based electric energy provider. The energy provider has developed its next-generation Smart Electrical Monitoring Hub (Smart Hub). They have sold the Smart Hub to a large number of residential customers throughout the United States. The hypothetical Smart Hub wirelessly collects detailed electrical usage data from individual, smart electrical receptacles and electrical circuit meters, spread throughout the residence. Electrical usage data is encrypted and securely transmitted from the customer’s Smart Hub to the electric provider, who is running their business on AWS.

Customers are able to analyze their electrical usage with fine granularity, per device, and over time. The goal of the Smart Hub is to enable the customers, using data, to reduce their electrical costs. The provider benefits from a reduction in load on the existing electrical grid and a better distribution of daily electrical load as customers shift usage to off-peak times to save money.

screen_shot_2020-01-13_at_7_57_47_pm_v4.pngPreview of post’s data in Amazon QuickSight.

The original concept for the Smart Hub was developed as part of a multi-day training and hackathon, I recently attended with an AWSome group of AWS Solutions Architects in San Francisco. As a team, we developed the concept of the Smart Hub integrated with a real-time, serverless, streaming data architecture, leveraging AWS IoT Core, Amazon Kinesis, AWS Lambda, and Amazon DynamoDB.

SA_Team_PhotoFrom left: Bruno Giorgini, Mahalingam (‘Mahali’) Sivaprakasam, Gary Stafford, Amit Kumar Agrawal, and Manish Agarwal.

This post will focus on data analysis, as opposed to the real-time streaming aspect of data capture or how the data is persisted on AWS.

athena-glue-architecture-v2High-level AWS architecture diagram of the demonstration.

Featured Technologies

The following AWS services and open-source technologies are featured prominently in this post.

Athena-Glue-v2.png

Amazon S3-based Data Lake

Screen Shot 2020-01-02 at 5.09.05 PMAn Amazon S3-based Data Lake uses Amazon S3 as its primary storage platform. Amazon S3 provides an optimal foundation for a data lake because of its virtually unlimited scalability, from gigabytes to petabytes of content. Amazon S3 provides ‘11 nines’ (99.999999999%) durability. It has scalable performance, ease-of-use features, and native encryption and access control capabilities.

AWS Glue

Screen Shot 2020-01-02 at 5.11.37 PMAWS Glue is a fully managed extract, transform, and load (ETL) service to prepare and load data for analytics. AWS Glue discovers your data and stores the associated metadata (e.g., table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

AWS Glue Data Catalog

Screen Shot 2020-01-02 at 5.13.01 PM.pngThe AWS Glue Data Catalog is an Apache Hive Metastore compatible, central repository to store structural and operational metadata for data assets. For a given data set, store table definition, physical location, add business-relevant attributes, as well as track how the data has changed over time.

AWS Glue Crawler

Screen Shot 2020-01-02 at 5.14.57 PMAn AWS Glue Crawler connects to a data store, progresses through a prioritized list of classifiers to extract the schema of your data and other statistics, and then populates the Glue Data Catalog with this metadata. Crawlers can run periodically to detect the availability of new data as well as changes to existing data, including table definition changes. Crawlers automatically add new tables, new partitions to an existing table, and new versions of table definitions. You can even customize Glue Crawlers to classify your own file types.

AWS Glue ETL Job

Screen Shot 2020-01-02 at 5.11.37 PMAn AWS Glue ETL Job is the business logic that performs extract, transform, and load (ETL) work in AWS Glue. When you start a job, AWS Glue runs a script that extracts data from sources, transforms the data, and loads it into targets. AWS Glue generates a PySpark or Scala script, which runs on Apache Spark.

Amazon Athena

Screen Shot 2020-01-02 at 5.17.49 PMAmazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena supports and works with a variety of standard data formats, including CSV, JSON, Apache ORC, Apache Avro, and Apache Parquet. Athena is integrated, out-of-the-box, with AWS Glue Data Catalog. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

The underlying technology behind Amazon Athena is Presto, the open-source distributed SQL query engine for big data, created by Facebook. According to the AWS, the Athena query engine is based on Presto 0.172 (released April 9, 2017). In addition to Presto, Athena uses Apache Hive to define tables.

Amazon QuickSight

Screen Shot 2020-01-02 at 5.18.40 PMAmazon QuickSight is a fully managed business intelligence (BI) service. QuickSight lets you create and publish interactive dashboards that can then be accessed from any device, and embedded into your applications, portals, and websites.

AWS Lambda

Screen Shot 2020-01-02 at 5.25.57 PMAWS Lambda automatically runs code without requiring the provisioning or management servers. AWS Lambda automatically scales applications by running code in response to triggers. Lambda code runs in parallel. With AWS Lambda, you are charged for every 100ms your code executes and the number of times your code is triggered. You pay only for the compute time you consume.

Smart Hub Data

Everything in this post revolves around data. For the post’s demonstration, we will start with four categories of raw, synthetic data. Those data categories include Smart Hub electrical usage data, Smart Hub sensor mapping data, Smart Hub residential locations data, and electrical rate data. To demonstrate the capabilities of AWS Glue to handle multiple data formats, the four categories of raw data consist of three distinct file formats: XML, JSON, and CSV. I have attempted to incorporate as many ‘real-world’ complexities into the data without losing focus on the main subject of the post. The sample datasets are intentionally small to keep your AWS costs to a minimum for the demonstration.

To further reduce costs, we will use a variety of data partitioning schemes. According to AWS, by partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost. We have very little data for the demonstration, in which case partitioning may negatively impact query performance. However, in a ‘real-world’ scenario, there would be millions of potential residential customers generating terabytes of data. In that case, data partitioning would be essential for both cost and performance.

Smart Hub Electrical Usage Data

The Smart Hub’s time-series electrical usage data is collected from the customer’s Smart Hub. In the demonstration’s sample electrical usage data, each row represents a completely arbitrary five-minute time interval. There are a total of ten electrical sensors whose electrical usage in kilowatt-hours (kW) is recorded and transmitted. Each Smart Hub records and transmits electrical usage for 10 device sensors, 288 times per day (24 hr / 5 min intervals), for a total of 2,880 data points per day, per Smart Hub. There are two days worth of usage data for the demonstration, for a total of 5,760 data points. The data is stored in JSON Lines format. The usage data will be partitioned in the Amazon S3-based data lake by date (e.g., ‘dt=2019-12-21’).

{"loc_id":"b6a8d42425fde548","ts":1576915200,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04167}}
{"loc_id":"b6a8d42425fde548","ts":1576915500,"data":{"s_01":0,"s_02":0.00552,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04147}}
{"loc_id":"b6a8d42425fde548","ts":1576915800,"data":{"s_01":0.29267,"s_02":0.00642,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04207}}
{"loc_id":"b6a8d42425fde548","ts":1576916100,"data":{"s_01":0.29207,"s_02":0.00592,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04137}}
{"loc_id":"b6a8d42425fde548","ts":1576916400,"data":{"s_01":0.29217,"s_02":0.00622,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04157}}
{"loc_id":"b6a8d42425fde548","ts":1576916700,"data":{"s_01":0,"s_02":0.00562,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04197}}
{"loc_id":"b6a8d42425fde548","ts":1576917000,"data":{"s_01":0,"s_02":0.00512,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04257}}
{"loc_id":"b6a8d42425fde548","ts":1576917300,"data":{"s_01":0,"s_02":0.00522,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04177}}
{"loc_id":"b6a8d42425fde548","ts":1576917600,"data":{"s_01":0,"s_02":0.00502,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04267}}
{"loc_id":"b6a8d42425fde548","ts":1576917900,"data":{"s_01":0,"s_02":0.00612,"s_03":0,"s_04":0,"s_05":0,"s_06":0,"s_07":0,"s_08":0,"s_09":0,"s_10":0.04237}}

view raw
smart_data.json
hosted with ❤ by GitHub

Note the electrical usage data contains nested data. The electrical usage for each of the ten sensors is contained in a JSON array, within each time series entry. The array contains ten numeric values of type, double.

{
"loc_id": "b6a8d42425fde548",
"ts": 1576916400,
"data": {
"s_01": 0.29217,
"s_02": 0.00622,
"s_03": 0,
"s_04": 0,
"s_05": 0,
"s_06": 0,
"s_07": 0,
"s_08": 0,
"s_09": 0,
"s_10": 0.04157
}
}

Real data is often complex and deeply nested. Later in the post, we will see that AWS Glue can map many common data types, including nested data objects, as illustrated below.

screen_shot_2020-01-05_at_7_46_19_am

Smart Hub Sensor Mappings

The Smart Hub sensor mappings data maps a sensor column in the usage data (e.g., ‘s_01’ to the corresponding actual device (e.g., ‘Central Air Conditioner’). The data contains the device location, wattage, and the last time the record was modified. The data is also stored in JSON Lines format. The sensor mappings data will be partitioned in the Amazon S3-based data lake by the state of the residence (e.g., ‘state=or’ for Oregon).

{"loc_id":"b6a8d42425fde548","id":"s_01","description":"Central Air Conditioner","location":"N/A","watts":3500,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_02","description":"Ceiling Fan","location":"Master Bedroom","watts":65,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_03","description":"Clothes Dryer","location":"Basement","watts":5000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_04","description":"Clothes Washer","location":"Basement","watts":1800,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_05","description":"Dishwasher","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_06","description":"Flat Screen TV","location":"Living Room","watts":120,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_07","description":"Microwave Oven","location":"Kitchen","watts":1000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_08","description":"Coffee Maker","location":"Kitchen","watts":900,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_09","description":"Hair Dryer","location":"Master Bathroom","watts":2000,"last_modified":1559347200}
{"loc_id":"b6a8d42425fde548","id":"s_10","description":"Refrigerator","location":"Kitchen","watts":500,"last_modified":1559347200}

view raw
sensor_mappings.json
hosted with ❤ by GitHub

Smart Hub Locations

The Smart Hub locations data contains the geospatial coordinates, home address, and timezone for each residential Smart Hub. The data is stored in CSV format. The data for the four cities included in this demonstration originated from OpenAddresses, ‘the free and open global address collection.’ There are approximately 4k location records. The location data will be partitioned in the Amazon S3-based data lake by the state of the residence where the Smart Hub is installed (e.g., ‘state=or’ for Oregon).


lon lat number street unit city district region postcode id hash tz
-122.8077278 45.4715614 6635 SW JUNIPER TER 97008 b6a8d42425fde548 America/Los_Angeles
-122.8356634 45.4385864 11225 SW PINTAIL LOOP 97007 08ae3df798df8b90 America/Los_Angeles
-122.8252379 45.4481709 9930 SW WRANGLER PL 97008 1c7e1f7df752663e America/Los_Angeles
-122.8354211 45.4535977 9174 SW PLATINUM PL 97007 b364854408ee431e America/Los_Angeles
-122.8315771 45.4949449 15040 SW MILLIKAN WAY # 233 97003 0e97796ba31ba3b4 America/Los_Angeles
-122.7950339 45.4470259 10006 SW CONESTOGA DR # 113 97008 2b5307be5bfeb026 America/Los_Angeles
-122.8072836 45.4908594 12600 SW CRESCENT ST # 126 97005 4d74167f00f63f50 America/Los_Angeles
-122.8211801 45.4689303 7100 SW 140TH PL 97008 c5568631f0b9de9c America/Los_Angeles
-122.831154 45.4317057 15050 SW MALLARD DR # 101 97007 dbd1321080ce9682 America/Los_Angeles
-122.8162856 45.4442878 10460 SW 136TH PL 97008 008faab8a9a3e519 America/Los_Angeles

Electrical Rates

Lastly, the electrical rate data contains the cost of electricity. In this demonstration, the assumption is that the rate varies by state, by month, and by the hour of the day. The data is stored in XML, a data export format still common to older, legacy systems. The electrical rate data will not be partitioned in the Amazon S3-based data lake.

<?xml version="1.0" encoding="UTF-8"?>
<root>
<row>
<state>or</state>
<year>2019</year>
<month>12</month>
<from>19:00:00</from>
<to>19:59:59</to>
<type>peak</type>
<rate>12.623</rate>
</row>
<row>
<state>or</state>
<year>2019</year>
<month>12</month>
<from>20:00:00</from>
<to>20:59:59</to>
<type>partial-peak</type>
<rate>7.232</rate>
</row>
<row>
<state>or</state>
<year>2019</year>
<month>12</month>
<from>21:00:00</from>
<to>21:59:59</to>
<type>partial-peak</type>
<rate>7.232</rate>
</row>
<row>
<state>or</state>
<year>2019</year>
<month>12</month>
<from>22:00:00</from>
<to>22:59:59</to>
<type>off-peak</type>
<rate>4.209</rate>
</row>
</root>

view raw
rates.xml
hosted with ❤ by GitHub

Data Analysis Process

Due to the number of steps involved in the data analysis process in the demonstration, I have divided the process into four logical stages: 1) Raw Data Ingestion, 2) Data Transformation, 3) Data Enrichment, and 4) Data Visualization and Business Intelligence (BI).

athena-glue-0.pngFull data analysis workflow diagram (click to enlarge…)

Raw Data Ingestion

In the Raw Data Ingestion stage, semi-structured CSV-, XML-, and JSON-format data files are copied to a secure Amazon Simple Storage Service (S3) bucket. Within the bucket, data files are organized into folders based on their physical data structure (schema). Due to the potentially unlimited number of data files, files are further organized (partitioned) into subfolders. Organizational strategies for data files are based on date, time, geographic location, customer id, or other common data characteristics.

This collection of semi-structured data files, S3 buckets, and partitions form what is referred to as a Data Lake. According to AWS, a data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale.

A series of AWS Glue Crawlers process the raw CSV-, XML-, and JSON-format files, extracting metadata, and creating table definitions in the AWS Glue Data Catalog. According to AWS, an AWS Glue Data Catalog contains metadata tables, where each table specifies a single data store.

Athena-Glue-1

Data Transformation

In the Data Transformation stage, the raw data in the previous stage is transformed. Data transformation may include both modifying the data and changing the data format. Data modifications include data cleansing, re-casting data types, changing date formats, field-level computations, and field concatenation.

The data is then converted from CSV-, XML-, and JSON-format to Apache Parquet format and written back to the Amazon S3-based data lake. Apache Parquet is a compressed, efficient columnar storage format. Amazon Athena, like many Cloud-based services, charges you by the amount of data scanned per query. Hence, using data partitioning, bucketing, compression, and columnar storage formats, like Parquet, will reduce query cost.

Lastly, the transformed Parquet-format data is cataloged to new tables, alongside the raw CSV, XML, and JSON data, in the Glue Data Catalog.

Athena-Glue-2

Data Enrichment

According to ScienceDirect, data enrichment or augmentation is the process of enhancing existing information by supplementing missing or incomplete data. Typically, data enrichment is achieved by using external data sources, but that is not always the case.

Data Enrichment—the process of enhancing existing information by supplementing missing or incomplete data. –ScienceDirect

In the Data Enrichment stage, the Parquet-format Smart Hub usage data is augmented with related data from the three other data sources: sensor mappings, locations, and electrical rates. The customer’s Smart Hub usage data is enriched with the customer’s device types, the customer’s timezone, and customer’s electricity cost per monitored period based on the customer’s geographic location and time of day.

Athena-Glue-3a

Once the data is enriched, it is converted to Parquet and optimized for query performance, stored in the data lake, and cataloged. At this point, the original CSV-, XML-, and JSON-format raw data files, the transformed Parquet-format data files, and the Parquet-format enriched data files are all stored in the Amazon S3-based data lake and cataloged in the Glue Data Catalog.

Athena-Glue-3b

Data Visualization

In the final Data Visualization and Business Intelligence (BI) stage, the enriched data is presented and analyzed. There are many enterprise-grade services available for visualization and Business Intelligence, which integrate with Athena. Amazon services include Amazon QuickSight, Amazon EMR, and Amazon SageMaker. Third-party solutions from AWS Partners, available on the AWS Marketplace, include Tableau, Looker, Sisense, and Domo. In this demonstration, we will focus on Amazon QuickSight.

Athena-Glue-4

Getting Started

Requirements

To follow along with the demonstration, you will need an AWS Account and a current version of the AWS CLI. To get the most from the demonstration, you should also have Python 3 and jq installed in your work environment.

Source Code

All source code for this post can be found on GitHub. Use the following command to clone a copy of the project.

git clone \
–branch master –single-branch –depth 1 –no-tags \
https://github.com/garystafford/athena-glue-quicksight-demo.git

view raw
git_clone.sh
hosted with ❤ by GitHub

Source code samples in this post are displayed as GitHub Gists, which will not display correctly on some mobile and social media browsers.

TL;DR?

Just want the jump in without reading the instructions? All the AWS CLI commands, found within the post, are consolidated in the GitHub project’s README file.

CloudFormation Stack

To start, create the ‘smart-hub-athena-glue-stack’ CloudFormation stack using the smart-hub-athena-glue.yml template. The template will create (3) Amazon S3 buckets, (1) AWS Glue Data Catalog Database, (5) Data Catalog Database Tables, (6) AWS Glue Crawlers, (1) AWS Glue ETL Job, and (1) IAM Service Role for AWS Glue.

Make sure to change the DATA_BUCKET, SCRIPT_BUCKET, and LOG_BUCKET variables, first, to your own unique S3 bucket names. I always suggest using the standard AWS 3-part convention of 1) descriptive name, 2) AWS Account ID or Account Alias, and 3) AWS Region, to name your bucket (e.g. ‘smart-hub-data-123456789012-us-east-1’).

# *** CHANGE ME ***
BUCKET_SUFFIX="123456789012-us-east-1"
DATA_BUCKET="smart-hub-data-${BUCKET_SUFFIX}"
SCRIPT_BUCKET="smart-hub-scripts-${BUCKET_SUFFIX}"
LOG_BUCKET="smart-hub-logs-${BUCKET_SUFFIX}"
aws cloudformation create-stack \
–stack-name smart-hub-athena-glue-stack \
–template-body file://cloudformation/smart-hub-athena-glue.yml \
–parameters ParameterKey=DataBucketName,ParameterValue=${DATA_BUCKET} \
ParameterKey=ScriptBucketName,ParameterValue=${SCRIPT_BUCKET} \
ParameterKey=LogBucketName,ParameterValue=${LOG_BUCKET} \
–capabilities CAPABILITY_NAMED_IAM

view raw
step1-2.sh
hosted with ❤ by GitHub

Raw Data Files

Next, copy the raw CSV-, XML-, and JSON-format data files from the local project to the DATA_BUCKET S3 bucket (steps 1a-1b in workflow diagram). These files represent the beginnings of the S3-based data lake. Each category of data uses a different strategy for organizing and separating the files. Note the use of the Apache Hive-style partitions (e.g., /smart_hub_data_json/dt=2019-12-21). As discussed earlier, the assumption is that the actual, large volume of data in the data lake would necessitate using partitioning to improve query performance.

# location data
aws s3 cp data/locations/denver_co_1576656000.csv \
s3://${DATA_BUCKET}/smart_hub_locations_csv/state=co/
aws s3 cp data/locations/palo_alto_ca_1576742400.csv \
s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ca/
aws s3 cp data/locations/portland_metro_or_1576742400.csv \
s3://${DATA_BUCKET}/smart_hub_locations_csv/state=or/
aws s3 cp data/locations/stamford_ct_1576569600.csv \
s3://${DATA_BUCKET}/smart_hub_locations_csv/state=ct/
# sensor mapping data
aws s3 cp data/mappings/ \
s3://${DATA_BUCKET}/sensor_mappings_json/state=or/ \
–recursive
# electrical usage data
aws s3 cp data/usage/2019-12-21/ \
s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-21/ \
–recursive
aws s3 cp data/usage/2019-12-22/ \
s3://${DATA_BUCKET}/smart_hub_data_json/dt=2019-12-22/ \
–recursive
# electricity rates data
aws s3 cp data/rates/ \
s3://${DATA_BUCKET}/electricity_rates_xml/ \
–recursive

view raw
step3.sh
hosted with ❤ by GitHub

Confirm the contents of the DATA_BUCKET S3 bucket with the following command.

aws s3 ls s3://${DATA_BUCKET}/ \
–recursive –human-readable –summarize

view raw
step3.sh
hosted with ❤ by GitHub

There should be a total of (14) raw data files in the DATA_BUCKET S3 bucket.

2020-01-04 14:39:51 20.0 KiB electricity_rates_xml/2019_12_1575270000.xml
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/08ae3df798df8b90_1550908800.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/1c7e1f7df752663e_1559347200.json
2020-01-04 14:39:46 1.3 KiB sensor_mappings_json/state=or/b6a8d42425fde548_1568314800.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/08ae3df798df8b90_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/1c7e1f7df752663e_1576915200.json
2020-01-04 14:39:47 44.9 KiB smart_hub_data_json/dt=2019-12-21/b6a8d42425fde548_1576915200.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/08ae3df798df8b90_15770016000.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/1c7e1f7df752663e_1577001600.json
2020-01-04 14:39:49 44.6 KiB smart_hub_data_json/dt=2019-12-22/b6a8d42425fde548_15770016001.json
2020-01-04 14:39:39 89.7 KiB smart_hub_locations_csv/state=ca/palo_alto_ca_1576742400.csv
2020-01-04 14:39:37 84.2 KiB smart_hub_locations_csv/state=co/denver_co_1576656000.csv
2020-01-04 14:39:44 78.6 KiB smart_hub_locations_csv/state=ct/stamford_ct_1576569600.csv
2020-01-04 14:39:42 91.6 KiB smart_hub_locations_csv/state=or/portland_metro_or_1576742400.csv
Total Objects: 14
Total Size: 636.7 KiB

view raw
raw_data_files.txt
hosted with ❤ by GitHub

Lambda Functions

Next, package the (5) Python3.8-based AWS Lambda functions for deployment.

pushd lambdas/athena-json-to-parquet-data || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-csv-to-parquet-locations || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-json-to-parquet-mappings || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-complex-etl-query || exit
zip -r package.zip index.py
popd || exit
pushd lambdas/athena-parquet-to-parquet-elt-data || exit
zip -r package.zip index.py
popd || exit

view raw
step4.sh
hosted with ❤ by GitHub

Copy the five Lambda packages to the SCRIPT_BUCKET S3 bucket. The ZIP archive Lambda packages are accessed by the second CloudFormation stack, smart-hub-serverless. This CloudFormation stack, which creates the Lambda functions, will fail to deploy if the packages are not found in the SCRIPT_BUCKET S3 bucket.

I have chosen to place the packages in a different S3 bucket then the raw data files. In a real production environment, these two types of files would be separated, minimally, into separate buckets for security. Remember, only data should go into the data lake.

aws s3 cp lambdas/athena-json-to-parquet-data/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_data/
aws s3 cp lambdas/athena-csv-to-parquet-locations/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_csv_to_parquet_locations/
aws s3 cp lambdas/athena-json-to-parquet-mappings/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_json_to_parquet_mappings/
aws s3 cp lambdas/athena-complex-etl-query/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_complex_etl_query/
aws s3 cp lambdas/athena-parquet-to-parquet-elt-data/package.zip \
s3://${SCRIPT_BUCKET}/lambdas/athena_parquet_to_parquet_elt_data/

view raw
step5.sh
hosted with ❤ by GitHub

Create the second ‘smart-hub-lambda-stack’ CloudFormation stack using the smart-hub-lambda.yml CloudFormation template. The template will create (5) AWS Lambda functions and (1) Lambda execution IAM Service Role.

aws cloudformation create-stack \
–stack-name smart-hub-lambda-stack \
–template-body file://cloudformation/smart-hub-lambda.yml \
–capabilities CAPABILITY_NAMED_IAM

view raw
step6.sh
hosted with ❤ by GitHub

At this point, we have deployed all of the AWS resources required for the demonstration using CloudFormation. We have also copied all of the raw CSV-, XML-, and JSON-format data files in the Amazon S3-based data lake.

AWS Glue Crawlers

If you recall, we created five tables in the Glue Data Catalog database as part of the CloudFormation stack. One table for each of the four raw data types and one table to hold temporary ELT data later in the demonstration. To confirm the five tables were created in the Glue Data Catalog database, use the Glue Data Catalog Console, or run the following AWS CLI / jq command.

aws glue get-tables \
–database-name smart_hub_data_catalog \
| jq -r '.TableList[].Name'

view raw
step8.sh
hosted with ❤ by GitHub

The five data catalog tables should be as follows.

electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
smart_hub_data_json
smart_hub_locations_csv

view raw
step8.txt
hosted with ❤ by GitHub

We also created six Glue Crawlers as part of the CloudFormation template. Four of these Crawlers are responsible for cataloging the raw CSV-, XML-, and JSON-format data from S3 into the corresponding, existing Glue Data Catalog database tables. The Crawlers will detect any new partitions and add those to the tables as well. Each Crawler corresponds to one of the four raw data types. Crawlers can be scheduled to run periodically, cataloging new data and updating data partitions. Crawlers will also create a Data Catalog database tables. We use Crawlers to create new tables, later in the post.

Run the four Glue Crawlers using the AWS CLI (step 1c in workflow diagram).

aws glue start-crawler –name smart-hub-locations-csv
aws glue start-crawler –name smart-hub-sensor-mappings-json
aws glue start-crawler –name smart-hub-data-json
aws glue start-crawler –name smart-hub-rates-xml

view raw
step7.sh
hosted with ❤ by GitHub

You can check the Glue Crawler Console to ensure the four Crawlers finished successfully.

screen_shot_2020-01-03_at_3_05_29_pm

Alternately, use another AWS CLI / jq command.

aws glue get-crawler-metrics \
| jq -r '.CrawlerMetricsList[] | "\(.CrawlerName): \(.StillEstimating), \(.TimeLeftSeconds)"' \
| grep "^smart-hub-[A-Za-z-]*"

view raw
step8.sh
hosted with ❤ by GitHub

When complete, all Crawlers should all be in a state of ‘Still Estimating = false’ and ‘TimeLeftSeconds = 0’. In my experience, the Crawlers can take up one minute to start, after the estimation stage, and one minute to stop when complete.

smart-hub-data-json: true, 0
smart-hub-etl-tmp-output-parquet: false, 0
smart-hub-locations-csv: false, 15
smart-hub-rates-parquet: false, 0
smart-hub-rates-xml: false, 15
smart-hub-sensor-mappings-json: false, 15

view raw
step8.txt
hosted with ❤ by GitHub

Successfully running the four Crawlers completes the Raw Data Ingestion stage of the demonstration.

Converting to Parquet with CTAS

With the Raw Data Ingestion stage completed, we will now transform the raw Smart Hub usage data, sensor mapping data, and locations data into Parquet-format using three AWS Lambda functions. Each Lambda subsequently calls Athena, which executes a CREATE TABLE AS SELECT SQL statement (aka CTAS) . Each Lambda executes a similar command, varying only by data source, data destination, and partitioning scheme. Below, is an example of the command used for the Smart Hub electrical usage data, taken from the Python-based Lambda, athena-json-to-parquet-data/index.py.

query = \
"CREATE TABLE IF NOT EXISTS " + data_catalog + "." + output_directory + " " \
"WITH ( " \
" format = 'PARQUET', " \
" parquet_compression = 'SNAPPY', " \
" partitioned_by = ARRAY['dt'], " \
" external_location = 's3://" + data_bucket + "/" + output_directory + "' " \
") AS " \
"SELECT * " \
"FROM " + data_catalog + "." + input_directory + ";"

view raw
athena_command.py
hosted with ❤ by GitHub

This compact, yet powerful CTAS statement converts a copy of the raw JSON- and CSV-format data files into Parquet-format, and partitions and stores the resulting files back into the S3-based data lake. Additionally, the CTAS SQL statement catalogs the Parquet-format data files into the Glue Data Catalog database, into new tables. Unfortunately, this method will not work for the XML-format raw data files, which we will tackle next.

The five deployed Lambda functions should be visible from the Lambda Console’s Functions tab.

screen_shot_2020-01-04_at_5_57_31_pm

Invoke the three Lambda functions using the AWS CLI. (part of step 2a in workflow diagram).

aws lambda invoke \
–function-name athena-json-to-parquet-data \
response.json
aws lambda invoke \
–function-name athena-csv-to-parquet-locations \
response.json
aws lambda invoke \
–function-name athena-json-to-parquet-mappings \
response.json

view raw
step9.sh
hosted with ❤ by GitHub

Here is an example of the same CTAS command, shown above for the Smart Hub electrical usage data, as it is was executed successfully by Athena.

CREATE TABLE IF NOT EXISTS smart_hub_data_catalog.smart_hub_data_parquet
WITH (format = 'PARQUET',
parquet_compression = 'SNAPPY',
partitioned_by = ARRAY['dt'],
external_location = 's3://smart-hub-data-demo-account-1-us-east-1/smart_hub_data_parquet')
AS
SELECT *
FROM smart_hub_data_catalog.smart_hub_data_json

view raw
athena_command.sql
hosted with ❤ by GitHub

We can view any Athena SQL query from the Athena Console’s History tab. Clicking on a query (in pink) will copy it to the Query Editor tab and execute it. Below, we see the three SQL statements executed by the Lamba functions.

screen_shot_2020-01-04_at_7_08_32_pm

AWS Glue ETL Job for XML

If you recall, the electrical rate data is in XML format. The Lambda functions we just executed, converted the CSV and JSON data to Parquet using Athena. Currently, unlike CSV, JSON, ORC, Parquet, and Avro, Athena does not support the older XML data format. For the XML data files, we will use an AWS Glue ETL Job to convert the XML data to Parquet. The Glue ETL Job is written in Python and uses Apache Spark, along with several AWS Glue PySpark extensions. For this job, I used an existing script created in the Glue ETL Jobs Console as a base, then modified the script to meet my needs.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
's3_output_path',
'source_glue_database',
'source_glue_table'
])
s3_output_path = args['s3_output_path']
source_glue_database = args['source_glue_database']
source_glue_table = args['source_glue_table']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext. \
create_dynamic_frame. \
from_catalog(database=source_glue_database,
table_name=source_glue_table,
transformation_ctx="datasource0")
applymapping1 = ApplyMapping.apply(
frame=datasource0,
mappings=[("from", "string", "from", "string"),
("to", "string", "to", "string"),
("type", "string", "type", "string"),
("rate", "double", "rate", "double"),
("year", "int", "year", "int"),
("month", "int", "month", "int"),
("state", "string", "state", "string")],
transformation_ctx="applymapping1")
resolvechoice2 = ResolveChoice.apply(
frame=applymapping1,
choice="make_struct",
transformation_ctx="resolvechoice2")
dropnullfields3 = DropNullFields.apply(
frame=resolvechoice2,
transformation_ctx="dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_options(
frame=dropnullfields3,
connection_type="s3",
connection_options={
"path": s3_output_path,
"partitionKeys": ["state"]
},
format="parquet",
transformation_ctx="datasink4")
job.commit()

The three Python command-line arguments the script expects (lines 10–12, above) are defined in the CloudFormation template, smart-hub-athena-glue.yml. Below, we see them on lines 10–12 of the CloudFormation snippet. They are injected automatically when the job is run and can be overridden from the command line when starting the job.

GlueJobRatesToParquet:
Type: AWS::Glue::Job
Properties:
GlueVersion: 1.0
Command:
Name: glueetl
PythonVersion: 3
ScriptLocation: !Sub "s3://${ScriptBucketName}/glue_scripts/rates_xml_to_parquet.py"
DefaultArguments: {
"–s3_output_path": !Sub "s3://${DataBucketName}/electricity_rates_parquet",
"–source_glue_database": !Ref GlueDatabase,
"–source_glue_table": "electricity_rates_xml",
"–job-bookmark-option": "job-bookmark-enable",
"–enable-spark-ui": "true",
"–spark-event-logs-path": !Sub "s3://${LogBucketName}/glue-etl-jobs/"
}
Description: "Convert electrical rates XML data to Parquet"
ExecutionProperty:
MaxConcurrentRuns: 2
MaxRetries: 0
Name: rates-xml-to-parquet
Role: !GetAtt "CrawlerRole.Arn"
DependsOn:
CrawlerRole
GlueDatabase
DataBucket
ScriptBucket
LogBucket

view raw
elt-job-cfn.yml
hosted with ❤ by GitHub

First, copy the Glue ETL Job Python script to the SCRIPT_BUCKET S3 bucket.

aws s3 cp glue-scripts/rates_xml_to_parquet.py \
s3://${SCRIPT_BUCKET}/glue_scripts/

view raw
step10.sh
hosted with ❤ by GitHub

Next, start the Glue ETL Job (part of step 2a in workflow diagram). Although the conversion is a relatively simple set of tasks, the creation of the Apache Spark environment, to execute the tasks, will take several minutes. Whereas the Glue Crawlers took about 2 minutes on average, the Glue ETL Job could take 10–15 minutes in my experience. The actual execution time only takes about 1–2 minutes of the 10–15 minutes to complete. In my opinion, waiting up to 15 minutes is too long to be viable for ad-hoc jobs against smaller datasets; Glue ETL Jobs are definitely targeted for big data.

aws glue start-job-run –job-name rates-xml-to-parquet

view raw
step11.sh
hosted with ❤ by GitHub

To check on the status of the job, use the Glue ETL Jobs Console, or use the AWS CLI.

# get status of most recent job (the one that is running)
aws glue get-job-run \
–job-name rates-xml-to-parquet \
–run-id "$(aws glue get-job-runs \
–job-name rates-xml-to-parquet \
| jq -r '.JobRuns[0].Id')"

view raw
step11.sh
hosted with ❤ by GitHub

When complete, you should see results similar to the following. Note the ‘JobRunState’ is ‘SUCCEEDED.’ This particular job ran for a total of 14.92 minutes, while the actual execution time was 2.25 minutes.

{
"JobRun": {
"Id": "jr_f7186b26bf042ea7773ad08704d012d05299f080e7ac9b696ca8dd575f79506b",
"Attempt": 0,
"JobName": "rates-xml-to-parquet",
"StartedOn": 1578022390.301,
"LastModifiedOn": 1578023285.632,
"CompletedOn": 1578023285.632,
"JobRunState": "SUCCEEDED",
"PredecessorRuns": [],
"AllocatedCapacity": 10,
"ExecutionTime": 135,
"Timeout": 2880,
"MaxCapacity": 10.0,
"LogGroupName": "/aws-glue/jobs",
"GlueVersion": "1.0"
}
}

view raw
job-results.json
hosted with ❤ by GitHub

The job’s progress and the results are also visible in the AWS Glue Console’s ETL Jobs tab.

screen_shot_2020-01-04_at_7_42_51_pm

Detailed Apache Spark logs are also available in CloudWatch Management Console, which is accessible directly from the Logs link in the AWS Glue Console’s ETL Jobs tab.

screen_shot_2020-01-04_at_7_44_08_pm

The last step in the Data Transformation stage is to convert catalog the Parquet-format electrical rates data, created with the previous Glue ETL Job, using yet another Glue Crawler (part of step 2b in workflow diagram). Start the following Glue Crawler to catalog the Parquet-format electrical rates data.

aws glue start-crawler –name smart-hub-rates-parquet

view raw
step11b.sh
hosted with ❤ by GitHub

This concludes the Data Transformation stage. The raw and transformed data is in the data lake, and the following nine tables should exist in the Glue Data Catalog.

electricity_rates_parquet
electricity_rates_xml
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet

If we examine the tables, we should observe the data partitions we used to organize the data files in the Amazon S3-based data lake are contained in the table metadata. Below, we see the four partitions, based on state, of the Parquet-format locations data.

screen_shot_2020-01-05_at_7_45_46_am

Data Enrichment

To begin the Data Enrichment stage, we will invoke the AWS Lambda, athena-complex-etl-query/index.py. This Lambda accepts input parameters (lines 28–30, below), passed in the Lambda handler’s event parameter. The arguments include the Smart Hub ID, the start date for the data requested, and the end date for the data requested. The scenario for the demonstration is that a customer with the location id value, using the electrical provider’s application, has requested data for a particular range of days (start date and end date), to visualize and analyze.

The Lambda executes a series of Athena INSERT INTO SQL statements, one statement for each of the possible Smart Hub connected electrical sensors, s_01 through s_10, for which there are values in the Smart Hub electrical usage data. Amazon just released the Amazon Athena INSERT INTO a table using the results of a SELECT query capability in September 2019, an essential addition to Athena. New Athena features are listed in the release notes.

Here, the SELECT query is actually a series of chained subqueries, using Presto SQL’s WITH clause capability. The queries join the Parquet-format Smart Hub electrical usage data sources in the S3-based data lake, with the other three Parquet-format, S3-based data sources: sensor mappings, locations, and electrical rates. The Parquet-format data is written as individual files to S3 and inserted into the existing ‘etl_tmp_output_parquet’ Glue Data Catalog database table. Compared to traditional relational database-based queries, the capabilities of Glue and Athena to enable complex SQL queries across multiple semi-structured data files, stored in S3, is truly amazing!

The capabilities of Glue and Athena to enable complex SQL queries across multiple semi-structured data files, stored in S3, is truly amazing!

Below, we see the SQL statement starting on line 43.

import boto3
import os
import logging
import json
from typing import Dict
# environment variables
data_catalog = os.getenv('DATA_CATALOG')
data_bucket = os.getenv('DATA_BUCKET')
# variables
output_directory = 'etl_tmp_output_parquet'
# uses list comprehension to generate the equivalent of:
# ['s_01', 's_02', …, 's_09', 's_10']
sensors = [f's_{i:02d}' for i in range(1, 11)]
# logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# athena client
athena_client = boto3.client('athena')
def handler(event, context):
args = {
"loc_id": event['loc_id'],
"date_from": event['date_from'],
"date_to": event['date_to']
}
athena_query(args)
return {
'statusCode': 200,
'body': json.dumps("function 'athena-complex-etl-query' complete")
}
def athena_query(args: Dict[str, str]):
for sensor in sensors:
query = \
"INSERT INTO " + data_catalog + "." + output_directory + " " \
"WITH " \
" t1 AS " \
" (SELECT d.loc_id, d.ts, d.data." + sensor + " AS kwh, l.state, l.tz " \
" FROM smart_hub_data_catalog.smart_hub_data_parquet d " \
" LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l " \
" ON d.loc_id = l.hash " \
" WHERE d.loc_id = '" + args['loc_id'] + "' " \
" AND d.dt BETWEEN cast('" + args['date_from'] + \
"' AS date) AND cast('" + args['date_to'] + "' AS date)), " \
" t2 AS " \
" (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts, " \
" date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period, " \
" m.description AS device, m.location, t1.loc_id, t1.state, t1.tz, t1.kwh " \
" FROM t1 LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m " \
" ON t1.loc_id = m.loc_id " \
" WHERE t1.loc_id = '" + args['loc_id'] + "' " \
" AND m.state = t1.state " \
" AND m.description = (SELECT m2.description " \
" FROM smart_hub_data_catalog.sensor_mappings_parquet m2 " \
" WHERE m2.loc_id = '" + args['loc_id'] + "' AND m2.id = '" + sensor + "')), " \
" t3 AS " \
" (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state " \
" FROM smart_hub_data_catalog.electricity_rates_parquet r " \
" WHERE r.year BETWEEN cast(date_format(cast('" + args['date_from'] + \
"' AS date), '%Y') AS integer) AND cast(date_format(cast('" + args['date_to'] + \
"' AS date), '%Y') AS integer)) " \
"SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts, " \
" t2.device, t2.location, t3.type, t2.kwh, t3.rate AS cents_per_kwh, " \
" round(t2.kwh * t3.rate, 4) AS cost, t2.state, t2.loc_id " \
"FROM t2 LEFT OUTER JOIN t3 " \
" ON t2.rate_period = t3.rate_period " \
"WHERE t3.state = t2.state " \
"ORDER BY t2.ts, t2.device;"
logger.info(query)
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': data_catalog
},
ResultConfiguration={
'OutputLocation': 's3://' + data_bucket + '/tmp/' + output_directory
},
WorkGroup='primary'
)
logger.info(response)

view raw
athena_query.py
hosted with ❤ by GitHub

Below, is an example of one of the final queries, for the s_10 sensor, as executed by Athena. All the input parameter values, Python variables, and environment variables have been resolved into the query.

INSERT INTO smart_hub_data_catalog.etl_tmp_output_parquet
WITH t1 AS (SELECT d.loc_id, d.ts, d.data.s_10 AS kwh, l.state, l.tz
FROM smart_hub_data_catalog.smart_hub_data_parquet d
LEFT OUTER JOIN smart_hub_data_catalog.smart_hub_locations_parquet l ON d.loc_id = l.hash
WHERE d.loc_id = 'b6a8d42425fde548'
AND d.dt BETWEEN cast('2019-12-21' AS date) AND cast('2019-12-22' AS date)),
t2 AS (SELECT at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz) AS ts,
date_format(at_timezone(from_unixtime(t1.ts, 'UTC'), t1.tz), '%H') AS rate_period,
m.description AS device,
m.location,
t1.loc_id,
t1.state,
t1.tz,
t1.kwh
FROM t1
LEFT OUTER JOIN smart_hub_data_catalog.sensor_mappings_parquet m ON t1.loc_id = m.loc_id
WHERE t1.loc_id = 'b6a8d42425fde548'
AND m.state = t1.state
AND m.description = (SELECT m2.description
FROM smart_hub_data_catalog.sensor_mappings_parquet m2
WHERE m2.loc_id = 'b6a8d42425fde548'
AND m2.id = 's_10')),
t3 AS (SELECT substr(r.to, 1, 2) AS rate_period, r.type, r.rate, r.year, r.month, r.state
FROM smart_hub_data_catalog.electricity_rates_parquet r
WHERE r.year BETWEEN cast(date_format(cast('2019-12-21' AS date), '%Y') AS integer)
AND cast(date_format(cast('2019-12-22' AS date), '%Y') AS integer))
SELECT replace(cast(t2.ts AS VARCHAR), concat(' ', t2.tz), '') AS ts,
t2.device,
t2.location,
t3.type,
t2.kwh,
t3.rate AS cents_per_kwh,
round(t2.kwh * t3.rate, 4) AS cost,
t2.state,
t2.loc_id
FROM t2
LEFT OUTER JOIN t3 ON t2.rate_period = t3.rate_period
WHERE t3.state = t2.state
ORDER BY t2.ts, t2.device;

Along with enriching the data, the query performs additional data transformation using the other data sources. For example, the Unix timestamp is converted to a localized timestamp containing the date and time, according to the customer’s location (line 7, above). Transforming dates and times is a frequent, often painful, data analysis task. Another example of data enrichment is the augmentation of the data with a new, computed column. The column’s values are calculated using the values of two other columns (line 33, above).

Invoke the Lambda with the following three parameters in the payload (step 3a in workflow diagram).

aws lambda invoke \
–function-name athena-complex-etl-query \
–payload "{ \"loc_id\": \"b6a8d42425fde548\",
\"date_from\": \"2019-12-21\", \"date_to\": \"2019-12-22\"}" \
response.json

view raw
step12.sh
hosted with ❤ by GitHub

The ten INSERT INTO SQL statement’s result statuses (one per device sensor) are visible from the Athena Console’s History tab.

screen_shot_2020-01-05_at_9_17_23_pm

Each Athena query execution saves that query’s results to the S3-based data lake as individual, uncompressed Parquet-format data files. The data is partitioned in the Amazon S3-based data lake by the Smart Meter location ID (e.g., ‘loc_id=b6a8d42425fde548’).

Below is a snippet of the enriched data for a customer’s clothes washer (sensor ‘s_04’). Note the timestamp is now an actual date and time in the local timezone of the customer (e.g., ‘2019-12-21 20:10:00.000’). The sensor ID (‘s_04’) is replaced with the actual device name (‘Clothes Washer’). The location of the device (‘Basement’) and the type of electrical usage period (e.g. ‘peak’ or ‘partial-peak’) has been added. Finally, the cost column has been computed.


ts device location type kwh cents_per_kwh cost state loc_id
2019-12-21 19:40:00.000 Clothes Washer Basement peak 0.0 12.623 0.0 or b6a8d42425fde548
2019-12-21 19:45:00.000 Clothes Washer Basement peak 0.0 12.623 0.0 or b6a8d42425fde548
2019-12-21 19:50:00.000 Clothes Washer Basement peak 0.1501 12.623 1.8947 or b6a8d42425fde548
2019-12-21 19:55:00.000 Clothes Washer Basement peak 0.1497 12.623 1.8897 or b6a8d42425fde548
2019-12-21 20:00:00.000 Clothes Washer Basement partial-peak 0.1501 7.232 1.0855 or b6a8d42425fde548
2019-12-21 20:05:00.000 Clothes Washer Basement partial-peak 0.2248 7.232 1.6258 or b6a8d42425fde548
2019-12-21 20:10:00.000 Clothes Washer Basement partial-peak 0.2247 7.232 1.625 or b6a8d42425fde548
2019-12-21 20:15:00.000 Clothes Washer Basement partial-peak 0.2248 7.232 1.6258 or b6a8d42425fde548
2019-12-21 20:20:00.000 Clothes Washer Basement partial-peak 0.2253 7.232 1.6294 or b6a8d42425fde548
2019-12-21 20:25:00.000 Clothes Washer Basement partial-peak 0.151 7.232 1.092 or b6a8d42425fde548

view raw
elt_data.csv
hosted with ❤ by GitHub

To transform the enriched CSV-format data to Parquet-format, we need to catalog the CSV-format results using another Crawler, first (step 3d in workflow diagram).

aws glue start-crawler –name smart-hub-etl-tmp-output-parquet

view raw
step13.sh
hosted with ❤ by GitHub

Optimizing Enriched Data

The previous step created enriched Parquet-format data. However, this data is not as optimized for query efficiency as it should be. Using the Athena INSERT INTO WITH SQL statement, allowed the data to be partitioned. However, the method does not allow the Parquet data to be easily combined into larger files and compressed. To perform both these optimizations, we will use one last Lambda, athena-parquet-to-parquet-elt-data/index.py. The Lambda will create a new location in the Amazon S3-based data lake, containing all the enriched data, in a single file and compressed using Snappy compression.

aws lambda invoke \
–function-name athena-parquet-to-parquet-elt-data \
response.json

view raw
step14.sh
hosted with ❤ by GitHub

The resulting Parquet file is visible in the S3 Management Console.

screen_shot_2020-01-04_at_6_07_23_pm

The final step in the Data Enrichment stage is to catalog the optimized Parquet-format enriched ETL data. To catalog the data, run the following Glue Crawler (step 3i in workflow diagram

aws glue start-crawler –name smart-hub-etl-output-parquet

view raw
step15.sh
hosted with ❤ by GitHub

Final Data Lake and Data Catalog

We should now have the following ten top-level folders of partitioned data in the S3-based data lake. The ‘tmp’ folder may be ignored.

aws s3 ls s3://${DATA_BUCKET}/

view raw
step16.sh
hosted with ❤ by GitHub

PRE electricity_rates_parquet/
PRE electricity_rates_xml/
PRE etl_output_parquet/
PRE etl_tmp_output_parquet/
PRE sensor_mappings_json/
PRE sensor_mappings_parquet/
PRE smart_hub_data_json/
PRE smart_hub_data_parquet/
PRE smart_hub_locations_csv/
PRE smart_hub_locations_parquet/

Similarly, we should now have the following ten corresponding tables in the Glue Data Catalog. Use the AWS Glue Console to confirm the tables exist.

screen_shot_2020-01-04_at_8_30_50_pm

Alternately, use the following AWS CLI / jq command to list the table names.

aws glue get-tables \
–database-name smart_hub_data_catalog \
| jq -r '.TableList[].Name'

view raw
step17.sh
hosted with ❤ by GitHub

electricity_rates_parquet
electricity_rates_xml
etl_output_parquet
etl_tmp_output_parquet
sensor_mappings_json
sensor_mappings_parquet
smart_hub_data_json
smart_hub_data_parquet
smart_hub_locations_csv
smart_hub_locations_parquet

view raw
gistfile1.txt
hosted with ❤ by GitHub

‘Unknown’ Bug

You may have noticed the four tables created with the AWS Lambda functions, using the CTAS SQL statement, erroneously have the ‘Classification’ of ‘Unknown’ as opposed to ‘parquet’. I am not sure why, I believe it is a possible bug with the CTAS feature. It seems to have no adverse impact on the table’s functionality. However, to fix the issue, run the following set of commands. This aws glue update-table hack will switch the table’s ‘Classification’ to ‘parquet’.

database=smart_hub_data_catalog
tables=(smart_hub_locations_parquet sensor_mappings_parquet smart_hub_data_parquet etl_output_parquet)
for table in ${tables}; do
fixed_table=$(aws glue get-table \
–database-name "${database}" \
–name "${table}" \
| jq '.Table.Parameters.classification = "parquet" | del(.Table.DatabaseName) | del(.Table.CreateTime) | del(.Table.UpdateTime) | del(.Table.CreatedBy) | del(.Table.IsRegisteredWithLakeFormation)')
fixed_table=$(echo ${fixed_table} | jq .Table)
aws glue update-table \
–database-name "${database}" \
–table-input "${fixed_table}"
echo "table '${table}' classification changed to 'parquet'"
done

The results of the fix may be seen from the AWS Glue Console. All ten tables are now classified correctly.

screen_shot_2020-01-05_at_11_43_50_pm

Explore the Data

Before starting to visualize and analyze the data with Amazon QuickSight, try executing a few Athena queries against the tables in the Glue Data Catalog database, using the Athena Query Editor. Working in the Editor is the best way to understand the data, learn Athena, and debug SQL statements and queries. The Athena Query Editor has convenient developer features like SQL auto-complete and query formatting capabilities.

Be mindful when writing queries and searching the Internet for SQL references, the Athena query engine is based on Presto 0.172. The current version of Presto, 0.229, is more than 50 releases ahead of the current Athena version. Both Athena and Presto functionality has changed and diverged. There are additional considerations and limitations for SQL queries in Athena to be aware of.

screen_shot_2020-01-05_at_10_32_25_am

Here are a few simple, ad-hoc queries to run in the Athena Query Editor.

preview the final etl data
SELECT *
FROM smart_hub_data_catalog.etl_output_parquet
LIMIT 10;
total cost in $'s for each device, at location 'b6a8d42425fde548'
from high to low, on December 21, 2019
SELECT device,
concat('$', cast(cast(sum(cost) / 100 AS decimal(10, 2)) AS varchar)) AS total_cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
AND date (cast(ts AS timestamp)) = date '2019-12-21'
GROUP BY device
ORDER BY total_cost DESC;
count of smart hub residential locations in Oregon and California,
grouped by zip code, sorted by count
SELECT DISTINCT postcode, upper(state), count(postcode) AS smart_hub_count
FROM smart_hub_data_catalog.smart_hub_locations_parquet
WHERE state IN ('or', 'ca')
AND length(cast(postcode AS varchar)) >= 5
GROUP BY state, postcode
ORDER BY smart_hub_count DESC, postcode;
electrical usage for the clothes washer
over a 30-minute period, on December 21, 2019
SELECT ts, device, location, type, cost
FROM smart_hub_data_catalog.etl_tmp_output_parquet
WHERE loc_id = 'b6a8d42425fde548'
AND device = 'Clothes Washer'
AND cast(ts AS timestamp)
BETWEEN timestamp '2019-12-21 08:45:00'
AND timestamp '2019-12-21 09:15:00'
ORDER BY ts;

view raw
athena_queries.sql
hosted with ❤ by GitHub

Cleaning Up

You may choose to save the AWS resources created in part one of this demonstration, to be used in part two. Since you are not actively running queries against the data, ongoing AWS costs will be minimal. If you eventually choose to clean up the AWS resources created in part one of this demonstration, execute the following AWS CLI commands. To avoid failures, make sure each command completes before running the subsequent command. You will need to confirm the CloudFormation stacks are deleted using the AWS CloudFormation Console or the AWS CLI. These commands will not remove Amazon QuickSight data sets, analyses, and dashboards created in part two. However, deleting the AWS Glue Data Catalog and the underlying data sources will impact the ability to visualize the data in QuickSight.

# delete s3 contents first
aws s3 rm s3://${DATA_BUCKET} –recursive
aws s3 rm s3://${SCRIPT_BUCKET} –recursive
aws s3 rm s3://${LOG_BUCKET} –recursive
# then, delete lambda cfn stack
aws cloudformation delete-stack –stack-name smart-hub-lambda-stack
# finally, delete athena-glue-s3 stack
aws cloudformation delete-stack –stack-name smart-hub-athena-glue-stack

view raw
step18.sh
hosted with ❤ by GitHub

Part Two

In part one, starting with raw, semi-structured data in multiple formats, we learned how to ingest, transform, and enrich that data using Amazon S3, AWS Glue, Amazon Athena, and AWS Lambda. We built an S3-based data lake and learned how AWS leverages open-source technologies, including Presto, Apache Hive, and Apache Parquet. In part two of this post, we will use the transformed and enriched datasets, stored in the data lake, to create compelling visualizations using Amazon QuickSight.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , , , ,

2 Comments

Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 2

Introduction

In Part 1 of this two-part post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on Amazon Elastic MapReduce (EMR). Further, we configured Zeppelin integrations with AWS Glue Data CatalogAmazon Relational Database Service (RDS) for PostgreSQL, and Amazon Simple Cloud Storage Service (S3) Data Lake. We also covered how to obtain the project’s source code from the two GitHub repositories, zeppelin-emr-demo and zeppelin-emr-config. Below is a high-level architectural diagram of the infrastructure we constructed in Part 1 for this demonstration.

EMR-Zeppelin

Part 2

In Part 2 of this post, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using a series of four Zeppelin notebooks. Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.

Notebook 1

The first notebook uses a small 21k row kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, and the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.

Interpreters

When you open a notebook for the first time, you are given the choice of interpreters to bind and unbind to the notebook. The last interpreter in the list shown below, postgres, is the new PostgreSQL JDBC Zeppelin interpreter we created in Part 1 of this post. We will use this interpreter in Notebook 3.

screen-shot-2019-11-24-at-8_03_50-pm

Application Versions

The first two paragraphs of the notebook are used to confirm the version of Spark, Scala, OpenJDK, and Python we are using. Recall we updated the Spark and Python interpreters to use Python 3.

screen_shot_2019-11-26_at_6_58_33_pm

Helium Visualizations

If you recall from Part 1 of the post, we pre-installed several additional Helium Visualizations, including the Ultimate Pie Chart. Below, we see the use of the Spark SQL (%sql) interpreter to query a Spark DataFrame, return results, and visualize the data using the Ultimate Pie Chart.  In addition to the pie chart, we see the other pre-installed Helium visualizations proceeding the five default visualizations, in the menu bar. With Zeppelin, all we have to do is write Spark SQL queries against the Spark DataFrame created earlier in the notebook, and Zeppelin will handle the visualization. You have some basic controls over charts using the ‘settings’ option.

screen-shot-2019-11-24-at-8_06_56-pm

Building the Data Lake

Notebook 1 demonstrates how to read and write data to S3. We read and write the Bakery dataset to both CSV-format and Apache Parquet-format, using Spark (PySpark). We also write the results of Spark SQL queries, like the one above, in Parquet, to S3.

screen-shot-2019-11-24-at-8_20_18-pm

With Parquet, data may be split into multiple files, as shown in the S3 bucket directory below. Parquet is much faster to read into a Spark DataFrame than CSV. Spark provides support for both reading and writing Parquet files. We will write all of our data to Parquet in S3, making future re-use of the data much more efficient than downloading data from the Internet, like GroupLens or kaggle, or consuming CSV from S3.

screen-shot-2019-11-24-at-8_22_46-pm

Preview S3 Data

In addition to using the Zeppelin notebook, we can preview data right in the S3 bucket web interface using the Amazon S3 Select feature. This query in place feature is helpful to quickly understand the structure and content of new data files with which you want to interact within Zeppelin.

screen-shot-2019-11-24-at-8_23_33-pm

screen-shot-2019-11-24-at-8_23_40-pm

screen_shot_2019-11-28_at_7_41_49_pm.png

Saving Changes to GitHub

In Part 1, we configured Zeppelin to read and write the notebooks from your own copy of the GitHub notebook repository. Using the ‘version control’ menu item, changes made to the notebooks can be committed directly to GitHub.

screen-shot-2019-11-24-at-8_16_36-pm

screen-shot-2019-11-24-at-8_38_19-pm

In GitHub, note the committer is the zeppelin user.

screen_shot_2019-11-26_at_7_48_42_pm.png

Notebook 2

The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- verses multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.

screen-shot-2019-11-24-at-8_41_31-pm

Multi-Node EMR Cluster

If you recall from Part 1, we waited to create this cluster due to the compute costs of running the cluster’s large EC2 instances. You should understand the cost of these resources before proceeding, and that you ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.

Normalized Instance Hours
Understanding the costs of EMR requires understanding the concept of normalized instance hours. Cluster displayed in the EMR AWS Console contains two columns, ‘Elapsed time’ and ‘Normalized instance hours’. The ‘Elapsed time’ column reflects the actual wall-clock time the cluster was used. The ‘Normalized instance hours’ column indicates the approximate number of compute hours the cluster has used, rounded up to the nearest hour.

screen_shot_2019-11-28_at_6_09_38_pm

Normalized instance hours calculations are based on a normalization factor. The normalization factor ranges from 1 for a small instance, up to 64 for an 8xlarge. Based on the type and quantity of instances in our multi-node cluster, we would use approximately 56 compute hours (aka normalized instance hours) for every one hour of wall-clock time our EMR cluster is running. Note the multi-node cluster used in our demo, highlighted in yellow above. The cluster ran for two hours, which equated to 112 normalized instance hours.

Screen Shot 2019-12-16 at 9.48.59 PM.png

Create the Multi-Node Cluster

Create the multi-node EMR cluster using CloudFormation. Change the following nine variable values, then run the emr cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name"
GITHUB_REPO="your-new-project-name"
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
CORE_INSTANCE_TYPE="m5.2xlarge" # optional
CORE_INSTANCE_COUNT=3 # optional

aws cloudformation create-stack \
    --stack-name zeppelin-emr-prod-stack \
    --template-body file://cloudformation/emr_cluster.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceType,ParameterValue=${CORE_INSTANCE_TYPE} \
                 ParameterKey=CoreInstanceCount,ParameterValue=${CORE_INSTANCE_COUNT} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}

Use the Amazon EMR web interface to confirm the success of the CloudFormation stack. The fully-provisioned cluster should be in the ‘Waiting’ state when ready.

screen_shot_2019-11-26_at_4_58_05_pm

Configuring the EMR Cluster

Refer to Part 1 for the configuration steps necessary to prepare the EMR cluster and Zeppelin before continuing. Repeat all the steps used for the single-node cluster.

Monitoring with Ganglia

In Part 1, we installed Ganglia as part of creating the EMR cluster. Ganglia, according to its website, is a scalable distributed monitoring system for high-performance computing systems such as clusters and grids. Ganglia can be used to evaluate the performance of the single-node and multi-node EMR clusters. With Ganglia, we can easily view cluster and individual instance CPU, memory, and network I/O performance.

screen-shot-2019-11-24-at-8_46_46-pm
Ganglia Example: Cluster CPU

screen-shot-2019-11-24-at-8_48_44-pm
Ganglia Example: Cluster Memory

screen_shot_2019-11-26_at_5_18_51_pm
Ganglia Example: Cluster Network I/O

YARN Resource Manager

The YARN Resource Manager Web UI is also available on our EMR cluster. Using the Resource Manager, we can view the compute resource load on the cluster, as well as the individual EMR Core nodes. Below, we see that the multi-node cluster has 24 vCPUs and 72 GiB of memory available, split evenly across the three Core cluster nodes.

You might recall, the m5.2xlarge EC2 instance type, used for the three Core nodes, each contains 8 vCPUs and 32 GiB of memory. However, by default, although all 8 vCPUs are available for computation per node, only 24 GiB of the node’s 32 GiB of memory are available for computation. EMR ensures a portion of the memory on each node is reserved for other system processes. The maximum available memory is controlled by the YARN memory configuration option, yarn.scheduler.maximum-allocation-mb.

screen_shot_2019-11-26_at_5_15_00_pm

The YARN Resource Manager preview above shows the load on the Code nodes as Notebook 2 is executing the Spark SQL queries on the large MovieLens with 27MM ratings. Note that only 4 of the 24 vCPUs (16.6%) are in use, but that 70.25 of the 72 GiB (97.6%) of available memory is being used. According to Spark, because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth. In this case, memory appears to be the most constrained resource. Using memory-optimized instances, such as r4 or r5 instance types, might be more effective for the core nodes than the m5 instance types.

MovieLens Datasets

By changing one variable in the notebook, we can work with the latest, smaller GroupLens MovieLens dataset containing approximately 100k rows (ml-latest-small) or the larger dataset, containing approximately 27M rows (ml-latest). For this demo, try both datasets on both the single-node and multi-node clusters. Compare the Spark SQL paragraph execution times for each of the four variations, including single-node with the small dataset, single-node with the large dataset, multi-node with the small dataset, and multi-node with the large dataset. Observe how fast the SQL queries are executed on the single-node versus multi-node cluster. Try switching to a different Core node instance type, such as r5.2xlarge. Try creating a cluster with additional Core nodes. How is the compute time effected?

screen_shot_2019-11-26_at_5_02_34_pm

Terminate the multi-node EMR cluster to save yourself the expense before continuing to Notebook 3.

aws cloudformation delete-stack \
    --stack-name=zeppelin-emr-prod-stack

Notebook 3

The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.

screen_shot_2019-11-27_at_11_44_44_pm

Glue Crawlers

Before continuing with Notebook 3, run the two Glue Crawlers using the AWS CLI.

aws glue start-crawler --name bakery-transactions-crawler
aws glue start-crawler --name movie-ratings-crawler

The two Crawlers will create a total of seven tables in the Glue Data Catalog database.

screen_shot_2019-11-27_at_8_50_09_pm

If we examine the Glue Data Catalog database, we should now observe several tables, one for each dataset found in the S3 bucket. The location of each dataset is shown in the ‘Location’ column of the tables view.

screen-shot-2019-11-24-at-9_14_19-pm

From the Zeppelin notebook, we can even use Spark SQL to query the AWS Glue Data Catalog, itself, for its databases and the tables within them.

screen-shot-2019-11-24-at-9_12_52-pm

According to Amazon, the Glue Data Catalog tables and databases are containers for the metadata definitions that define a schema for underlying source data. Using Zeppelin’s SQL interpreter, we can query the Data Catalog database and return the underlying source data. The SQL query example, below, demonstrates how we can perform a join across two tables in the data catalog database, representing two different data sources, and return results.

screen-shot-2019-11-24-at-9_09_26-pm

Notebook 4

The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.

screen-shot-2019-11-27-at-11_26_34-am.png

First, we create a new schema and four related tables for the RDS PostgreSQL movie ratings database, using the Psycopg 2 PostgreSQL adapter for Python and the SQL file we copied to S3 in Part 1.

screen_shot_2019-11-27_at_11_09_42_am

The RDS database’s schema, shown below, approximates the schema of the four CSV files from the GroupLens MovieLens rating dataset we used in Notebook 2.

screen_shot_2019-11-22_at_12_55_25_pm

Since the schema of the PostgreSQL database matches the MovieLens dataset files, we can import the data from the CVS files, downloaded from GroupLens, directly into the RDS database, again using the Psycopg PostgreSQL adapter for Python.

screen_shot_2019-11-27_at_11_09_52_am

According to the Spark documentation, Spark SQL also includes a data source that can read data from other databases using JDBC. Using Spark’s JDBC capability and the PostgreSQL JDBC Driver we installed in Part 1, we can perform Spark SQL queries against the RDS database using PySpark (%spark.pyspark). Below, we see a paragraph example of reading the RDS database’s movies table, using Spark.

screen_shot_2019-11-27_at_11_10_01_am

As a third method of querying the RDS database, we can use the custom Zeppelin PostgreSQL JDBC interpreter (%postgres) we created in Part 1. Although the default driver of the JDBC interpreter is set as PostgreSQL, and the associated JAR is included with Zeppelin, we overrode that older JAR, with the latest PostgreSQL JDBC Driver JAR.

Using the %postgres interpreter, we query the RDS database’s public schema, and return the four database tables we created earlier in the notebook.

screen_shot_2019-11-27_at_11_10_26_am

Again, below, using the %postgres interpreter in the notebook’s paragraph, we query the RDS database and return data, which we then visualize using Zeppelin’s bar chart. Finally, note the use of Zeppelin Dynamic Forms in this example. Dynamic Forms allows Zeppelin to dynamically creates input forms, whose input values are then available to use programmatically. Here, we use two form input values to control the data returned from our query and the resulting visualization.

screen_shot_2019-11-27_at_11_10_54_am

Conclusion

In this two-part post, we learned how effectively Apache Zeppelin integrates with Amazon EMR. We also learned how to extend Zeppelin’s capabilities, using  AWS Glue, Amazon RDS, and Amazon S3 as a Data Lake. Beyond what was covered in this post, there are dozens of more Zeppelin and EMR features, as well as dozens of more AWS services that integrate with Zeppelin and EMR, for you to discover.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , ,

1 Comment

Getting Started with Apache Zeppelin on Amazon EMR, using AWS Glue, RDS, and S3: Part 1

Introduction

There is little question big data analyticsdata scienceartificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last 3–5 years. Behind the hype cycles and marketing buzz, these technologies are having a significant influence on many aspects of our modern lives. Due to their popularity, commercial enterprises, academic institutions, and the public sector have all rushed to develop hardware and software solutions to decrease the barrier to entry and increase the velocity of ML and Data Scientists and Engineers.

screen_shot_2019-11-17_at_7_24_10_am1Data Science: 5-Year Search Trend (courtesy Google Trends)

screen_shot_2019-11-17_at_7_24_10_am2Machine Learning: 5-Year Search Trend (courtesy Google Trends)

Technologies

All three major cloud providers, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud, have rapidly maturing big data analytics, data science, and AI and ML services. AWS, for example, introduced Amazon Elastic MapReduce (EMR) in 2009, primarily as an Apache Hadoop-based big data processing service. Since then, according to Amazon, EMR has evolved into a service that uses Apache SparkApache Hadoop, and several other leading open-source frameworks to quickly and cost-effectively process and analyze vast amounts of data. More recently, in late 2017, Amazon released SageMaker, a service that provides the ability to build, train, and deploy machine learning models quickly and securely.

Simultaneously, organizations are building solutions that integrate and enhance these Cloud-based big data analytics, data science, AI, and ML services. One such example is Apache Zeppelin. Similar to the immensely popular Project Jupyter and the newly open-sourced Netflix’s Polynote, Apache Zeppelin is a web-based, polyglot, computational notebook. Zeppelin enables data-driven, interactive data analytics and document collaboration using a number of interpreters such as Scala (with Apache Spark), Python (with Apache Spark), Spark SQL, JDBC, Markdown, Shell and so on. Zeppelin is one of the core applications supported natively by Amazon EMR.

screen_shot_2019-11-15_at_11_59_16_pm

In the following two-part post, we will explore the use of Apache Zeppelin on EMR for data science and data analytics using a series of Zeppelin notebooks. The notebooks feature the use of AWS Glue, the fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. The notebooks also feature the use of Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Simple Cloud Storage Service (S3). Amazon S3 will serve as a Data Lake to store our unstructured data. Given the current choice of Zeppelin’s more than twenty different interpreters, we will use Python3 and Apache Spark, specifically Spark SQL and PySpark, for all notebooks.

zeppelin_header

We will build an economical single-node EMR cluster for data exploration, as well as a larger multi-node EMR cluster for analyzing large data sets. Amazon S3 will be used to store input and output data, while intermediate results are stored in the Hadoop Distributed File System (HDFS) on the EMR cluster. Amazon provides a good overview of EMR architecture. Below is a high-level architectural diagram of the infrastructure we will construct during Part 1 for this demonstration.

EMR-Zeppelin.png

Notebook Features

Below is an overview of each Zeppelin notebook with a link to view it using Zepl’s free Notebook Explorer. Zepl was founded by the same engineers that developed Apache Zeppelin, including Moonsoo Lee, Zepl CTO and creator for Apache Zeppelin. Zepl’s enterprise collaboration platform, built on Apache Zeppelin, enables both Data Science and AI/ML teams to collaborate around data.

Notebook 1

The first notebook uses a small 21k row kaggle dataset, Transactions from a Bakery. The notebook demonstrates Zeppelin’s integration capabilities with the Helium plugin system for adding new chart types, the use of Amazon S3 for data storage and retrieval, and the use of Apache Parquet, a compressed and efficient columnar data storage format, and Zeppelin’s storage integration with GitHub for notebook version control.

screen_shot_2019-11-27_at_11_13_53_am

Notebook 2

The second notebook demonstrates the use of a single-node and multi-node Amazon EMR cluster for the exploration and analysis of public datasets ranging from approximately 100k rows up to 27MM rows, using Zeppelin. We will use the latest GroupLens MovieLens rating datasets to examine the performance characteristics of Zeppelin, using Spark, on single- verses multi-node EMR clusters for analyzing big data using a variety of Amazon EC2 Instance Types.

screen_shot_2019-11-27_at_11_14_01_am

Notebook 3

The third notebook demonstrates Amazon EMR and Zeppelin’s integration capabilities with AWS Glue Data Catalog as an Apache Hive-compatible metastore for Spark SQL. We will create an Amazon S3-based Data Lake using the AWS Glue Data Catalog and a set of AWS Glue Crawlers.

screen_shot_2019-11-27_at_11_44_44_pm.png

Notebook 4

The fourth notebook demonstrates Zeppelin’s ability to integrate with an external data source. In this case, we will interact with data in an Amazon RDS PostgreSQL relational database using three methods, including the Psycopg 2 PostgreSQL adapter for Python, Spark’s native JDBC capability, and Zeppelin’s JDBC Interpreter.

screen-shot-2019-11-27-at-11_26_34-am

Demonstration

In Part 1 of the post, as a DataOps Engineer, we will create and configure the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3-based data lake. In Part 2 of this post, as a Data Scientist, we will explore Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the Zeppelin notebooks.

Source Code

The demonstration’s source code is contained in two public GitHub repositories. The first repository, zeppelin-emr-demo, includes the four Zeppelin notebooks, organized according to the conventions of Zeppelin’s pluggable notebook storage mechanisms.

.
├── 2ERVVKTCG
│   └── note.json
├── 2ERYY923A
│   └── note.json
├── 2ESH8DGFS
│   └── note.json
├── 2EUZKQXX7
│   └── note.json
├── LICENSE
└── README.md

Zeppelin GitHub Storage

During the demonstration, changes made to your copy of the Zeppelin notebooks running on EMR will be automatically pushed back to GitHub when a commit occurs. To accomplish this, instead of just cloning a local copy of my zeppelin-emr-demo project repository, you will want your own copy, within your personal GitHub account. You could folk my zeppelin-emr-demo GitHub repository or copy a clone into your own GitHub repository.

To make a copy of the project in your own GitHub account, first, create a new empty repository on GitHub, for example, ‘my-zeppelin-emr-demo-copy’. Then, execute the following commands from your terminal, to clone the original project repository to your local environment, and finally, push it to your GitHub account.

# change me
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo-copy

# shallow clone into new directory
git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo.git \
    ${GITHUB_REPO}

# re-initialize repository
cd ${GITHUB_REPO}
rm -rf .git
git init

# re-commit code
git add -A
git commit -m "Initial commit of my copy of zeppelin-emr-demo"

# push to your repo
git remote add origin \
    https://github.com/$GITHUB_ACCOUNT/$GITHUB_REPO.git
git push -u origin master

GitHub Personal Access Token

To automatically push changes to your GitHub repository when a commit occurs, Zeppelin will need a GitHub personal access token. Create a personal access token with the scope shown below. Be sure to keep the token secret. Make sure you do not accidentally check your token value into your source code on GitHub. To minimize the risk, change or delete the token after completing the demo.

screen_shot_2019-11-18_at_8_44_27_pm

The second repository, zeppelin-emr-config, contains the necessary bootstrap files, CloudFormation templates, and PostgreSQL DDL (Data Definition Language) SQL script.

.
├── LICENSE
├── README.md
├── bootstrap
│   ├── bootstrap.sh
│   ├── emr-config.json
│   ├── helium.json
├── cloudformation
│   ├── crawler.yml
│   ├── emr_single_node.yml
│   ├── emr_cluster.yml
│   └── rds_postgres.yml
└── sql
    └── ratings.sql

Use the following AWS CLI command to clone the GitHub repository to your local environment.

git clone --branch master \
    --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/zeppelin-emr-demo-setup.git

Requirements

To follow along with the demonstration, you will need an AWS Account, an existing Amazon S3 bucket to store EMR configuration and data, and an EC2 key pair. You will also need a current version of the AWS CLI installed in your work environment. Due to the particular EMR features, we will be using, I recommend using the us-east-1 AWS Region to create the demonstration’s resources.

# create secure emr config and data bucket
# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"

aws s3api create-bucket \
    --bucket ${ZEPPELIN_DEMO_BUCKET}
aws s3api put-public-access-block \
    --bucket ${ZEPPELIN_DEMO_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

Copy Configuration Files to S3

To start, we need to copy three configuration files, bootstrap.sh, helium.json, and ratings.sql, from the zeppelin-emr-demo-setup project directory to our S3 bucket. Change the ZEPPELIN_DEMO_BUCKET variable value, then run the following s3 cp API command, using the AWS CLI. The three files will be copied to a bootstrap directory within your S3 bucket.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
aws s3 cp bootstrap/bootstrap.sh s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp bootstrap/helium.json s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/
aws s3 cp sql/ratings.sql s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/

Below, sample output from copying local files to S3.

screen-shot-2019-11-21-at-2_03_49-pm

Create AWS Resources

We will start by creating most of the required AWS resources for the demonstration using three CloudFormation templates. We will create a single-node Amazon EMR cluster, an Amazon RDS PostgresSQL database, an AWS Glue Data Catalog database, two AWS Glue Crawlers, and a Glue IAM Role. We will wait to create the multi-node EMR cluster due to the compute costs of running large EC2 instances in the cluster. You should understand the cost of these resources before proceeding, and that you ensure they are destroyed immediately upon completion of the demonstration to minimize your expenses.

Single-Node EMR Cluster

We will start by creating the single-node Amazon EMR cluster, consisting of just one master node with no core or task nodes (a cluster of one). All operations will take place on the master node.

Default EMR Resources

The following EMR instructions assume you have already created at least one EMR cluster in the past, in your current AWS Region, using the EMR web interface with the ‘Create Cluster – Quick Options’ option. Creating a cluster this way creates several additional AWS resources, such as the EMR_EC2_DefaultRole EC2 instance profile, the default EMR_DefaultRole EMR IAM Role, and the default EMR S3 log bucket.

screen_shot_2019-11-17_at_5_46_56_pm.png

If you haven’t created any EMR clusters using the EMR ‘Create Cluster – Quick Options’ in the past, don’t worry, you can also create the required resources with a few quick AWS CLI commands. Change the following LOG_BUCKET variable value, then run the aws emr and aws s3api API commands, using the AWS CLI. The LOG_BUCKET variable value follows the convention of aws-logs-awsaccount-region. For example, aws-logs-012345678901-us-east-1.

# create emr roles
aws emr create-default-roles

# create log secure bucket
# change me
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"

aws s3api create-bucket --bucket ${LOG_BUCKET}
aws s3api put-public-access-block --bucket ${LOG_BUCKET} \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

The new EMR IAM Roles can be viewed in the IAM Roles web interface.

screen-shot-2019-11-21-at-2_05_35-pm

Often, I see tutorials that reference these default EMR resources from the AWS CLI or CloudFormation, without any understanding or explanation of how they are created.

EMR Bootstrap Script

As part of creating our EMR cluster, the CloudFormation template, emr_single_node.yml, will call the bootstrap script we copied to S3, earlier, bootstrap.sh. The bootstrap script pre-installs required Python and Linux software packages, and the PostgreSQL driver JAR. The bootstrap script also clones your copy of the zeppelin-emr-demo GitHub repository.

#!/bin/bash
set -ex

if [[ $# -ne 2 ]] ; then
    echo "Script requires two arguments"
    exit 1
fi

GITHUB_ACCOUNT=$1
GITHUB_REPO=$2

# install extra python packages
sudo python3 -m pip install psycopg2-binary boto3

# install extra linux packages
yes | sudo yum install git htop

# clone github repo
cd /tmp
git clone "https://github.com/${GITHUB_ACCOUNT}/${GITHUB_REPO}.git"

# install extra jars
POSTGRES_JAR="postgresql-42.2.8.jar"
wget -nv "https://jdbc.postgresql.org/download/${POSTGRES_JAR}"
sudo chown -R hadoop:hadoop ${POSTGRES_JAR}
mkdir -p /home/hadoop/extrajars/
cp ${POSTGRES_JAR} /home/hadoop/extrajars/

EMR Application Configuration

The EMR CloudFormation template will also modify the EMR cluster’s Spark and Zeppelin application configurations. Amongst other configuration properties, the template sets the default Python version to Python3, instructs Zeppelin to use the cloned GitHub notebook directory path, and adds the PostgreSQL Driver JAR to the JVM ClassPath. Below we can see the configuration properties applied to an existing EMR cluster.

screen-shot-2019-11-21-at-2_24_43-pm
EMR Application Versions

As of the date of this post, EMR is at version 5.28.0. Below, as shown in the EMR web interface, are the current (21) applications and frameworks available for installation on EMR.

emr-28.png

For this demo, we will install Apache Spark v2.4.4, Ganglia v3.7.2, and Zeppelin 0.8.2.

screen_shot_2019-11-17_at_8_32_17_pmApache Zeppelin: Web Interface

screen_shot_2019-11-13_at_5_40_12_pmApache Spark: DAG Visualization

screen_shot_2019-11-13_at_8_31_13_pmGanglia: Cluster CPU Monitoring

Create the EMR CloudFormation Stack

Change the following (7) variable values, then run the emr cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
EC2_KEY_NAME="your-key-name"
LOG_BUCKET="aws-logs-your_aws_account_id-your_region"
GITHUB_ACCOUNT="your-account-name" # i.e. garystafford
GITHUB_REPO="your-new-project-name" # i.e. my-zeppelin-emr-demo
GITHUB_TOKEN="your-token-value"
MASTER_INSTANCE_TYPE="m5.xlarge" # optional
 
aws cloudformation create-stack \
    --stack-name zeppelin-emr-dev-stack \
    --template-body file://cloudformation/emr_single_node.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
                 ParameterKey=Ec2KeyName,ParameterValue=${EC2_KEY_NAME} \
                 ParameterKey=LogBucket,ParameterValue=${LOG_BUCKET} \
                 ParameterKey=MasterInstanceType,ParameterValue=${MASTER_INSTANCE_TYPE} \
                 ParameterKey=GitHubAccount,ParameterValue=${GITHUB_ACCOUNT} \
                 ParameterKey=GitHubRepository,ParameterValue=${GITHUB_REPO} \
                 ParameterKey=GitHubToken,ParameterValue=${GITHUB_TOKEN}

You can use the Amazon EMR web interface to confirm the results of the CloudFormation stack. The cluster should be in the ‘Waiting’ state.

screen_shot_2019-11-15_at_7_42_09_pm

PostgreSQL on Amazon RDS

Next, create a simple, single-AZ, single-master, non-replicated Amazon RDS PostgreSQL database, using the included CloudFormation template, rds_postgres.yml. We will use this database in Notebook 4. For the demo, I have selected the current-generation general purpose db.m4.large EC2 instance type to run PostgreSQL. You can easily change the instance type to another RDS-supported instance type to suit your own needs.

Change the following (3) variable values, then run the cloudformation create-stack API command, using the AWS CLI.

# change me
DB_MASTER_USER="your-db-username" # i.e. masteruser
DB_MASTER_PASSWORD="your-db-password" # i.e. 5up3r53cr3tPa55w0rd
MASTER_INSTANCE_TYPE="db.m4.large" # optional
 
aws cloudformation create-stack \
    --stack-name zeppelin-rds-stack \
    --template-body file://cloudformation/rds_postgres.yml \
    --parameters ParameterKey=DBUser,ParameterValue=${DB_MASTER_USER} \
                 ParameterKey=DBPassword,ParameterValue=${DB_MASTER_PASSWORD} \
                 ParameterKey=DBInstanceClass,ParameterValue=${MASTER_INSTANCE_TYPE}

You can use the Amazon RDS web interface to confirm the results of the CloudFormation stack.

screen_shot_2019-11-17_at_8_06_44_pm.png

AWS Glue

Next, create the AWS Glue Data Catalog database, the Apache Hive-compatible metastore for Spark SQL, two AWS Glue Crawlers, and a Glue IAM Role (ZeppelinDemoCrawlerRole), using the included CloudFormation template, crawler.yml. The AWS Glue Data Catalog database will be used in Notebook 3.

Change the following variable value, then run the cloudformation create-stack API command, using the AWS CLI.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
aws cloudformation create-stack \
    --stack-name zeppelin-crawlers-stack \
    --template-body file://cloudformation/crawler.yml \
    --parameters ParameterKey=ZeppelinDemoBucket,ParameterValue=${ZEPPELIN_DEMO_BUCKET} \
    --capabilities CAPABILITY_NAMED_IAM

You can use the AWS Glue web interface to confirm the results of the CloudFormation stack. Note the Data Catalog database and the two Glue Crawlers. We will not run the two crawlers until Part 2 of the post, so no tables will exist in the Data Catalog database, yet.

screen_shot_2019-11-27_at_8_50_09_pmscreen_shot_2019-11-27_at_8_56_47_pm

At this point in the demonstration, you should have successfully created a single-node Amazon EMR cluster, an Amazon RDS PostgresSQL database, and several AWS Glue resources, all using CloudFormation templates.

screen_shot_2019-11-21_at_4_19_01_pm

Post-EMR Creation Configuration

RDS Security

For the new EMR cluster to communicate with the RDS PostgreSQL database, we need to ensure that port 5432 is open from the RDS database’s VPC security group, which is the default VPC security group, to the security groups of the EMR nodes. Obtain the Group ID of the ElasticMapReduce-master and ElasticMapReduce-slave Security Groups from the EMR web interface.

screen_shot_2019-11-20_at_11_51_10_am

Access the Security Group for the RDS database using the RDS web interface. Change the inbound rule for port 5432 to include both Security Group IDs.

screen_shot_2019-11-20_at_11_52_23_am

SSH to EMR Master Node

In addition to the bootstrap script and configurations, we already applied to the EMR cluster, we need to make several post-EMR creation configuration changes to the EMR cluster for our demonstration. These changes will require SSH’ing to the EMR cluster. Using the master node’s public DNS address and SSH command provided in the EMR web console, SSH into the master node.

screen_shot_2019-11-15_at_7_42_09_pm_v3

If you cannot access the node using SSH, check that port 22 is open on the associated EMR master node IAM Security Group (ElasticMapReduce-master) to your IP address or address range.

screen_shot_2019-11-15_at_7_42_09_pm_v2

screen_shot_2019-11-21_at_4_51_01_pm.png

Git Permissions

We need to change permissions on the git repository we installed during the EMR bootstrapping phase. Typically, with an EC2 instance, you perform operations as the ec2-user user. With Amazon EMR, you often perform actions as the hadoop user. With Zeppelin on EMR, the notebooks perform operations, including interacting with the git repository as the zeppelin user. As a result of the bootstrap.sh script, the contents of the git repository directory, /tmp/zeppelin-emr-demo/, are owned by the hadoop user and group by default.

screen_shot_2019-11-17_at_8_01_24_pm

We will change their owner to the zeppelin user and group. We could not perform this step as part of the bootstrap script since the the zeppelin user and group did not exist at the time the script was executed.

cd /tmp/zeppelin-emr-demo/
sudo chown -R zeppelin:zeppelin .

The results should look similar to the following output.

screen_shot_2019-11-17_at_8_02_16_pm

Pre-Install Visualization Packages

Next, we will pre-install several Apache Zeppelin Visualization packages. According to the Zeppelin website, an Apache Zeppelin Visualization is a pluggable package that can be loaded/unloaded on runtime through the Helium framework in Zeppelin. We can use them just like any other built-in visualization in the notebook. A Visualization is a javascript npm package. For example, here is a link to the ultimate-pie-chart on the public npm registry.

We can pre-load plugins by replacing the /usr/lib/zeppelin/conf/helium.json file with the version of helium.json we copied to S3, earlier, and restarting Zeppelin. If you have a lot of Visualizations or package types or use any DataOps automation to create EMR clusters, this approach is much more efficient and repeatable than manually loading plugins using the Zeppelin UI, each time you create a new EMR cluster. Below, the helium.json file, which pre-loads (8) Visualization packages.

{
    "enabled": {
        "ultimate-pie-chart": "ultimate-pie-chart@0.0.2",
        "ultimate-column-chart": "ultimate-column-chart@0.0.2",
        "ultimate-scatter-chart": "ultimate-scatter-chart@0.0.2",
        "ultimate-range-chart": "ultimate-range-chart@0.0.2",
        "ultimate-area-chart": "ultimate-area-chart@0.0.1",
        "ultimate-line-chart": "ultimate-line-chart@0.0.1",
        "zeppelin-bubblechart": "zeppelin-bubblechart@0.0.4",
        "zeppelin-highcharts-scatterplot": "zeppelin-highcharts-scatterplot@0.0.2"
    },
    "packageConfig": {},
    "bundleDisplayOrder": [
        "ultimate-pie-chart",
        "ultimate-column-chart",
        "ultimate-scatter-chart",
        "ultimate-range-chart",
        "ultimate-area-chart",
        "ultimate-line-chart",
        "zeppelin-bubblechart",
        "zeppelin-highcharts-scatterplot"
    ]
}

Run the following commands to load the plugins and adjust the permissions on the file.

# change me
ZEPPELIN_DEMO_BUCKET="your-bucket-name"
 
sudo aws s3 cp s3://${ZEPPELIN_DEMO_BUCKET}/bootstrap/helium.json \
    /usr/lib/zeppelin/conf/helium.json
sudo chown zeppelin:zeppelin /usr/lib/zeppelin/conf/helium.json

Create New JDBC Interpreter

Lastly, we need to create a new Zeppelin JDBC Interpreter to connect to our RDS database. By default, Zeppelin has several interpreters installed. You can review a list of available interpreters using the following command.

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --list

screen_shot_2019-11-18_at_6_29_40_am

The new JDBC interpreter will allow us to connect to our RDS PostgreSQL database, using Java Database Connectivity (JDBC). First, ensure all available interpreters are installed, including the current Zeppelin JDBC driver (org.apache.zeppelin:zeppelin-jdbc:0.8.0) to /usr/lib/zeppelin/interpreter/jdbc.

Creating a new interpreter is a two-part process. In this stage, we install the required interpreter files on the master node using the following command. Then later, in the Zeppelin web interface, we will configure the new PostgreSQL JDBC interpreter. Note we must provide a unique name for the interpreter (I chose ‘postgres’ in this case), which we will refer to in part two of the interpreter creation process.

sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh --all
 
sudo sh /usr/lib/zeppelin/bin/install-interpreter.sh \
    --name "postgres" \
    --artifact org.apache.zeppelin:zeppelin-jdbc:0.8.0

To complete the post-EMR creation configuration on the master node, we must restart Zeppelin for our changes to take effect.

sudo stop zeppelin && sudo start zeppelin

In my experience, it could take 2–3 minutes for the Zeppelin UI to become fully responsive after a restart.

screen_shot_2019-11-18_at_10_01_54_pm

Zeppelin Web Interface Access

With all the EMR application configuration complete, we will access the Zeppelin web interface running on the master node. Use the Zeppelin connection information provided in the EMR web interface to setup SSH tunneling to the Zeppelin web interface, running on the master node. Using this method, we can also access the Spark History Server, Ganglia, and Hadoop Resource Manager web interfaces; all links are provided from EMR.

screen_shot_2019-11-15_at_7_42_09_pm

To set up a web connection to the applications installed on the EMR cluster, I am using FoxyProxy as a proxy management tool with Google Chrome.

screen_shot_2019-11-17_at_8_22_09_pm.png

If everything is working so far, you should see the Zeppelin web interface with all four Zeppelin notebooks available from the cloned GitHub repository. You will be logged in as the anonymous user. Zeppelin offers authentication for accessing notebooks on the EMR cluster. For brevity, we will not cover setting up authentication in Zeppelin, using Shiro Authentication.

screen_shot_2019-11-17_at_8_32_17_pm.png

To confirm the path to the local, cloned copy of the GitHub notebook repository, is correct, check the Notebook Repos interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen. The value should match the ZEPPELIN_NOTEBOOK_DIR configuration property value in the emr_single_node.yml CloudFormation template we executed earlier.

screen_shot_2019-11-18_at_10_04_23_pm

To confirm the Helium Visualizations were pre-installed correctly, using the helium.json file, open the Helium interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.

screen_shot_2019-11-15_at_7_45_28_pmNote the enabled visualizations. And, it is easy to enable additional plugins through the web interface.screen_shot_2019-11-15_at_7_45_33_pm

New PostgreSQL JDBC Interpreter

If you recall, earlier, we install the required interpreter files on the master node using the following command using the bootstrap script. We will now complete the process of configuring the new PostgreSQL JDBC interpreter. Open the Interpreter interface, accessible under the Settings dropdown (anonymous user) in the upper right of the screen.

The title of the new interpreter must match the name we used to install the interpreter files, ‘postgres’. The interpreter group will be ‘jdbc’. There are, minimally, three properties we need to configure for your specific RDS database instance, including default.url, default.user, and default.password. These should match the values you used to create your RDS instance, earlier. Make sure to includes the database name in the default.url. An example is shown below.

default.url: jdbc:postgresql://zeppelin-demo.abcd1234efg56.us-east-1.rds.amazonaws.com:5432/ratings
default.user: masteruser
default.password: 5up3r53cr3tPa55w0rd

We also need to provide a path to the PostgreSQL driver JAR dependency. This path is the location where we placed the JAR using the bootstrap.sh script, earlier, /home/hadoop/extrajars/postgresql-42.2.8.jar. Save the new interpreter and make sure it starts successfully (shows a green icon).

screen_shot_2019-11-17_at_9_03_02_pm.png

screen_shot_2019-11-18_at_6_42_13_am

Switch Interpreters to Python 3

The last thing we need to do is change the Spark and Python interpreters to use Python 3 instead of the default Python 2. On the same screen you used to create a new interpreter, modify the Spark and Python interpreters. First, for the Python interpreter, change the zeppelin.python property to python3.

screen_shot_2019-11-18_at_3_30_39_pm

Next, for the Spark interpreter, change the zeppelin.pyspark.python property to python3.

screen_shot_2019-11-18_at_3_32_34_pm

Part 2

In Part 1 of this post, we created and configured the AWS resources required to demonstrate the use of Apache Zeppelin on EMR, using an AWS Glue Data Catalog, Amazon RDS PostgreSQL database, and an S3 data lake. In Part 2 of this post, we will explore some of Apache Zeppelin’s features and integration capabilities with a variety of AWS services using the notebooks.

All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.

, , , , , , , , ,

Leave a comment