Getting Started with Presto Federated Queries using Ahana’s PrestoDB Sandbox on AWS

Introduction

According to The Presto Foundation, Presto (aka PrestoDB), not to be confused with PrestoSQL, is an open-source, distributed, ANSI SQL compliant query engine. Presto is designed to run interactive ad-hoc analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto is used in production at an immense scale by many well-known organizations, including Facebook, Twitter, Uber, Alibaba, Airbnb, Netflix, Pinterest, Atlassian, Nasdaq, and more.

In the following post, we will gain a better understanding of Presto’s ability to execute federated queries, which join multiple disparate data sources without having to move the data. Additionally, we will explore Apache Hive, the Hive Metastore, Hive partitioned tables, and the Apache Parquet file format.

Presto on AWS

There are several options for Presto on AWS. AWS recommends Amazon EMR and Amazon Athena. Presto comes pre-installed on EMR 5.0.0 and later. The Athena query engine is a derivation of Presto 0.172 and does not support all of Presto’s native features. However, Athena has many comparable features and deep integrations with other AWS services. If you need full, fine-grained control, you could deploy and manage Presto yourself on Amazon EC2, Amazon ECS, or Amazon EKS. Lastly, you may decide to purchase a Presto distribution with commercial support from an AWS Partner, such as Ahana or Starburst. If your organization needs 24x7x365 production-grade support from experienced Presto engineers, this is an excellent choice.

Federated Queries

In a modern enterprise, it is rare to find all data living in a monolithic datastore. Given the multitude of available data sources, internal and external to an organization, and the growing number of purpose-built databases, analytics engines must be able to join and aggregate data across many sources efficiently. AWS defines a federated query as a capability that ‘enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources.’

Presto allows querying data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. In fact, there are currently 24 different Presto data source connectors available. With Presto, we can write queries that join multiple disparate data sources without moving the data. Below is a simple example of a Presto federated query statement that correlates a customer’s credit rating with their age and gender. The query federates two different data sources: a PostgreSQL database table, postgresql.public.customer, and an Apache Hive Metastore table, hive.default.customer_demographics, whose underlying data resides in Amazon S3.

WITH credit_demographics AS (
SELECT
(year(now()) - c_birth_year) AS age,
cd_credit_rating AS credit_rating,
cd_gender AS gender,
count(cd_gender) AS gender_count
FROM
postgresql.public.customer
LEFT JOIN hive.default.customer_demographics ON c_current_cdemo_sk = cd_demo_sk
WHERE
c_birth_year IS NOT NULL
AND cd_credit_rating IS NOT NULL
AND lower(cd_credit_rating) != 'unknown'
AND cd_gender IS NOT NULL
GROUP BY
cd_credit_rating,
c_birth_year,
cd_gender
)
SELECT
age,
credit_rating,
gender,
gender_count
FROM
credit_demographics
WHERE
age BETWEEN 21 AND 65
ORDER BY
age,
credit_rating,
gender;

Ahana

Ahana, a member of the Linux Foundation’s Presto Foundation, was founded as the first company focused on bringing PrestoDB-based ad hoc analytics offerings to market, fostering the growth of the Presto community, and evangelizing its use. Ahana’s mission is to simplify ad hoc analytics for organizations of all shapes and sizes. Ahana has been successful in raising seed funding, led by GV (formerly Google Ventures). Ahana’s founders have a wealth of previous experience at tech companies, including Alluxio, Kinetica, Couchbase, IBM, Apple, Splunk, and Teradata.

PrestoDB Sandbox

This post will use Ahana’s PrestoDB Sandbox, an Amazon Linux 2 AMI-based solution available on AWS Marketplace, to execute Presto federated queries.

Ahana’s PrestoDB Sandbox AMI allows you to easily get started with Presto to query data wherever your data resides. This AMI configures a single EC2 instance Sandbox to be both the Presto Coordinator and a Presto Worker. It comes with an Apache Hive Metastore backed by PostgreSQL. In addition, the following catalogs are bundled in to try, test, and prototype with Presto:

  • JMX: useful for monitoring and debugging Presto
  • Memory: stores data and metadata in RAM, which is discarded when Presto restarts
  • TPC-DS: provides a set of schemas to support the TPC Benchmark DS
  • TPC-H: provides a set of schemas to support the TPC Benchmark H
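
Once your Sandbox instance is running (covered below), a quick way to confirm the bundled catalogs is from the Presto CLI on the instance. This is just a sanity check; both statements are standard Presto SQL.

presto-cli --execute "SHOW CATALOGS;"
presto-cli --execute "SHOW SCHEMAS FROM tpcds;"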

Apache Hive

In this demonstration, we will use Apache Hive and an Apache Hive Metastore backed by PostgreSQL. Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. The structure can be projected onto data already in storage. A command-line tool and JDBC driver are provided to connect users to Hive. The Metastore provides two essential features of a data warehouse: data abstraction and data discovery. Hive accomplishes both features by providing a metadata repository that is tightly integrated with the Hive query processing system so that data and metadata are in sync.

Getting Started

To get started creating federated queries with Presto, we first need to create and configure our AWS environment, as shown below.

Architecture of the demonstration’s AWS environment and resources

Subscribe to Ahana’s PrestoDB Sandbox

To start, subscribe to Ahana’s PrestoDB Sandbox on AWS Marketplace. Make sure you are aware of the costs involved. The current AWS pricing for the default Linux-based r5.xlarge on-demand EC2 instance hosted in US East (N. Virginia) is USD 0.252 per hour. Since performance is not an issue for the demonstration, you could try a smaller EC2 instance, such as the r5.large, which costs USD 0.126 per hour.

The configuration process will lead you through the creation of an EC2 instance based on Ahana’s PrestoDB Sandbox AMI.

I chose to create the EC2 instance in my default VPC. Part of the demonstration includes connecting to Presto locally using JDBC. Therefore, it was also necessary to include a public IP address for the EC2 instance. If you choose to do so, I strongly recommend limiting the required ports 22 and 8080 in the instance’s Security Group to just your IP address (a /32 CIDR block).

Limiting access to ports 22 and 8080 from only my current IP address

Lastly, we need to assign an IAM Role to the EC2 instance, which has access to Amazon S3. I assigned the AWS managed policy, AmazonS3FullAccess, to the EC2’s IAM Role.

Attaching the AmazonS3FullAccess AWS managed policy to the Role

Part of the configuration also asks for a key pair. You can use an existing key or create a new key for the demo. For reference in future commands, I am using a key named ahana-presto and my key path of ~/.ssh/ahana-presto.pem. Be sure to update the commands to match your own key’s name and location.
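
If you create a new key, remember that the private key file must have restrictive permissions before ssh or scp will accept it. A minimal example, assuming the key name and path above:

chmod 400 ~/.ssh/ahana-presto.pem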

Once complete, instructions for using the PrestoDB Sandbox EC2 are provided.

You can view the running EC2 instance, containing Presto, from the web-based AWS EC2 Management Console. Make sure to note the public IPv4 address or the public IPv4 DNS address as this value will be required during the demo.

AWS CloudFormation

We will use Amazon RDS for PostgreSQL and Amazon S3 as additional data sources for Presto. Included in the project files on GitHub is an AWS CloudFormation template, cloudformation/presto_ahana_demo.yaml. The template creates a single RDS for PostgreSQL instance in the default VPC and an encrypted Amazon S3 bucket.

AWSTemplateFormatVersion: "2010-09-09"
Description: "This template deploys a RDS PostgreSQL database and an Amazon S3 bucket"
Parameters:
  DBInstanceIdentifier:
    Type: String
    Default: "ahana-prestodb-demo"
  DBEngine:
    Type: String
    Default: "postgres"
  DBEngineVersion:
    Type: String
    Default: "12.3"
  DBAvailabilityZone:
    Type: String
    Default: "us-east-1f"
  DBInstanceClass:
    Type: String
    Default: "db.t3.medium"
  DBStorageType:
    Type: String
    Default: "gp2"
  DBAllocatedStorage:
    Type: Number
    Default: 20
  DBName:
    Type: String
    Default: "shipping"
  DBUser:
    Type: String
    Default: "presto"
  DBPassword:
    Type: String
    Default: "5up3r53cr3tPa55w0rd"
    # NoEcho: True
Resources:
  MasterDatabase:
    Type: AWS::RDS::DBInstance
    Properties:
      DBInstanceIdentifier:
        Ref: DBInstanceIdentifier
      DBName:
        Ref: DBName
      AllocatedStorage:
        Ref: DBAllocatedStorage
      DBInstanceClass:
        Ref: DBInstanceClass
      StorageType:
        Ref: DBStorageType
      Engine:
        Ref: DBEngine
      EngineVersion:
        Ref: DBEngineVersion
      MasterUsername:
        Ref: DBUser
      MasterUserPassword:
        Ref: DBPassword
      AvailabilityZone: !Ref DBAvailabilityZone
      PubliclyAccessible: true
      Tags:
        - Key: Project
          Value: "Demo of RDS PostgreSQL"
  DataBucket:
    DeletionPolicy: Retain
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
Outputs:
  Endpoint:
    Description: "Endpoint of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Address
  Port:
    Description: "Port of RDS PostgreSQL database"
    Value: !GetAtt MasterDatabase.Endpoint.Port
  JdbcConnString:
    Description: "JDBC connection string of RDS PostgreSQL database"
    Value: !Join
      - ""
      - - "jdbc:postgresql://"
        - !GetAtt MasterDatabase.Endpoint.Address
        - ":"
        - !GetAtt MasterDatabase.Endpoint.Port
        - "/"
        - !Ref DBName
        - "?user="
        - !Ref DBUser
        - "&password="
        - !Ref DBPassword
  Bucket:
    Description: "Name of Amazon S3 data bucket"
    Value: !Ref DataBucket

All the source code for this post is on GitHub. Use the following command to git clone a local copy of the project.

git clone \
--branch master --single-branch --depth 1 --no-tags \
https://github.com/garystafford/presto-aws-federated-queries.git

To create the AWS CloudFormation stack from the template, cloudformation/presto_ahana_demo.yaml, execute the following aws cloudformation command. Make sure you change the DBAvailabilityZone parameter value to match the AWS Availability Zone in which your Ahana PrestoDB Sandbox EC2 instance was created. In my case, us-east-1f.

aws cloudformation create-stack \
--stack-name ahana-prestodb-demo \
--template-body file://cloudformation/presto_ahana_demo.yaml \
--parameters ParameterKey=DBAvailabilityZone,ParameterValue=us-east-1f

To ensure the RDS for PostgreSQL database instance can be accessed by Presto running on the Ahana PrestoDB Sandbox EC2, manually add the PrestoDB Sandbox EC2’s Security Group to port 5432 within the database instance’s VPC Security Group’s Inbound rules. I have also added my own IP to port 5432, which enables me to connect to the RDS instance directly from my IDE using JDBC.
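
If you prefer the AWS CLI to the console for this change, a command like the following should work. The Security Group IDs shown are placeholders; substitute the RDS instance’s VPC Security Group ID and the PrestoDB Sandbox EC2 instance’s Security Group ID.

aws ec2 authorize-security-group-ingress \
--group-id sg-RDS_SG_ID_CHANGE_ME \
--protocol tcp \
--port 5432 \
--source-group sg-EC2_SG_ID_CHANGE_ME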

The AWS CloudFormation stack’s Outputs tab includes a set of values, including the JDBC connection string for the new RDS for PostgreSQL instance, JdbcConnString, and the Amazon S3 bucket’s name, Bucket. All these values will be required during the demonstration.
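
You can also retrieve these Outputs programmatically with the AWS CLI, which makes it easier to copy the JDBC connection string and bucket name into later commands:

aws cloudformation describe-stacks \
--stack-name ahana-prestodb-demo \
--query "Stacks[0].Outputs" \
--output table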

Preparing the PrestoDB Sandbox

There are a few steps we need to take to properly prepare the PrestoDB Sandbox EC2 for our demonstration. First, use your PrestoDB Sandbox EC2 SSH key to scp the properties and sql directories to the Presto EC2 instance. To do so, you will need to set the EC2_ENDPOINT value to your EC2 instance’s public IPv4 address or public IPv4 DNS value. You can hardcode the value or use the aws ec2 API command shown below to retrieve it programmatically.

# on local workstation
EC2_ENDPOINT=$(aws ec2 describe-instances \
--filters "Name=product-code,Values=ejee5zzmv4tc5o3tr1uul6kg2" \
"Name=product-code.type,Values=marketplace" \
--query "Reservations[*].Instances[*].{Instance:PublicDnsName}" \
--output text)
scp -i "~/.ssh/ahana-presto.pem" \
-r properties/ sql/ \
ec2-user@${EC2_ENDPOINT}:~/
ssh -i "~/.ssh/ahana-presto.pem" ec2-user@${EC2_ENDPOINT}

Environment Variables

Next, we need to set several environment variables. First, replace the DATA_BUCKET and POSTGRES_HOST values below to match your environment. The PGPASSWORD value should be correct unless you changed it in the CloudFormation template. Then, execute the command to add the variables to your .bash_profile file.

echo """
export DATA_BUCKET=prestodb-demo-databucket-CHANGE_ME
export POSTGRES_HOST=presto-demo.CHANGE_ME.us-east-1.rds.amazonaws.com
export PGPASSWORD=5up3r53cr3tPa55w0rd
export JAVA_HOME=/usr
export HADOOP_HOME=/home/ec2-user/hadoop
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/tools/lib/*
export HIVE_HOME=/home/ec2-user/hive
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$PATH
""" >>~/.bash_profile

Optionally, I suggest updating the EC2 instance with available updates and installing your favorite tools, like htop, to monitor the EC2 instance’s performance.

yes | sudo yum update
yes | sudo yum install htop
View of htop running on an r5.xlarge EC2 instance

Before further configuration for the demonstration, let’s review a few aspects of the Ahana PrestoDB EC2 instance. There are several applications pre-installed on the instance, including Java, Presto, Hadoop, PostgreSQL, and Hive. Versions shown are current as of early September 2020.

java -version
# openjdk version "1.8.0_252"
# OpenJDK Runtime Environment (build 1.8.0_252-b09)
# OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
hadoop version
# Hadoop 2.9.2
postgres --version
# postgres (PostgreSQL) 9.2.24
psql --version
# psql (PostgreSQL) 9.2.24
hive --version
# Hive 2.3.7
presto-cli --version
# Presto CLI 0.235-cb21100

The Presto configuration files are in the /etc/presto/ directory. The Hive configuration files are in the ~/hive/conf/ directory. Here are a few commands you can use to gain a better understanding of their configurations.

ls /etc/presto/
cat /etc/presto/jvm.config
cat /etc/presto/config.properties
cat /etc/presto/node.properties
# installed and configured catalogs
ls /etc/presto/catalog/
cat ~/hive/conf/hive-site.xml

Configure Presto

To configure Presto, we need to create and copy a new Presto postgresql catalog properties file for the newly created RDS for PostgreSQL instance. Modify the properties/rds_postgresql.properties file, replacing the connection-url value with your own JDBC connection string, shown in the CloudFormation Outputs tab.

connector.name=postgresql
connection-url=jdbc:postgresql://presto-demo.abcdefg12345.us-east-1.rds.amazonaws.com:5432/shipping
connection-user=presto
connection-password=5up3r53cr3tPa55w0rd

Move the rds_postgresql.properties file to its correct location using sudo.

sudo mv properties/rds_postgresql.properties /etc/presto/catalog/

We also need to modify the existing Hive catalog properties file, which will allow us to write to non-managed Hive tables from Presto.

connector.name=hive-hadoop2
hive.metastore.uri=thrift://localhost:9083
hive.non-managed-table-writes-enabled=true

The following command will overwrite the existing hive.properties file with the modified version containing the new property.

sudo mv properties/hive.properties \
/etc/presto/catalog/hive.properties

To finalize the configuration of the catalog properties files, we need to restart Presto. The easiest way is to reboot the EC2 instance, then SSH back into the instance. Since our environment variables are in the .bash_profile file, they will survive the restart and be available when we log back into the EC2 instance.

sudo reboot

Add Tables to Apache Hive Metastore

We will use RDS for PostgreSQL and Apache Hive Metastore/Amazon S3 as additional data sources for our federated queries. The Ahana PrestoDB Sandbox instance comes pre-configured with Apache Hive and an Apache Hive Metastore, backed by PostgreSQL (a separate PostgreSQL 9.x instance pre-installed on the EC2).

The Sandbox’s instance of Presto comes pre-configured with schemas for the TPC Benchmark DS (TPC-DS). We will create three external tables in our Apache Hive Metastore, which correspond to three tables in the TPC-DS data source’s sf1 schema: tpcds.sf1.customer, tpcds.sf1.customer_address, and tpcds.sf1.customer_demographics. A Hive external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. As an example, here is the SQL statement that creates the external customer table in the Hive Metastore, whose data will be stored in the S3 bucket.

CREATE EXTERNAL TABLE IF NOT EXISTS `customer`(
`c_customer_sk` bigint,
`c_customer_id` char(16),
`c_current_cdemo_sk` bigint,
`c_current_hdemo_sk` bigint,
`c_current_addr_sk` bigint,
`c_first_shipto_date_sk` bigint,
`c_first_sales_date_sk` bigint,
`c_salutation` char(10),
`c_first_name` char(20),
`c_last_name` char(30),
`c_preferred_cust_flag` char(1),
`c_birth_day` integer,
`c_birth_month` integer,
`c_birth_year` integer,
`c_birth_country` char(20),
`c_login` char(13),
`c_email_address` char(50),
`c_last_review_date_sk` bigint)
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

The three CREATE EXTERNAL TABLE SQL statements are included in the sql/ directory: sql/hive_customer.sql, sql/hive_customer_address.sql, and sql/hive_customer_demographics.sql. The bucket name needs to be manually updated to your own bucket name in all three files before continuing.
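
Alternatively, rather than editing the three files by hand on the EC2 instance, you could substitute the bucket name with sed, assuming the DATA_BUCKET environment variable set earlier contains your actual bucket name:

# replace the placeholder bucket name in all three DDL files
sed -i "s|prestodb-demo-databucket-CHANGE_ME|${DATA_BUCKET}|g" \
sql/hive_customer.sql \
sql/hive_customer_address.sql \
sql/hive_customer_demographics.sql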

Next, run the following hive commands to create the external tables in the Hive Metastore within the existing default schema/database.

hive --database default -f sql/hive_customer.sql
hive --database default -f sql/hive_customer_address.sql
hive --database default -f sql/hive_customer_demographics.sql

To confirm the tables were created successfully, we could use a variety of hive commands.

hive --database default -e "SHOW TABLES;"
hive --database default -e "DESCRIBE FORMATTED customer;"
hive --database default -e "SELECT * FROM customer LIMIT 5;"
Using the ‘DESCRIBE FORMATTED customer_address;’ Hive command

Alternatively, you can also create the external tables interactively from within Hive, using the hive command to access the CLI. Copy and paste the contents of the SQL files into the hive CLI. To exit hive, use quit;.

Interactively querying within Apache Hive

Amazon S3 Data Source Setup

With the external tables created, we will now select all the data from each of the three tables in the TPC-DS data source and insert that data into the equivalent Hive tables. The physical data will be written to Amazon S3 as SNAPPY-compressed Apache Parquet files, a highly efficient, columnar storage format. Execute the following commands. I will explain why the customer_address table statements are a bit different, next.

# inserts 100,000 rows
presto-cli --execute """
INSERT INTO hive.default.customer
SELECT * FROM tpcds.sf1.customer;
"""
# inserts 50,000 rows across 52 partitions
presto-cli --execute """
INSERT INTO hive.default.customer_address
SELECT ca_address_sk, ca_address_id, ca_street_number,
ca_street_name, ca_street_type, ca_suite_number,
ca_city, ca_county, ca_zip, ca_country, ca_gmt_offset,
ca_location_type, ca_state
FROM tpcds.sf1.customer_address
ORDER BY ca_address_sk;
"""
# add new partitions in metastore
hive -e "MSCK REPAIR TABLE default.customer_address;"
# inserts 1,920,800 rows
presto-cli --execute """
INSERT INTO hive.default.customer_demographics
SELECT * FROM tpcds.sf1.customer_demographics;
"""

Confirm the data has been loaded into the correct S3 bucket locations and is in Parquet-format using the AWS Management Console or AWS CLI. Rest assured, the Parquet-format data is SNAPPY-compressed even though the S3 console incorrectly displays Compression as None. You can easily confirm the compression codec with a utility like parquet-tools.

Data organized by key prefixes in Amazon S3
Using S3’s ‘Select from’ feature to preview the SNAPPY-compressed Parquet format data
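
If you prefer the AWS CLI to the console, a quick way to verify the objects is shown below. The object key in the copy command is a placeholder; list the prefix first to find an actual key. The parquet-tools invocation assumes the utility is installed wherever you run it.

aws s3 ls s3://${DATA_BUCKET}/customer_address/ --recursive --human-readable
# copy a single Parquet object locally and inspect its metadata (the object key will differ)
aws s3 cp s3://${DATA_BUCKET}/customer/OBJECT_KEY_CHANGE_ME ./sample.parquet
parquet-tools meta ./sample.parquet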

Partitioned Tables

The customer_address table is unique in that it has been partitioned by the ca_state column. Partitioned tables are created using the PARTITIONED BY clause.

CREATE EXTERNAL TABLE `customer_address`(
`ca_address_sk` bigint,
`ca_address_id` char(16),
`ca_street_number` char(10),
`ca_street_name` char(60),
`ca_street_type` char(15),
`ca_suite_number` char(10),
`ca_city` varchar(60),
`ca_county` varchar(30),
`ca_zip` char(10),
`ca_country` char(20),
`ca_gmt_offset` double precision,
`ca_location_type` char(20)
)
PARTITIONED BY (`ca_state` char(2))
STORED AS PARQUET
LOCATION
's3a://prestodb-demo-databucket-CHANGE_ME/customer_address'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

According to Apache Hive, a table can have one or more partition columns, and a separate data directory is created for each distinct value combination in the partition columns. Since the data for the Hive tables is stored in Amazon S3, this means that when data is written to the customer_address table, it is automatically separated into different S3 key prefixes based on the state. The data is physically “partitioned”.

customer_address data, partitioned by the state, in Amazon S3

Whenever we add new partitions in S3, we need to run the MSCK REPAIR TABLE command to add that table’s new partitions to the Hive Metastore.

hive -e "MSCK REPAIR TABLE default.customer_address;"

In SQL, a predicate is a condition expression that evaluates to a Boolean value, either true or false. Defining the partitions on the attributes that are frequently used in the conditions/filters (predicates) of queries can significantly increase query efficiency. When we execute a query that uses an equality comparison condition, such as ca_state = 'TN', partitioning means the query only needs to work with the slice of the data in the corresponding ca_state=TN key prefix. There are 50,000 rows of data in the customer_address table, but only 1,418 rows (2.8% of the total data) in the ca_state=TN partition. Combined with the advantages of the Parquet format and SNAPPY compression, partitioning can significantly reduce query execution time.
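
As a simple illustration of partition pruning, compare a count over the whole table with a count filtered on the partition column; the second query only needs to read the files under the ca_state=TN key prefix.

presto-cli --execute "SELECT count(*) FROM hive.default.customer_address;"
presto-cli --execute "SELECT count(*) FROM hive.default.customer_address WHERE ca_state = 'TN';"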

Adding Data to RDS for PostgreSQL Instance

For the demonstration, we will also replicate the schema and data of the tpcds.sf1.customer_address table to the new RDS for PostgreSQL instance’s shipping database.

CREATE TABLE customer_address (
ca_address_sk bigint,
ca_address_id char(16),
ca_street_number char(10),
ca_street_name char(60),
ca_street_type char(15),
ca_suite_number char(10),
ca_city varchar(60),
ca_county varchar(30),
ca_state char(2),
ca_zip char(10),
ca_country char(20),
ca_gmt_offset double precision,
ca_location_type char(20)
);

As with Hive and Presto, we can create the table programmatically from the command line or interactively; I prefer the programmatic approach. Using the following psql command, we can create the customer_address table in the public schema of the shipping database.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-f sql/postgres_customer_address.sql

Now, to insert the data into the new PostgreSQL table, run the following presto-cli command.

# inserts 50,000 rows
presto-cli --execute """
INSERT INTO rds_postgresql.public.customer_address
SELECT * FROM tpcds.sf1.customer_address;
"""

To confirm that the data was imported properly, we can use a variety of commands.

-- Should be 50000 rows in table
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT COUNT(*) FROM customer_address;"
psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto \
-c "SELECT * FROM customer_address LIMIT 5;"

Alternatively, you could use the PostgreSQL client interactively by copying and pasting the contents of the sql/postgres_customer_address.sql file to the psql command prompt. To interact with PostgreSQL from the psql command prompt, use the following command.

psql -h ${POSTGRES_HOST} -p 5432 -d shipping -U presto

Use the \dt command to list the PostgreSQL tables and the \q command to exit the PostgreSQL client. We now have all the new data sources created and configured for Presto!
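
As a final sanity check, you can confirm that Presto sees the tables in both new data sources:

presto-cli --execute "SHOW TABLES FROM hive.default;"
presto-cli --execute "SHOW TABLES FROM rds_postgresql.public;"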

Interacting with Presto

Presto provides a web interface for monitoring and managing queries. The interface provides dashboard-like insights into the Presto Cluster and queries running on the cluster. The Presto UI is available on port 8080 using the public IPv4 address or the public IPv4 DNS.

There are several ways to interact with Presto via the PrestoDB Sandbox. This post will demonstrate how to execute ad hoc queries against Presto from an IDE using a JDBC connection and from the Presto CLI. Other options include running queries against Presto from Java and Python applications, Tableau, or Apache Spark/PySpark.

Below, we see a query being run against Presto from JetBrains PyCharm, using a Java Database Connectivity (JDBC) connection. The advantage of using an IDE like JetBrains is having a single visual interface, including all the project files, multiple JDBC configurations, output results, and the ability to run multiple ad hoc queries.

Below, we see an example of configuring the Presto Data Source using the JDBC connection string, supplied in the CloudFormation stack Outputs tab.

Make sure to download and use the latest Presto JDBC driver JAR.
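
For the connection to Presto itself (as opposed to connecting directly to the RDS instance with the CloudFormation JdbcConnString output), the Presto JDBC driver uses a URL of the form jdbc:presto://host:port/catalog/schema, with properties such as user passed as URL parameters. A typical value for the Sandbox, with a placeholder hostname and no authentication configured, might look like the following.

jdbc:presto://ec2-XX-XXX-XXX-XXX.compute-1.amazonaws.com:8080/hive/default?user=presto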

With JetBrains’ IDEs, we can even limit the databases/schemas displayed by the Data Source. This is helpful when we have multiple Presto catalogs configured, but we are only interested in certain data sources.

We can also run queries using the Presto CLI, in three different ways: we can pass a SQL statement to the Presto CLI, pass a file containing a SQL statement to the Presto CLI, or work interactively from the Presto CLI. A sketch of each approach follows. Below that, we see a query being run interactively from the Presto CLI.
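
A minimal sketch of the three approaches, using the query file already copied to the instance:

# 1. pass a SQL statement directly to the CLI
presto-cli --execute "SELECT count(*) FROM tpcds.sf1.customer;"
# 2. pass a file containing a SQL statement
presto-cli --catalog tpcds --schema sf1 --file sql/presto_query2.sql
# 3. work interactively (type quit; to exit)
presto-cli --catalog tpcds --schema sf1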

As the query is running, we can observe the live Presto query statistics (not very user friendly in my terminal).

And finally, we can view the query results.

Federated Queries

The example queries used in the demonstration and included in the project were mainly extracted from the scholarly article, Why You Should Run TPC-DS: A Workload Analysis, available as a PDF on the tpc.org website. I have modified the SQL queries to work with Presto.

In the first example, we will run three versions of the same basic query statement. Version 1 of the query is not a federated query; it only queries a single data source. Version 2 of the query queries two different data sources. Finally, version 3 of the query queries three different data sources. Each of the three versions of the SQL statement should return the same results — 93 rows of data.

Version 1: Single Data Source

The first version of the query statement, sql/presto_query2.sql, is not a federated query. Each of the query’s four tables (catalog_returns, date_dim, customer, and customer_address) references the TPC-DS data source, which came pre-installed with the PrestoDB Sandbox. Note that the table references in the FROM clauses are unqualified and therefore all resolve to the tpcds.sf1 catalog and schema passed to the presto-cli.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
catalog_returns,
date_dim,
customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
customer_address,
customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

We will run each query non-interactively using the presto-cli. We will choose the sf1 (scale factor of 1) tpcds schema. According to Presto, every unit in the scale factor (sf1, sf10, sf100) corresponds to a gigabyte of data.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2.sql \
--output-format ALIGNED \
--client-tags "presto_query2"

Below, we see the query results in the presto-cli.

Below, we see the first query running in Presto’s web interface.

Below, we see the first query’s results detailed in Presto’s web interface.

Version 2: Two Data Sources

In the second version of the query statement, sql/presto_query2_federated_v1.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. The other two tables (customer and customer_address) now reference the Apache Hive Metastore for their schema and underlying data in Amazon S3. Note the fully-qualified table references: tpcds.sf1.catalog_returns and tpcds.sf1.date_dim, versus hive.default.customer and hive.default.customer_address.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
hive.default.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
hive.default.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v1.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v1"

Below, we see the second query’s results detailed in Presto’s web interface.

Even though the data is in two separate and physically different data sources, we can easily query it as though it were all in the same place.

Version 3: Three Data Sources

In the third version of the query statement, sql/presto_query2_federated_v2.sql, two of the tables (catalog_returns and date_dim) reference the TPC-DS data source. One of the tables (hive.default.customer) references the Apache Hive Metastore, whose underlying data resides in Amazon S3. The fourth table (rds_postgresql.public.customer_address) references the new RDS for PostgreSQL database instance. Note the three different catalogs in the fully-qualified table references: tpcds.sf1, hive.default, and rds_postgresql.public.

-- Modified version of
-- Figure 7: Reporting Query (Query 40)
-- http://www.tpc.org/tpcds/presentations/tpcds_workload_analysis.pdf
WITH customer_total_return AS (
SELECT
cr_returning_customer_sk AS ctr_cust_sk,
ca_state AS ctr_state,
sum(cr_return_amt_inc_tax) AS ctr_return
FROM
tpcds.sf1.catalog_returns,
tpcds.sf1.date_dim,
rds_postgresql.public.customer_address
WHERE
cr_returned_date_sk = d_date_sk
AND d_year = 1998
AND cr_returning_addr_sk = ca_address_sk
GROUP BY
cr_returning_customer_sk,
ca_state
)
SELECT
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return
FROM
customer_total_return ctr1,
rds_postgresql.public.customer_address,
hive.default.customer
WHERE
ctr1.ctr_return > (
SELECT
avg(ctr_return) * 1.2
FROM
customer_total_return ctr2
WHERE
ctr1.ctr_state = ctr2.ctr_state)
AND ca_address_sk = c_current_addr_sk
AND ca_state = 'TN'
AND ctr1.ctr_cust_sk = c_customer_sk
ORDER BY
c_customer_id,
c_salutation,
c_first_name,
c_last_name,
ca_street_number,
ca_street_name,
ca_street_type,
ca_suite_number,
ca_city,
ca_county,
ca_state,
ca_zip,
ca_country,
ca_gmt_offset,
ca_location_type,
ctr_return;

Again, run the query using the presto-cli.

presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query2_federated_v2.sql \
--output-format ALIGNED \
--client-tags "presto_query2_federated_v2"

Below, we see the third query’s results detailed in Presto’s web interface.

Again, even though the data is in three separate and physically different data sources, we can easily query it as though it were all in the same place.

Additional Query Examples

The project contains several additional query statements, which I have extracted from Why You Should Run TPC-DS: A Workload Analysis and modified to work with Presto and to federate across multiple data sources.

# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1.sql \
--output-format ALIGNED \
--client-tags "presto_query1"
# federated - two sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query1_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query1_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4.sql \
--output-format ALIGNED \
--client-tags "presto_query4"
# federated - three sources
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query4_federated.sql \
--output-format ALIGNED \
--client-tags "presto_query4_federated"
# non-federated
presto-cli \
--catalog tpcds \
--schema sf1 \
--file sql/presto_query5.sql \
--output-format ALIGNED \
--client-tags "presto_query5"

Conclusion

In this post, we gained a better understanding of Presto using Ahana’s PrestoDB Sandbox product from AWS Marketplace. We learned how Presto queries data where it lives, including Apache Hive, Thrift, Kafka, Kudu, Cassandra, Elasticsearch, and MongoDB. We also learned about Apache Hive and the Apache Hive Metastore, the Apache Parquet file format, and how and why to partition Hive data in Amazon S3. Most importantly, we learned how to write federated queries that join multiple disparate data sources without having to move the data into a single monolithic data store.


This blog represents my own viewpoints and not those of my employer, Amazon Web Services.
