There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives.
However, installing, configuring, and managing the technologies that support big data analytics, data science, ML, and AI, at scale and in production, often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. The mere ability to manage terabytes and petabytes of transient data is beyond the capability of many enterprises, let alone analyzing that data.
To ease the burden of implementing these technologies, the three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data Analytics-, AI-, and ML-as-a-Service offerings. In this post, we will explore one such cloud-based service offering in the field of big data analytics, Google Cloud Dataproc. We will focus on Cloud Dataproc’s ability to quickly and efficiently run Spark jobs written in Java and Python, two widely adopted enterprise programming languages.
Featured Technologies
The following technologies are featured prominently in this post.
Google Cloud Dataproc
According to Google, Cloud Dataproc is a fast, easy-to-use, fully-managed cloud service for running the Apache Spark and Apache Hadoop ecosystem on Google Cloud Platform. Dataproc is a complete platform for data processing, analytics, and machine learning. Dataproc offers per-second billing, so you only pay for exactly the resources you consume. Dataproc offers frequently updated and native versions of Apache Spark, Hadoop, Pig, and Hive, as well as other related applications. Dataproc has built-in integrations with other Google Cloud Platform (GCP) services, such as Cloud Storage, BigQuery, Bigtable, Stackdriver Logging, and Stackdriver Monitoring. Dataproc’s clusters are configurable and resizable from three to hundreds of nodes, and each cluster action takes less than 90 seconds on average.
Platform as a Service (PaaS) offerings similar to Dataproc include Amazon Elastic MapReduce (EMR), Microsoft Azure HDInsight, and Qubole Data Service. Qubole is offered on AWS, Azure, and Oracle Cloud Infrastructure (Oracle OCI).
According to Google, Cloud Dataproc and Cloud Dataflow, both part of GCP’s Data Analytics/Big Data Product offerings, can both be used for data processing, and there’s overlap in their batch and streaming capabilities. Cloud Dataflow is a fully-managed service for transforming and enriching data in stream and batch modes. Dataflow uses the Apache Beam SDK to provide developers with Java and Python APIs, similar to Spark.
Apache Spark
According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With in-memory speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.
According to a post by DataFlair, ‘the DAG in Apache Spark is a set of Vertices and Edges, where vertices represent the RDDs and the edges represent the Operation to be applied on RDD (Resilient Distributed Dataset). In Spark DAG, every edge directs from earlier to later in the sequence. On the calling of Action, the created DAG submits to DAG Scheduler which further splits the graph into the stages of the task.’ Below, we see a three-stage DAG visualization, displayed using the Spark History Server Web UI, from a job demonstrated in this post.
Spark’s polyglot programming model allows users to write applications in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). Spark may be run using its standalone cluster mode or on Apache Hadoop YARN, Mesos, and Kubernetes.
PySpark
The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.
Apache Hadoop
According to Apache, the Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. This is a rather modest description of such a significant and transformative project. When we talk about Hadoop, often it is in the context of the project’s well-known modules, which include:
- Hadoop Common: The common utilities that support the other Hadoop modules
- Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data
- Hadoop YARN (Yet Another Resource Negotiator): A framework for job scheduling and cluster resource management, also known as ‘Hadoop NextGen’
- Hadoop MapReduce: A YARN-based system for parallel processing of large datasets
- Hadoop Ozone: An object store for Hadoop
Based on the Powered by Apache Hadoop list, there are many well-known enterprises and academic institutions using Apache Hadoop, including Adobe, eBay, Facebook, Hulu, LinkedIn, and The New York Times.
Spark vs. Hadoop
There are many articles and posts that delve into the Spark versus Hadoop debate; this post is not one of them. Although both are mature technologies, Spark, the new kid on the block, reached version 1.0.0 in May 2014, whereas Hadoop reached version 1.0.0 earlier, in December 2011. According to Google Trends, interest in both technologies has remained relatively high over the last three years. However, interest in Spark, based on the volume of searches, has been steadily outpacing Hadoop for well over a year now. The in-memory speed of Spark over HDFS-based Hadoop and the ease of Spark SQL for working with structured data are likely big differentiators for many users coming from a traditional relational database background and for users with large or streaming datasets that require near real-time processing.
In this post, all examples are built to run on Spark. This is not meant to suggest Spark is necessarily superior or that Spark runs better on Dataproc than Hadoop. In fact, Dataproc’s implementation of Spark relies on Hadoop’s core HDFS and YARN technologies to run.
Demonstration
To show the capabilities of Cloud Dataproc, we will create both a single-node Dataproc cluster and a three-node cluster, upload Java- and Python-based analytics jobs and data to Google Cloud Storage, and execute the jobs on the Spark cluster. Finally, we will enable monitoring and notifications for the Dataproc clusters and the jobs running on the clusters with Stackdriver. The post will demonstrate the use of the Google Cloud Console, as well as the command line tools in Google’s Cloud SDK, for all tasks.
In this post, we will be uploading and running individual jobs on the Dataproc Spark cluster, as opposed to using the Cloud Dataproc Workflow Templates. According to Google, a Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. Workflow Templates are useful for automating your Dataproc workflows; however, automation is not the primary topic of this post.
Source Code
All open-sourced code for this post can be found on GitHub in two repositories, one for Java with Spark and one for Python with PySpark. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.
Cost
Of course, there is a cost associated with provisioning cloud services. However, if you manage the Google Cloud Dataproc resources prudently, the costs are negligible. Regarding pricing, according to Google, Cloud Dataproc pricing is based on the size of Cloud Dataproc clusters and the duration of time that they run. The size of a cluster is based on the aggregate number of virtual CPUs (vCPUs) across the entire cluster, including the master and worker nodes. The duration of a cluster is the length of time, measured in minutes, between cluster creation and cluster deletion.
Over the course of writing the code for this post, as well as writing the post itself, the entire cost of all the related resources was a minuscule US$7.50. The cost includes creating, running, and deleting more than a dozen Dataproc clusters and uploading and executing approximately 75-100 Spark and PySpark jobs. Given the quick creation time of a cluster, 2 minutes on average or less in this demonstration, there is no reason to leave a cluster running longer than it takes to complete your workloads.
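The pricing model described above reduces to simple arithmetic. The following sketch estimates the Dataproc portion of a cluster’s cost from its aggregate vCPU count and its lifetime in minutes. The function name and the rate of US$0.010 per vCPU per hour are assumptions made for illustration only; check Google’s current pricing page for the actual rate, and keep in mind that the underlying Compute Engine and storage charges are billed separately.

```python
def estimate_dataproc_cost(total_vcpus, minutes, rate_per_vcpu_hour=0.010):
    """Estimate the Dataproc service charge for one cluster.

    rate_per_vcpu_hour is an assumed, illustrative rate, not a quoted price.
    """
    if total_vcpus < 1 or minutes < 0:
        raise ValueError("total_vcpus must be >= 1 and minutes must be >= 0")
    hours = minutes / 60.0
    # Round to a hundredth of a cent to hide floating-point noise
    return round(total_vcpus * hours * rate_per_vcpu_hour, 4)


# Three n1-standard-4 nodes (4 vCPUs each) that exist for 30 minutes
three_node_cost = estimate_dataproc_cost(total_vcpus=3 * 4, minutes=30)
print("Estimated Dataproc charge: US$%.4f" % three_node_cost)
```

Under these assumptions, short-lived clusters cost pennies, which is consistent with the advice to delete a cluster as soon as its workloads complete.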
Kaggle Datasets
To explore the features of Dataproc, we will use a publicly-available dataset from Kaggle. Kaggle is a popular open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.
For this demonstration, I chose the IBRD Statement Of Loans Data dataset, from World Bank Financial Open Data, and available on Kaggle. The International Bank for Reconstruction and Development (IBRD) loans are public and publicly guaranteed debt extended by the World Bank Group. IBRD loans are made to, or guaranteed by, countries that are members of IBRD. This dataset contains historical snapshots of the Statement of Loans including the latest available snapshots.
There are two data files available. The ‘Statement of Loans’ latest available snapshots data file contains 8,713 rows of loan data (~3 MB), ideal for development and testing. The ‘Statement of Loans’ historic data file contains approximately 750,000 rows of data (~265 MB). Although not exactly ‘big data’, the historic dataset is large enough to sufficiently explore Dataproc. Both IBRD files have an identical schema with 33 columns of data (gist).
| End of Period | Loan Number | Region | Country Code | Country | Borrower | Guarantor Country Code | Guarantor | Loan Type | Loan Status | Interest Rate | Currency of Commitment | Project ID | Project Name | Original Principal Amount | Cancelled Amount | Undisbursed Amount | Disbursed Amount | Repaid to IBRD | Due to IBRD | Exchange Adjustment | Borrower's Obligation | Sold 3rd Party | Repaid 3rd Party | Due 3rd Party | Loans Held | First Repayment Date | Last Repayment Date | Agreement Signing Date | Board Approval Date | Effective Date (Most Recent) | Closed Date (Most Recent) | Last Disbursement Date |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2018-10-31T00:00:00 | IBRD00010 | EUROPE AND CENTRAL ASIA | FR | France | CREDIT NATIONAL | FR | France | NON POOL | Fully Repaid | 4.2500 |  | P037383 | RECONSTRUCTION | 250000000.00 | 0.00 | 0.00 | 250000000.00 | 38000.00 | 0.00 | 0.00 | 0.00 | 249962000.00 | 249962000.00 | 0.00 | 0.00 | 1952-11-01T00:00:00 | 1977-05-01T00:00:00 | 1947-05-09T00:00:00 | 1947-05-09T00:00:00 | 1947-06-09T00:00:00 | 1947-12-31T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00020 | EUROPE AND CENTRAL ASIA | NL | Netherlands |  |  |  | NON POOL | Fully Repaid | 4.2500 |  | P037452 | RECONSTRUCTION | 191044211.75 | 0.00 | 0.00 | 191044211.75 | 103372211.75 | 0.00 | 0.00 | 0.00 | 87672000.00 | 87672000.00 | 0.00 | 0.00 | 1952-04-01T00:00:00 | 1972-10-01T00:00:00 | 1947-08-07T00:00:00 | 1947-08-07T00:00:00 | 1947-09-11T00:00:00 | 1948-03-31T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00021 | EUROPE AND CENTRAL ASIA | NL | Netherlands |  |  |  | NON POOL | Fully Repaid | 4.2500 |  | P037452 | RECONSTRUCTION | 3955788.25 | 0.00 | 0.00 | 3955788.25 | 0.00 | 0.00 | 0.00 | 0.00 | 3955788.25 | 3955788.25 | 0.00 | 0.00 | 1953-04-01T00:00:00 | 1954-04-01T00:00:00 | 1948-05-25T00:00:00 | 1947-08-07T00:00:00 | 1948-06-01T00:00:00 | 1948-06-30T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00030 | EUROPE AND CENTRAL ASIA | DK | Denmark |  |  |  | NON POOL | Fully Repaid | 4.2500 |  | P037362 | RECONSTRUCTION | 40000000.00 | 0.00 | 0.00 | 40000000.00 | 17771000.00 | 0.00 | 0.00 | 0.00 | 22229000.00 | 22229000.00 | 0.00 | 0.00 | 1953-02-01T00:00:00 | 1972-08-01T00:00:00 | 1947-08-22T00:00:00 | 1947-08-22T00:00:00 | 1947-10-17T00:00:00 | 1949-03-31T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00040 | EUROPE AND CENTRAL ASIA | LU | Luxembourg |  |  |  | NON POOL | Fully Repaid | 4.2500 |  | P037451 | RECONSTRUCTION | 12000000.00 | 238016.98 | 0.00 | 11761983.02 | 1619983.02 | 0.00 | 0.00 | 0.00 | 10142000.00 | 10142000.00 | 0.00 | 0.00 | 1949-07-15T00:00:00 | 1972-07-15T00:00:00 | 1947-08-28T00:00:00 | 1947-08-28T00:00:00 | 1947-10-24T00:00:00 | 1949-03-31T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00050 | LATIN AMERICA AND CARIBBEAN | CL | Chile | Ministry of Finance | CL | Chile | NON POOL | Fully Repaid | 4.5000 |  | P006578 | POWER | 13500000.00 | 0.00 | 0.00 | 13500000.00 | 12167000.00 | 0.00 | 0.00 | 0.00 | 1333000.00 | 1333000.00 | 0.00 | 0.00 | 1953-07-01T00:00:00 | 1968-07-01T00:00:00 | 1948-03-25T00:00:00 | 1948-03-25T00:00:00 | 1949-04-07T00:00:00 | 1954-12-31T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00060 | LATIN AMERICA AND CARIBBEAN | CL | Chile | Ministry of Finance | CL | Chile | NON POOL | Fully Repaid | 3.7500 |  | P006577 | FOMENTO AGRIC CREDIT | 2500000.00 | 0.00 | 0.00 | 2500000.00 | 755000.00 | 0.00 | 0.00 | 0.00 | 1745000.00 | 1745000.00 | 0.00 | 0.00 | 1950-07-01T00:00:00 | 1955-01-01T00:00:00 | 1948-03-25T00:00:00 | 1948-03-25T00:00:00 | 1949-04-07T00:00:00 | 1950-01-01T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00070 | EUROPE AND CENTRAL ASIA | NL | Netherlands |  |  |  | NON POOL | Fully Repaid | 3.5625 |  | P037453 | SHIPPING I | 2000000.00 | 0.00 | 0.00 | 2000000.00 | 0.00 | 0.00 | 0.00 | 0.00 | 2000000.00 | 2000000.00 | 0.00 | 0.00 | 1949-01-15T00:00:00 | 1958-07-15T00:00:00 | 1948-07-15T00:00:00 | 1948-05-21T00:00:00 | 1948-08-03T00:00:00 | 1948-08-03T00:00:00 |  |
| 2018-10-31T00:00:00 | IBRD00071 | EUROPE AND CENTRAL ASIA | NL | Netherlands |  |  |  | NON POOL | Fully Repaid | 3.5625 |  | P037453 | SHIPPING I | 2000000.00 | 0.00 | 0.00 | 2000000.00 | 0.00 | 0.00 | 0.00 | 0.00 | 2000000.00 | 2000000.00 | 0.00 | 0.00 | 1949-01-15T00:00:00 | 1958-07-15T00:00:00 | 1948-07-15T00:00:00 | 1948-05-21T00:00:00 | 1948-08-03T00:00:00 | 1948-08-03T00:00:00 |  |
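Since both data files are described as sharing one schema, it can be worth confirming that before running a job against either of them. The following is a minimal sketch using only Python’s standard library, intended to be run locally under Python 3. The function names are hypothetical.

```python
import csv


def read_header(csv_path):
    """Return the list of column names from the first row of a CSV file."""
    with open(csv_path, newline="") as csv_file:
        return next(csv.reader(csv_file))


def same_schema(path_a, path_b):
    """True when both CSV files have identical column names in the same order."""
    return read_header(path_a) == read_header(path_b)
```

Called with the local paths of the snapshot and historic CSV files (wherever you saved them), same_schema should return True, and len(read_header(path)) should report the 33 columns mentioned above.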
In this demonstration, both the Java and Python jobs will perform the same simple analysis of the larger historic dataset. For the analysis, we will identify the top 25 historic IBRD borrowers and determine their total loan disbursements, current loan obligations, and the average interest rates they were charged for all loans. This simple analysis will be performed using Spark’s SQL capabilities. The results of the analysis, a Spark DataFrame containing 25 rows, will be saved as a CSV-format data file.
SELECT country,
       country_code,
       Format_number(total_disbursement, 0) AS total_disbursement,
       Format_number(total_obligation, 0) AS total_obligation,
       Format_number(avg_interest_rate, 2) AS avg_interest_rate
FROM   (SELECT country,
               country_code,
               Sum(disbursed) AS total_disbursement,
               Sum(obligation) AS total_obligation,
               Avg(interest_rate) AS avg_interest_rate
        FROM   loans
        GROUP  BY country, country_code
        ORDER  BY total_disbursement DESC
        LIMIT  25)
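To get a feel for what the inner query returns before involving Spark at all, the same aggregation can be tried against a handful of rows with Python’s built-in sqlite3 module. This is only a sketch: SQLite stands in for Spark SQL, the Spark-specific Format_number calls are left out, and the rows are the nine sample loans from the dataset excerpt shown earlier, reduced to the five columns the query uses. The function name is hypothetical.

```python
import sqlite3

# (country, country_code, disbursed, obligation, interest_rate)
SAMPLE_LOANS = [
    ("France", "FR", 250000000.00, 0.00, 4.25),
    ("Netherlands", "NL", 191044211.75, 0.00, 4.25),
    ("Netherlands", "NL", 3955788.25, 0.00, 4.25),
    ("Denmark", "DK", 40000000.00, 0.00, 4.25),
    ("Luxembourg", "LU", 11761983.02, 0.00, 4.25),
    ("Chile", "CL", 13500000.00, 0.00, 4.50),
    ("Chile", "CL", 2500000.00, 0.00, 3.75),
    ("Netherlands", "NL", 2000000.00, 0.00, 3.5625),
    ("Netherlands", "NL", 2000000.00, 0.00, 3.5625),
]

# Same shape as the inner SELECT of the analysis query
TOP_BORROWERS_SQL = """
    SELECT country, country_code,
           SUM(disbursed) AS total_disbursement,
           SUM(obligation) AS total_obligation,
           AVG(interest_rate) AS avg_interest_rate
    FROM loans
    GROUP BY country, country_code
    ORDER BY total_disbursement DESC
    LIMIT ?
"""


def top_borrowers(rows, limit=25):
    """Load rows into an in-memory table and return the top borrowers."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE loans (country TEXT, country_code TEXT, "
            "disbursed REAL, obligation REAL, interest_rate REAL)"
        )
        conn.executemany("INSERT INTO loans VALUES (?, ?, ?, ?, ?)", rows)
        return conn.execute(TOP_BORROWERS_SQL, (limit,)).fetchall()
    finally:
        conn.close()


top_three = top_borrowers(SAMPLE_LOANS, limit=3)
for row in top_three:
    print(row)
```

Grouping before sorting and limiting is what keeps the final result small: however many loan rows go in, at most 25 aggregated rows come out.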
Google Cloud Storage
First, we need a location to store our Spark jobs, data files, and results, which will be accessible to Dataproc. Although there are a number of choices, the simplest and most convenient location for Dataproc is a Google Cloud Storage bucket. According to Google, Cloud Storage offers the highest level of availability and performance within a single region and is ideal for compute, analytics, and ML workloads in a particular region. Cloud Storage buckets are nearly identical in concept to buckets in Amazon Simple Storage Service (Amazon S3), Amazon’s object storage service.
Using the Google Cloud Console, Google’s Web Admin UI, create a new, uniquely named Cloud Storage bucket. Our Dataproc clusters will eventually be created in a single regional location. We need to ensure our new bucket is created in the same regional location as the clusters; I chose us-east1.
We will need the new bucket’s link to use within the Java and Python code, as well as from the command line with gsutil. The gsutil tool is a Python application that lets you access Cloud Storage from the command line. The bucket’s link may be found on the Storage Browser Console’s Overview tab. A bucket’s link is always in the format gs://bucket-name.
Alternatively, we may also create the Cloud Storage bucket using gsutil with the make buckets (mb) command, as follows:
# Always best practice since features are updated frequently
gcloud components update

export PROJECT=your_project_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name

# Make sure you are creating resources in the correct project
gcloud config set project $PROJECT

gsutil mb -p $PROJECT -c regional -l $REGION $BUCKET_NAME
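Since the same gs:// link ends up in Java code, Python code, and gsutil commands, a small helper that splits it into its parts can catch typos early. The sketch below uses only the standard library. The function name is hypothetical, and the check is limited to the gs://bucket-name format described above, not Cloud Storage’s full bucket naming rules.

```python
from urllib.parse import urlparse


def split_gcs_uri(uri):
    """Split 'gs://bucket-name/path/to/object' into (bucket, object_path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(
            "Expected a link of the form gs://bucket-name[/object]: %r" % uri
        )
    # The object path is empty when the link points at the bucket itself
    return parsed.netloc, parsed.path.lstrip("/")


bucket, object_path = split_gcs_uri(
    "gs://your_bucket_name/ibrd-statement-of-loans-historical-data.csv"
)
print(bucket, object_path)
```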
Cloud Dataproc Cluster
Next, we will create two different Cloud Dataproc clusters for demonstration purposes. If you have not used Cloud Dataproc previously in your GCP Project, you will first need to enable the API for Cloud Dataproc.
Single Node Cluster
We will start with a single node cluster with no worker nodes, suitable for development and testing Spark and Hadoop jobs, using small datasets. Create a single-node Dataproc cluster using the Single Node Cluster mode option. Create the cluster in the same region as the new Cloud Storage bucket. This will allow the Dataproc cluster access to the bucket without additional security or IAM configuration. I used the n1-standard-1 machine type, with 1 vCPU and 3.75 GB of memory. Observe the resources assigned to Hadoop YARN for Spark job scheduling and cluster resource management.
The new cluster, consisting of a single node and no worker nodes, should be ready for use in a few minutes or less.
Note the Image version, 1.3.16-deb9. According to Google, Dataproc uses image versions to bundle operating system, big data components, and Google Cloud Platform connectors into one package that is deployed on a cluster. This image, released in November 2018, is the latest available version at the time of this post. The image contains:
- Apache Spark 2.3.1
- Apache Hadoop 2.9.0
- Apache Pig 0.17.0
- Apache Hive 2.3.2
- Apache Tez 0.9.0
- Cloud Storage connector 1.9.9-hadoop2
- Scala 2.11.8
- Python 2.7
To avoid lots of troubleshooting, make sure your code is compatible with the image’s versions. It is important to note the image does not contain a version of Python 3. You will need to ensure your Python code is built to run with Python 2.7. Alternatively, use Dataproc’s --initialization-actions flag along with bootstrap and setup shell scripts to install Python 3 on the cluster using pip or conda. Tips for installing Python 3 on Dataproc can be found on Stack Overflow and elsewhere on the Internet.
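One low-effort way to catch a version mismatch early is to have a job check its interpreter before doing any work. The following sketch is an illustration rather than code from the project; the function name is hypothetical. It is written so that the same source is valid on both Python 2.7 and Python 3.

```python
import sys


def check_python_version(expected_major, version_info=None):
    """Return 'Python X.Y', or raise RuntimeError on a major version mismatch."""
    if version_info is None:
        version_info = sys.version_info
    if version_info[0] != expected_major:
        raise RuntimeError(
            "This job expects Python %d.x but is running on Python %d.%d"
            % (expected_major, version_info[0], version_info[1])
        )
    return "Python %d.%d" % (version_info[0], version_info[1])


# Report the interpreter this sketch is running on. A print(...) call with a
# single string argument is valid syntax on both Python 2.7 and Python 3.
print("Running on " + check_python_version(sys.version_info[0]))
```

A script written for the cluster image described above would call check_python_version(2) at the top of its main function.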
As an alternative to the Google Cloud Console, we are able to create the cluster using a REST command. Google provides the Google Cloud Console’s equivalent REST request, as shown in the example below.
Additionally, we have the option of using the gcloud command line tool. This tool provides the primary command-line interface to Google Cloud Platform and is part of Google’s Cloud SDK, which also includes the aforementioned gsutil. Here again, Google provides the Google Cloud Console’s equivalent gcloud command. This is a great way to learn to use the command line.
Using the dataproc clusters create command, we are able to create the same cluster as shown above from the command line, as follows:
export PROJECT=your_project_name
export CLUSTER_1=your_single_node_cluster_name
export REGION=us-east1
export ZONE=us-east1-b
export MACHINE_TYPE_SMALL=n1-standard-1

gcloud dataproc clusters create $CLUSTER_1 \
  --region $REGION \
  --zone $ZONE \
  --single-node \
  --master-machine-type $MACHINE_TYPE_SMALL \
  --master-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT
There are a few useful commands to inspect your running Dataproc clusters. The dataproc clusters describe command, in particular, provides detailed information about all aspects of the cluster’s configuration and current state.
gcloud dataproc clusters list --region $REGION

gcloud dataproc clusters describe $CLUSTER_1 \
  --region $REGION --format json
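Because describe can emit JSON, its output is easy to consume from a script. The sketch below pulls a few fields out of such a document with Python’s json module. The sample document is a heavily abbreviated, hypothetical stand-in written for this illustration (real output contains many more fields). The field names used (clusterName, status.state, config.masterConfig, config.workerConfig) are assumed from the Dataproc cluster resource format, so verify them against your own output.

```python
import json

# Hypothetical, abbreviated stand-in for `describe --format json` output
SAMPLE_DESCRIBE_OUTPUT = """
{
  "clusterName": "your_single_node_cluster_name",
  "status": {"state": "RUNNING"},
  "config": {
    "masterConfig": {"numInstances": 1}
  }
}
"""


def summarize_cluster(describe_json):
    """Extract the name, state, and node counts from a describe document."""
    cluster = json.loads(describe_json)
    config = cluster.get("config", {})
    return {
        "name": cluster.get("clusterName"),
        "state": cluster.get("status", {}).get("state"),
        "masters": config.get("masterConfig", {}).get("numInstances", 0),
        # Assumed to be absent for a single-node cluster, hence the default
        "workers": config.get("workerConfig", {}).get("numInstances", 0),
    }


summary = summarize_cluster(SAMPLE_DESCRIBE_OUTPUT)
print(summary)
```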
Standard Cluster
In addition to the single node cluster, we will create a second three-node Dataproc cluster. We will compare the speed of a single-node cluster to that of a true cluster with multiple worker nodes. Create a new Dataproc cluster using the Standard Cluster mode option. Again, make sure to create the cluster in the same region as the new Storage bucket.
The second cluster contains a single master node and two worker nodes. All three nodes use the n1-standard-4 machine type, with 4 vCPUs and 15 GB of memory. Although still considered a minimally-sized cluster, this cluster represents a significant increase in compute power over the first single-node cluster, which had a total of 1 vCPU, 3.75 GB of memory, and no worker nodes on which to distribute processing. Between the two workers in the second cluster, we have 8 vCPUs and 30 GB of memory for computation.
Again, we have the option of using the gcloud command line tool to create the cluster:
export PROJECT=your_project_name
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export ZONE=us-east1-b
export NUM_WORKERS=2
export MACHINE_TYPE_LARGE=n1-standard-4

gcloud dataproc clusters create $CLUSTER_2 \
  --region $REGION \
  --zone $ZONE \
  --master-machine-type $MACHINE_TYPE_LARGE \
  --master-boot-disk-size 500 \
  --num-workers $NUM_WORKERS \
  --worker-machine-type $MACHINE_TYPE_LARGE \
  --worker-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT
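The two gcloud commands in this post differ only in a handful of flags, so they can be generated from one function when scripting cluster creation. The following sketch assembles the argument list using only the flags shown in this post. The function and parameter names are hypothetical, and the command is printed rather than executed.

```python
import shlex


def build_create_command(cluster, project, region, zone, machine_type,
                         num_workers=0, boot_disk_gb=500,
                         image_version="1.3-deb9"):
    """Return the gcloud argument list for a single-node or standard cluster."""
    args = [
        "gcloud", "dataproc", "clusters", "create", cluster,
        "--region", region,
        "--zone", zone,
        "--master-machine-type", machine_type,
        "--master-boot-disk-size", str(boot_disk_gb),
        "--image-version", image_version,
        "--project", project,
    ]
    if num_workers == 0:
        # No workers requested: use the single node cluster mode
        args.append("--single-node")
    else:
        args += [
            "--num-workers", str(num_workers),
            "--worker-machine-type", machine_type,
            "--worker-boot-disk-size", str(boot_disk_gb),
        ]
    return args


command = build_create_command(
    cluster="your_three_node_cluster_name",
    project="your_project_name",
    region="us-east1",
    zone="us-east1-b",
    machine_type="n1-standard-4",
    num_workers=2,
)
print(" ".join(shlex.quote(arg) for arg in command))
```

Passing the list to subprocess.run(command, check=True) would execute it, assuming the Cloud SDK is installed and authenticated on the machine running the script.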
Cluster Creation Speed: Cloud Dataproc versus Amazon EMR?
In a series of rather unscientific tests, I found the three-node Dataproc cluster took less than two minutes on average to be created. Compare that time to a similar three-node cluster built with Amazon’s EMR service using their general purpose m4.4xlarge Amazon EC2 instance type. In a similar series of tests, I found the EMR cluster took seven minutes on average to be created. The EMR cluster took 3.5 times longer to create than the comparable Dataproc cluster. Again, although not a totally accurate comparison, since both services offer different features, it gives you a sense of the speed of Dataproc as compared to Amazon EMR.
Staging Buckets
According to Google, when you create a cluster, Cloud Dataproc creates a Cloud Storage staging bucket in your project or reuses an existing Cloud Dataproc-created bucket from a previous cluster creation request. Staging buckets are used to stage miscellaneous configuration and control files that are needed by your cluster. Below, we see the staging buckets created for the two Dataproc clusters.
Project Files
Before uploading the jobs and running them on the Cloud Dataproc clusters, we need to understand what is included in the two GitHub projects. If you recall from the Kaggle section of the post, both projects are basically the same, but written in different languages, Java and Python. The jobs they contain all perform the same basic analysis on the dataset.
Java Project
The dataproc-java-demo Java-based GitHub project contains three classes, each of which is a job to be run by Spark. The InternationalLoansApp Java class is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file (gist).
package org.example.dataproc;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class InternationalLoansApp {

    public static void main(String[] args) {
        InternationalLoansApp app = new InternationalLoansApp();
        app.start();
    }

    private void start() {
        SparkSession spark = SparkSession.builder()
                .appName("dataproc-java-demo")
                .master("local[*]")
                .getOrCreate();

        spark.sparkContext().setLogLevel("INFO"); // INFO by default

        // Loads CSV file from local directory
        Dataset<Row> dfLoans = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", true)
                .load("data/ibrd-statement-of-loans-latest-available-snapshot.csv");

        // Basic stats
        System.out.printf("Rows of data:%d%n", dfLoans.count());
        System.out.println("Inferred Schema:");
        dfLoans.printSchema();

        // Creates temporary view using DataFrame
        dfLoans.withColumnRenamed("Country", "country")
                .withColumnRenamed("Country Code", "country_code")
                .withColumnRenamed("Disbursed Amount", "disbursed")
                .withColumnRenamed("Borrower's Obligation", "obligation")
                .withColumnRenamed("Interest Rate", "interest_rate")
                .createOrReplaceTempView("loans");

        // Performs basic analysis of dataset
        Dataset<Row> dfDisbursement = spark.sql(
                "SELECT country, country_code, "
                        + "format_number(total_disbursement, 0) AS total_disbursement, "
                        + "format_number(ABS(total_obligation), 0) AS total_obligation, "
                        + "format_number(avg_interest_rate, 2) AS avg_interest_rate "
                        + "FROM ( "
                        + "SELECT country, country_code, "
                        + "SUM(disbursed) AS total_disbursement, "
                        + "SUM(obligation) AS total_obligation, "
                        + "AVG(interest_rate) AS avg_interest_rate "
                        + "FROM loans "
                        + "GROUP BY country, country_code "
                        + "ORDER BY total_disbursement DESC "
                        + "LIMIT 25)"
        );

        dfDisbursement.show(25, 100);

        // Calculates and displays the grand total disbursed amount
        Dataset<Row> dfGrandTotalDisbursement = spark.sql(
                "SELECT format_number(SUM(disbursed),0) AS grand_total_disbursement FROM loans"
        );
        dfGrandTotalDisbursement.show();

        // Calculates and displays the grand total remaining obligation amount
        Dataset<Row> dfGrandTotalObligation = spark.sql(
                "SELECT format_number(SUM(obligation),0) AS grand_total_obligation FROM loans"
        );
        dfGrandTotalObligation.show();

        // Saves results to a local CSV file
        dfDisbursement.repartition(1)
                .write()
                .mode(SaveMode.Overwrite)
                .format("csv")
                .option("header", "true")
                .save("data/ibrd-summary-small-java");

        System.out.println("Results successfully written to CSV file");
    }
}
The Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine. There are several options for setting the Master URL, detailed here.
The path to the data file passed to load(), and the output path passed to save(), are both local relative file paths.
In the chain of withColumnRenamed calls, we do a bit of clean-up on the column names, for only those columns we are interested in for the analysis. Be warned, the column names of the IBRD data are less than ideal for SQL-based analysis, containing mixed-case characters, word spaces, and brackets.
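Renaming five columns by hand is manageable, but the same clean-up can be generalized. As an alternative to the explicit renames used in this post’s code (and not the approach the jobs take), the sketch below derives SQL-friendly names from the raw headers with a regular expression. The function name is hypothetical, and the generated names differ from the short aliases used in the jobs; for example, Borrower's Obligation becomes borrowers_obligation rather than obligation.

```python
import re


def to_snake_case(column_name):
    """Lower-case a header, drop apostrophes, and replace every other run of
    non-alphanumeric characters with a single underscore."""
    cleaned = re.sub(u"['\u2019]", "", column_name).lower()
    return re.sub(r"[^a-z0-9]+", "_", cleaned).strip("_")


for raw in ["Country Code", "Borrower's Obligation", "Effective Date (Most Recent)"]:
    print(raw, "->", to_snake_case(raw))
```

In PySpark, the resulting names could be applied to every column at once with df.toDF(*[to_snake_case(c) for c in df.columns]).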
Before writing the results, we call Spark DataFrame’s repartition method, dfDisbursement.repartition(1). The repartition method allows us to recombine the results of our analysis and output a single CSV file to the bucket. Ordinarily, Spark splits the data into partitions and executes computations on the partitions in parallel. Each partition’s data is written to a separate CSV file when a DataFrame is written back to the bucket.
Using coalesce(1) or repartition(1) to recombine the resulting 25-Row DataFrame on a single node is okay for the sake of this demonstration, but is not practical for recombining partitions from larger DataFrames. There are more efficient and less costly ways to manage the results of computations, depending on the intended use of the resulting data.
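One such alternative, sketched below under the assumption that the results are small enough to post-process on one machine, is to let Spark write its partitions as separate part files and combine them afterwards. This is an illustration rather than the approach used in this post. The function name is hypothetical, it works on a local directory (for example, after copying the output out of the bucket), and it assumes each part file was written with a header row, as with option("header", "true").

```python
import csv
import glob
import os


def merge_part_files(output_dir, merged_path):
    """Concatenate Spark 'part-*.csv' files into one CSV with a single header.

    Returns the number of data rows written.
    """
    part_paths = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    header_written = False
    rows_written = 0
    with open(merged_path, "w", newline="") as merged_file:
        writer = csv.writer(merged_file)
        for part_path in part_paths:
            with open(part_path, newline="") as part_file:
                reader = csv.reader(part_file)
                header = next(reader, None)
                if header is None:
                    continue  # skip empty part files
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                for row in reader:
                    writer.writerow(row)
                    rows_written += 1
    return rows_written
```

The merge runs outside the cluster, so no Spark executor has to hold the whole result in a single partition.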
The InternationalLoansAppDataprocSmall class is intended to be run on the Dataproc clusters, analyzing the same smaller CSV data file. The InternationalLoansAppDataprocLarge class is also intended to be run on the Dataproc clusters; however, it analyzes the larger 750K rows of data in the IBRD historic CSV file (gist).
package org.example.dataproc;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class InternationalLoansAppDataprocLarge {

    public static void main(String[] args) {
        InternationalLoansAppDataprocLarge app = new InternationalLoansAppDataprocLarge();
        app.start();
    }

    private void start() {
        SparkSession spark = SparkSession.builder()
                .appName("dataproc-java-demo")
                .master("yarn")
                .getOrCreate();

        spark.sparkContext().setLogLevel("WARN"); // INFO by default

        // Loads CSV file from Google Storage Bucket
        Dataset<Row> dfLoans = spark.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", true)
                .load("gs://dataproc-demo-bucket/ibrd-statement-of-loans-historical-data.csv");

        // Creates temporary view using DataFrame
        dfLoans.withColumnRenamed("Country", "country")
                .withColumnRenamed("Country Code", "country_code")
                .withColumnRenamed("Disbursed Amount", "disbursed")
                .withColumnRenamed("Borrower's Obligation", "obligation")
                .withColumnRenamed("Interest Rate", "interest_rate")
                .createOrReplaceTempView("loans");

        // Performs basic analysis of dataset
        Dataset<Row> dfDisbursement = spark.sql(
                "SELECT country, country_code, "
                        + "format_number(total_disbursement, 0) AS total_disbursement, "
                        + "format_number(ABS(total_obligation), 0) AS total_obligation, "
                        + "format_number(avg_interest_rate, 2) AS avg_interest_rate "
                        + "FROM ( "
                        + "SELECT country, country_code, "
                        + "SUM(disbursed) AS total_disbursement, "
                        + "SUM(obligation) AS total_obligation, "
                        + "AVG(interest_rate) AS avg_interest_rate "
                        + "FROM loans "
                        + "GROUP BY country, country_code "
                        + "ORDER BY total_disbursement DESC "
                        + "LIMIT 25)"
        );

        // Saves results to single CSV file in Google Storage Bucket
        dfDisbursement.repartition(1)
                .write()
                .mode(SaveMode.Overwrite)
                .format("csv")
                .option("header", "true")
                .save("gs://dataproc-demo-bucket/ibrd-summary-large-java");

        System.out.println("Results successfully written to CSV file");
    }
}
Note the Spark Session’s Master URL, .master("yarn"), which directs Spark to connect to a YARN cluster in client or cluster mode, depending on the value of --deploy-mode when submitting the job. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. Recall, the Dataproc cluster runs Spark on YARN.
Also, note that the path to the data file passed to load(), and the output path passed to save(), now point to the Cloud Storage bucket we created earlier, in the form .load("gs://your-bucket-name/your-data-file.csv"). Cloud Dataproc clusters automatically install the Cloud Storage connector. According to Google, there are a number of benefits to choosing Cloud Storage over traditional HDFS including data persistence, reliability, and performance.
These are the only two differences between the local version of the Spark job and the version of the Spark job intended for Dataproc. To build the project’s JAR file, which you will later upload to the Cloud Storage bucket, compile the Java project using the gradle build command from the root of the project. For convenience, the JAR file is also included in the GitHub repository.
Python Project
The dataproc-python-demo Python-based GitHub project contains two Python scripts to be run using PySpark for this post. The international_loans_local.py Python script is only intended to be run locally with the smaller 8.7K rows of data in the snapshot CSV file. It performs a few different analyses on the smaller dataset (gist).
#!/usr/bin/python
# Author: Gary A. Stafford
# License: MIT

from pyspark.sql import SparkSession


def main():
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName('dataproc-python-demo') \
        .getOrCreate()

    # Defaults to INFO
    sc = spark.sparkContext
    sc.setLogLevel("INFO")

    # Loads CSV file from local directory
    df_loans = spark \
        .read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/ibrd-statement-of-loans-latest-available-snapshot.csv")

    # Prints basic stats (parenthesized so it runs on Python 2 and Python 3)
    print("Rows of data:" + str(df_loans.count()))
    print("Inferred Schema:")
    df_loans.printSchema()

    # Creates temporary view using DataFrame
    df_loans.withColumnRenamed("Country", "country") \
        .withColumnRenamed("Country Code", "country_code") \
        .withColumnRenamed("Disbursed Amount", "disbursed") \
        .withColumnRenamed("Borrower's Obligation", "obligation") \
        .withColumnRenamed("Interest Rate", "interest_rate") \
        .createOrReplaceTempView("loans")

    # Performs basic analysis of dataset
    df_disbursement = spark.sql("""
        SELECT country, country_code,
            format_number(total_disbursement, 0) AS total_disbursement,
            format_number(ABS(total_obligation), 0) AS total_obligation,
            format_number(avg_interest_rate, 2) AS avg_interest_rate
        FROM (
            SELECT country, country_code,
                SUM(disbursed) AS total_disbursement,
                SUM(obligation) AS total_obligation,
                AVG(interest_rate) AS avg_interest_rate
            FROM loans
            GROUP BY country, country_code
            ORDER BY total_disbursement DESC
            LIMIT 25)
        """).cache()

    df_disbursement.show(25, True)

    # Saves results to a local CSV file
    df_disbursement.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save("data/ibrd-summary-small-python")

    print("Results successfully written to CSV file")

    spark.stop()


if __name__ == "__main__":
    main()
Identical to the corresponding Java class, note on line 12, the Spark Session’s Master URL, .master("local[*]"), directs Spark to run locally with as many worker threads as logical cores on the machine.
Also identical to the corresponding Java class, note on line 26, the path to the data file, and on line 66, the output path for the resulting data file, are both local relative file paths.
The international_loans_dataproc_large.py Python script is intended to be run on the Dataproc clusters, analyzing the larger 750K rows of data in the IBRD historic CSV file (gist).
#!/usr/bin/python
# Author: Gary A. Stafford
# License: MIT

from pyspark.sql import SparkSession


def main():
    spark = SparkSession \
        .builder \
        .master("yarn") \
        .appName('dataproc-python-demo') \
        .getOrCreate()

    # Defaults to INFO
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Loads CSV file from Google Storage Bucket
    df_loans = spark \
        .read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("gs://dataproc-demo-bucket/ibrd-statement-of-loans-historical-data.csv")

    # Creates temporary view using DataFrame
    df_loans.withColumnRenamed("Country", "country") \
        .withColumnRenamed("Country Code", "country_code") \
        .withColumnRenamed("Disbursed Amount", "disbursed") \
        .withColumnRenamed("Borrower's Obligation", "obligation") \
        .withColumnRenamed("Interest Rate", "interest_rate") \
        .createOrReplaceTempView("loans")

    # Performs basic analysis of dataset
    df_disbursement = spark.sql("""
        SELECT country, country_code,
            format_number(total_disbursement, 0) AS total_disbursement,
            format_number(ABS(total_obligation), 0) AS total_obligation,
            format_number(avg_interest_rate, 2) AS avg_interest_rate
        FROM (
            SELECT country, country_code,
                SUM(disbursed) AS total_disbursement,
                SUM(obligation) AS total_obligation,
                AVG(interest_rate) AS avg_interest_rate
            FROM loans
            GROUP BY country, country_code
            ORDER BY total_disbursement DESC
            LIMIT 25)
        """).cache()

    # Saves results to single CSV file in Google Storage Bucket
    df_disbursement.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save("gs://dataproc-demo-bucket/ibrd-summary-large-python")

    spark.stop()


if __name__ == "__main__":
    main()
On line 12, note the Spark Session’s Master URL, .master("yarn"), directs Spark to connect to a YARN cluster.
Again, note on line 26, the path to the data file, and on line 59, the output path for the data file, both point to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv")).
These are the only two differences between the local version of the PySpark job and the version of the PySpark job intended for Dataproc. With Python, there is no pre-compilation necessary. We will upload the second script directly.
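Since the master URL and the file paths are the only things that change between the two scripts, they could be pulled out into a small settings helper. The following is only a sketch of that idea, not how the demo projects are organized: the function names job_settings and build_session, the "local" and "dataproc" labels, and the default bucket name are all hypothetical.

```python
def job_settings(target, bucket="your-bucket-name"):
    """Return the master URL and I/O paths for a run target.

    target: "local" or "dataproc" (hypothetical labels for this sketch).
    bucket: placeholder Cloud Storage bucket name.
    """
    if target == "local":
        return {
            "master": "local[*]",
            "input": "data/ibrd-statement-of-loans-latest-available-snapshot.csv",
            "output": "data/ibrd-summary-small-python",
        }
    if target == "dataproc":
        base = "gs://{}".format(bucket)
        return {
            "master": "yarn",
            "input": base + "/ibrd-statement-of-loans-historical-data.csv",
            "output": base + "/ibrd-summary-large-python",
        }
    raise ValueError("unknown target: {!r}".format(target))


def build_session(settings, app_name="dataproc-python-demo"):
    """Create a SparkSession from the settings dictionary."""
    # Imported lazily so job_settings can be used without PySpark installed.
    from pyspark.sql import SparkSession

    return (SparkSession.builder
            .master(settings["master"])
            .appName(app_name)
            .getOrCreate())


if __name__ == "__main__":
    # Only prints the settings; no Spark session is started here.
    for target in ("local", "dataproc"):
        print(target, job_settings(target))
```

With a helper like this, the rest of the job (reading the CSV, the SQL query, writing the results) would stay identical for both targets.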
Uploading Job Resources to Cloud Storage
In total, we need to upload four items to the new Cloud Storage bucket we created previously. The items include the two Kaggle IBRD CSV files, the compiled Java JAR file from the dataproc-java-demo project, and the Python script from the dataproc-python-demo project. Using the Google Cloud Console, upload the four files to the new Google Storage bucket, as shown below. Make sure you unzip the two Kaggle IBRD CSV data files before uploading.
Like before, we also have the option of using gsutil with the copy (cp) command to upload the four files. The cp command accepts wildcards, as shown below.
export PROJECT=your_project_name
export BUCKET_NAME=gs://your_bucket_name
gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc_large.py $BUCKET_NAME
If our Java or Python jobs were larger, or more complex and required multiple files to run, we could also choose to upload ZIP or other commonly used compressed archives and reference them with the --archives flag when submitting the job.
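To see which of the four files the wildcard in the cp command above would pick up, the pattern can be checked locally before uploading. This sketch uses Python's shell-style matcher from the standard library as a stand-in; it only approximates how gsutil expands the wildcard, and the helper name matching is hypothetical.

```python
from fnmatch import fnmatchcase

# Pattern taken from the gsutil cp command above.
PATTERN = "ibrd-statement-of-loans-*.csv"

# The four items to be uploaded to the bucket.
candidates = [
    "ibrd-statement-of-loans-latest-available-snapshot.csv",
    "ibrd-statement-of-loans-historical-data.csv",
    "dataprocJavaDemo-1.0-SNAPSHOT.jar",
    "international_loans_dataproc_large.py",
]


def matching(names, pattern=PATTERN):
    """Return the names that match the shell-style pattern (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]


matched = matching(candidates)
for name in matched:
    print(name)
```

Only the two CSV files match, which is why the JAR file and the Python script each need their own cp command.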
Running Jobs on Dataproc
The easiest way to run a job on the Dataproc cluster is by submitting a job through the Dataproc Jobs UI, part of the Google Cloud Console.
Dataproc has the capability of running multiple types of jobs, including:
- Hadoop
- Spark
- SparkR
- PySpark
- Hive
- SparkSql
- Pig
We will be running both Spark and PySpark jobs as part of this demonstration.
Spark Jobs
To run a Spark job using the JAR file, select Job type Spark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters in your chosen region. Run both jobs at least twice, once on each cluster, for a total of four jobs.
Lastly, you will need to input the main class and the path to the JAR file. The JAR location will be:
gs://your_bucket_name/dataprocJavaDemo-1.0-SNAPSHOT.jar
The main class for the smaller dataset will be:
org.example.dataproc.InternationalLoansAppDataprocSmall
The main class for the larger dataset will be:
org.example.dataproc.InternationalLoansAppDataprocLarge
During or after job execution, you may view details in the Output tab of the Dataproc Jobs console.
Like every other step in this demonstration, we can also use the gcloud command line tool, instead of the web console, to submit our Spark jobs to each cluster. Here, I am submitting the larger dataset Spark job to the three-node cluster.
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
gcloud dataproc jobs submit spark \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar \
  --async
PySpark Jobs
To run a Spark job using the Python script, select Job type PySpark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters. Run the job at least twice, once on each cluster.
Lastly, you will need to input the main Python file path. There is only one Dataproc Python script, which analyzes the larger dataset. The script location will be:
gs://your_bucket_name/international_loans_dataproc_large.py
Like every other step in this demonstration, we can also use the gcloud command line tool instead of the web console to submit our PySpark jobs to each cluster. Below, I am submitting the PySpark job to the three-node cluster.
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name
gcloud dataproc jobs submit pyspark \
  $BUCKET_NAME/international_loans_dataproc_large.py \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --async
When the optional --async flag is included with any dataproc jobs submit command, the job is sent to the Dataproc cluster and the terminal is immediately released back to the user. If you do not use the --async flag, the terminal will be unavailable until the job is finished.
However, without the flag, we will get the standard output (stdout) and standard error (stderr) from Dataproc. The output includes some useful information, including different stages of the job execution lifecycle and execution times.
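When the same job is submitted repeatedly to different clusters, it can help to assemble the gcloud arguments in one place. The sketch below builds the argument list for the two submit commands shown above, using only the flags that appear in this post. The function name submit_command and its parameters are hypothetical, and it only constructs the command; actually running it (for example with subprocess.run) assumes gcloud is installed and authenticated.

```python
def submit_command(job_type, region, cluster,
                   main_class=None, jar=None, script=None, run_async=True):
    """Build the argument list for 'gcloud dataproc jobs submit'.

    job_type is "spark" (needs main_class and jar) or "pyspark" (needs script).
    """
    cmd = ["gcloud", "dataproc", "jobs", "submit", job_type]

    if job_type == "pyspark":
        if not script:
            raise ValueError("a PySpark job needs the main Python file path")
        cmd.append(script)
    elif job_type == "spark":
        if not (main_class and jar):
            raise ValueError("a Spark job needs a main class and a JAR path")
    else:
        raise ValueError("unsupported job type in this sketch: " + job_type)

    cmd += ["--region", region, "--cluster", cluster]

    if job_type == "spark":
        cmd += ["--class", main_class, "--jars", jar]

    # Without --async the terminal blocks but shows the job's stdout and stderr.
    if run_async:
        cmd.append("--async")
    return cmd


if __name__ == "__main__":
    bucket = "gs://your_bucket_name"
    print(" ".join(submit_command(
        "spark", "us-east1", "your_three_node_cluster_name",
        main_class="org.example.dataproc.InternationalLoansAppDataprocLarge",
        jar=bucket + "/dataprocJavaDemo-1.0-SNAPSHOT.jar")))
    print(" ".join(submit_command(
        "pyspark", "us-east1", "your_three_node_cluster_name",
        script=bucket + "/international_loans_dataproc_large.py",
        run_async=False)))
```

Passing run_async=False leaves out the --async flag, which is the variant to use when you want the job's output in the terminal.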
File Output
During development and testing, outputting results to the console is useful. However, in Production, the output from jobs is most often written to Apache Parquet, Apache Avro, CSV, JSON, or XML format files, persisted to an Apache Hive, SQL, or NoSQL database, or streamed to another system for post-processing, using technologies such as Apache Kafka.
Once both the Java and Python jobs have run successfully on the Dataproc cluster, you should observe the results have been saved back to the Storage bucket. Each script saves its results to a single CSV file in separate directories, as shown below.
The final dataset, written to the CSV file, contains the results of the analysis (gist).
country | country_code | total_disbursement | total_obligation | avg_interest_rate
---|---|---|---|---
Brazil | BR | 4,302,455,404,056 | 1,253,228,385,979 | 4.08
Mexico | MX | 4,219,081,270,927 | 1,297,489,060,082 | 4.94
Indonesia | ID | 3,270,346,860,046 | 1,162,592,633,450 | 4.67
China | CN | 3,065,658,803,841 | 1,178,177,730,111 | 3.01
India | IN | 3,052,082,309,937 | 1,101,910,589,590 | 3.81
Turkey | TR | 2,797,634,959,120 | 1,111,562,740,520 | 4.85
Argentina | AR | 2,241,512,056,786 | 524,815,800,115 | 3.38
Colombia | CO | 1,701,021,819,054 | 758,168,606,621 | 4.48
Korea, Republic of | KR | 1,349,701,565,303 | 9,609,765,857 | 6.81
Philippines | PH | 1,166,976,603,303 | 365,840,981,818 | 5.38
Poland | PL | 1,157,181,357,135 | 671,373,801,971 | 2.89
Morocco | MA | 1,045,267,705,436 | 365,073,667,924 | 4.39
Russian Federation | RU | 915,318,843,306 | 98,207,276,721 | 1.70
Romania | RO | 902,736,599,033 | 368,321,253,522 | 4.36
Egypt, Arab Republic of | EG | 736,945,143,568 | 431,086,774,867 | 4.43
Thailand | TH | 714,203,701,665 | 70,485,749,749 | 5.93
Peru | PE | 655,818,700,812 | 191,464,347,544 | 3.83
Ukraine | UA | 644,031,278,339 | 394,273,593,116 | 1.47
Pakistan | PK | 628,853,154,827 | 121,673,028,048 | 3.82
Tunisia | TN | 625,648,381,742 | 202,230,595,005 | 4.56
Nigeria | NG | 484,529,279,526 | 2,351,912,541 | 5.86
Kazakhstan | KZ | 453,938,975,114 | 292,590,991,287 | 2.81
Algeria | DZ | 390,644,588,386 | 251,720,881 | 5.30
Chile | CL | 337,041,916,083 | 11,479,003,904 | 4.86
Serbia | YF | 331,975,671,975 | 173,516,517,964 | 5.30
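Because the query wraps the aggregates in format_number, the totals are written to the CSV file as text with thousands separators rather than as plain numbers. If the results are consumed by another program, they need to be converted back. The sketch below shows one way to do that with the standard library. It assumes the CSV writer quotes fields that contain commas, and the SAMPLE text is hand-built from two rows of the table above, not read from the actual output file. The helper names parse_number and load_summary are hypothetical.

```python
import csv
import io

# Two rows copied from the results table, laid out as quoted CSV (an assumption
# about how fields containing commas appear in the written file).
SAMPLE = '''country,country_code,total_disbursement,total_obligation,avg_interest_rate
Brazil,BR,"4,302,455,404,056","1,253,228,385,979",4.08
"Korea, Republic of",KR,"1,349,701,565,303","9,609,765,857",6.81
'''


def parse_number(text):
    """Convert a format_number-style string such as '1,234' to an int."""
    return int(text.replace(",", ""))


def load_summary(handle):
    """Read the summary CSV and return rows with numeric columns converted."""
    rows = []
    for row in csv.DictReader(handle):
        rows.append({
            "country": row["country"],
            "country_code": row["country_code"],
            "total_disbursement": parse_number(row["total_disbursement"]),
            "total_obligation": parse_number(row["total_obligation"]),
            "avg_interest_rate": float(row["avg_interest_rate"]),
        })
    return rows


rows = load_summary(io.StringIO(SAMPLE))
for row in rows:
    print(row["country"], row["total_disbursement"])
```

An alternative design would be to drop format_number from the query and write raw numeric values, leaving formatting to whatever presents the data.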
Cleaning Up
When you are finished, make sure to delete your running clusters. This may be done through the Google Cloud Console. Deletion of the three-node cluster took, on average, slightly more than one minute.
As usual, we can also use the gcloud command line tool instead of the web console to delete the Dataproc clusters.
export CLUSTER_1=your_single_node_cluster_name
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
yes | gcloud dataproc clusters delete $CLUSTER_1 --region $REGION
yes | gcloud dataproc clusters delete $CLUSTER_2 --region $REGION
Results
Some observations, based on approximately 75 successful jobs. First, the Python and Java jobs ran in nearly the same amount of time on the single-node cluster, and likewise on the three-node cluster. This is beneficial since, although a lot of big data analysis is performed with Python, Java is still the lingua franca of many large enterprises.
Consecutive Execution
Below are the average times for running the larger dataset on both clusters, in Java, and in Python. The jobs were all run consecutively as opposed to concurrently. The best time was 59 seconds on the three-node cluster compared to the best time of 150 seconds on the single-node cluster, a difference of 256%. Given the differences in the two clusters, this large variation is expected. The average difference between the two clusters for running the large dataset was 254%.
Concurrent Execution
It is important to understand the impact of concurrently running multiple jobs on the same Dataproc cluster. To demonstrate this, both the Java and Python jobs were also run concurrently. In one such test, ten copies of the Python job were run concurrently on the three-node cluster.
Observe that the execution times of the concurrent jobs increase nearly linearly. The first job completes in roughly the same time as the consecutively executed jobs, shown above, but each succeeding job’s execution time increases linearly.
According to Apache, when running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application. Each application is given a maximum amount of resources it can use and holds onto them for its whole duration. Note no tuning was done to the Dataproc clusters to optimize for concurrent execution.
Really Big Data?
Although there is no exact definition of ‘big data’, 750K rows of data at 265 MB is probably not generally considered big data. Likewise, the three-node cluster used in this demonstration is still pretty diminutive. Lastly, the SQL query was less than complex. To really test the abilities of Dataproc would require a multi-gigabyte or multi-terabyte-sized dataset, divided amongst multiple files, computed on a much beefier cluster with more worker nodes and more compute resources.
Monitoring and Instrumentation
In addition to viewing the results of running and completed jobs, there are a number of additional monitoring resources, including the Hadoop YARN Resource Manager, HDFS NameNode, and Spark History Server Web UIs, and Google Stackdriver. I will only briefly introduce these resources, and not examine any of these interfaces in detail. You’re welcome to investigate the resources for your own clusters. Apache lists other Spark monitoring and instrumentation resources in their documentation.
To access the Hadoop YARN Resource Manager, HDFS NameNode, and Spark History Server Web UIs, you must create an SSH tunnel and run Chrome through a proxy. In the Dataproc Cluster tab, Google Dataproc provides both the commands and a link to the documentation needed to connect.
Hadoop YARN Resource Manager Web UI
Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the Hadoop YARN Resource Manager Web UI is accessed on port 8088. The UI allows you to view all aspects of the YARN cluster and the distributed applications running on the YARN system.
HDFS NameNode Web UI
Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the HDFS NameNode Web UI is accessed on port 9870. According to the Hadoop Wiki, the NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system, and tracks where across the cluster the file data is kept. It does not store the data of these files itself.
Spark History Server Web UI
We can view the details of all completed jobs using the Spark History Server Web UI. Once you are connected to the cluster, via the SSH tunnel and proxy, the Spark History Server Web UI is accessed on port 18080. Of all the methods of reviewing aspects of a completed Spark job, the History Server provides the most detail.
Using the History Server UI, we can drill into fine-grained details of each job, including the event timeline.
Also, using the History Server UI, we can see a visualization of the Spark job’s DAG (Directed Acyclic Graph). Databricks published an excellent post on learning how to interpret the information visualized in the Spark UI.
From the UI, we can not only view the DAG but also drill into each Stage of the DAG.
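The three web UIs described above differ only in their port numbers, so their addresses can be generated from the master node's hostname. This is a small sketch using the ports given in this post. The function name web_ui_urls is hypothetical, and the example hostname assumes the master node is named after the cluster with an -m suffix; substitute the hostname shown for your own cluster. The URLs are only reachable from a browser that is going through the SSH tunnel and proxy.

```python
# Ports as stated in the sections above.
WEB_UI_PORTS = {
    "YARN Resource Manager": 8088,
    "HDFS NameNode": 9870,
    "Spark History Server": 18080,
}


def web_ui_urls(master_host):
    """Return a mapping of web UI name to URL for the given master hostname."""
    return {
        name: "http://{}:{}".format(master_host, port)
        for name, port in WEB_UI_PORTS.items()
    }


if __name__ == "__main__":
    # Hypothetical master hostname; replace with your cluster's master node.
    for name, url in sorted(web_ui_urls("your_three_node_cluster_name-m").items()):
        print("{}: {}".format(name, url))
```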
Stackdriver
We can also enable Google Stackdriver for monitoring and management of services, containers, applications, and infrastructure. Stackdriver offers an impressive array of services, including debugging, error reporting, monitoring, alerting, tracing, logging, and dashboards, to mention only a few Stackdriver features.
There are dozens of metrics available, which collectively, reflect the health of the Dataproc clusters. Below we see the states of one such metric, the YARN virtual cores (vcores). A YARN vcore, introduced in Hadoop 2.4, is a usage share of a host CPU. The number of YARN virtual cores is equivalent to the number of worker nodes (2) times the number of vCPUs per node (4), for a total of eight YARN virtual cores. Below, we see that at one point in time, 5 of the 8 vcores have been allocated, with 2 more available.
Next, we see the states of the YARN memory size. YARN memory size is calculated as the number of worker nodes (2) times the amount of memory on each node (15 GB) times the fraction given to YARN (0.8), for a total of 24 GB (2 x 15 GB x 0.8). Below, we see that at one point in time, 20 GB of RAM is allocated with 4 GB available. At that instant in time, the workload does not appear to be exhausting the cluster’s memory.
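The two capacity figures above follow directly from the cluster's shape, and the arithmetic can be written out as a quick check. The sketch below uses the numbers from this post (2 worker nodes, 4 vCPUs and 15 GB of memory per node, and a 0.8 fraction given to YARN); the function names are hypothetical.

```python
def yarn_vcores(worker_nodes, vcpus_per_node):
    """Total YARN virtual cores: worker nodes times vCPUs per node."""
    return worker_nodes * vcpus_per_node


def yarn_memory_gb(worker_nodes, memory_gb_per_node, yarn_fraction):
    """Total YARN memory: worker nodes times node memory times YARN's share."""
    return worker_nodes * memory_gb_per_node * yarn_fraction


# Values from the three-node cluster (one master, two workers) in this post.
vcores = yarn_vcores(worker_nodes=2, vcpus_per_node=4)
memory = yarn_memory_gb(worker_nodes=2, memory_gb_per_node=15, yarn_fraction=0.8)

print("YARN vcores:", vcores)
print("YARN memory (GB):", round(memory, 1))
```

Changing the arguments gives a rough estimate of how the totals would move if workers were added or a larger machine type were chosen.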
Notifications
Since no one actually watches dashboards all day, waiting for something to fail, how do we know when we have an issue with Dataproc? Stackdriver offers integrations with most popular notification channels, including email, SMS, Slack, PagerDuty, HipChat, and Webhooks. With Stackdriver, we define a condition which describes when a service is considered unhealthy. When triggered, Stackdriver sends a notification to one or more channels.
Below is a preview of two alert notifications in Slack. I enabled Slack as a notification channel and created an alert which is triggered each time a Dataproc job fails. Whenever a job fails, such as the two examples below, I receive a Slack notification through the Slack Channel defined in Stackdriver.
Slack notifications contain a link, which routes you back to Stackdriver, to an incident which was opened on your behalf, due to the job failure.
For convenience, the incident also includes a pre-filtered link directly to the log entries at the time of the policy violation. Stackdriver logging offers advanced filtering capabilities to quickly find log entries, as shown below.
With Stackdriver, you get monitoring, logging, alerting, notification, and incident management as a service, with minimal cost and upfront configuration. Think about how much time and effort it takes the average enterprise to achieve this level of infrastructure observability on their own; most never do.
Conclusion
In this post, we have seen the ease-of-use, extensive feature-set, out-of-the-box integration ability with other cloud services, low cost, and speed of Google Cloud Dataproc, to run big data analytics workloads. Couple this with the ability of Stackdriver to provide monitoring, logging, alerting, notification, and incident management for Dataproc with minimal up-front configuration. In my opinion, based on these features, Google Cloud Dataproc leads other cloud competitors for fully-managed Spark and Hadoop Cluster management.
In future posts, we will examine the use of Cloud Dataproc Workflow Templates for process automation, the integration capabilities of Dataproc with services such as BigQuery, Bigtable, Cloud Dataflow, and Google Cloud Pub/Sub, and finally, DevOps for Big Data with Dataproc and tools like Spinnaker and Jenkins on GKE.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, nor Apache or Google.