Archive for category Java Development
Java Development with Microsoft SQL Server: Calling Microsoft SQL Server Stored Procedures from Java Applications Using JDBC
Posted by Gary A. Stafford in AWS, Java Development, Software Development, SQL, SQL Server Development on September 9, 2020
Introduction
Enterprise software solutions often combine multiple technology platforms. Accessing an Oracle database from a Microsoft .NET application, or conversely, accessing Microsoft SQL Server from a Java-based application, is common. In this post, we will explore the use of the JDBC (Java Database Connectivity) API to call stored procedures from a Microsoft SQL Server 2017 database and return data to a Java 11-based console application.

The objectives of this post include:
- Demonstrate the differences between using static SQL statements and stored procedures to return data.
- Demonstrate three types of JDBC statements to return data: Statement, PreparedStatement, and CallableStatement.
- Demonstrate how to call stored procedures with input and output parameters.
- Demonstrate how to return single values and a result set from a database using stored procedures.
Why Stored Procedures?
To access data, many enterprise software organizations require their developers to call stored procedures within their code as opposed to executing static T-SQL (Transact-SQL) statements against the database. There are several reasons stored procedures are preferred:
- Optimization: Stored procedures are often written by DBAs or database developers who specialize in database development. They understand the best way to construct queries for optimal performance and minimal load on the database server. Think of it as a developer using an API to interact with the database.
- Safety and Security: Stored procedures are considered safer and more secure than static SQL statements. The stored procedure provides tight control over the content of the queries, preventing malicious or unintentionally destructive code from being executed against the database.
- Error Handling: Stored procedures can contain logic for handling errors before they bubble up to the application layer and possibly to the end-user.
AdventureWorks 2017 Database
For brevity, I will use an existing and well-known Microsoft SQL Server database, AdventureWorks. The AdventureWorks database was originally published by Microsoft for SQL Server 2008. Although a bit dated architecturally, the database comes prepopulated with plenty of data for demonstration purposes.

The HumanResources schema, one of five schemas within the AdventureWorks database
For the demonstration, I have created an Amazon RDS for SQL Server 2017 Express Edition instance on AWS. You have several options for deploying SQL Server, including AWS, Microsoft Azure, Google Cloud, or installing it on your local workstation.
There are many methods to deploy the AdventureWorks database to Microsoft SQL Server. For this post’s demonstration, I used the AdventureWorks2017.bak backup file, which I copied to Amazon S3. Then, I enabled and configured the native backup and restore feature of Amazon RDS for SQL Server to import and install the backup.
DROP DATABASE IF EXISTS AdventureWorks;
GO
EXECUTE msdb.dbo.rds_restore_database
@restore_db_name='AdventureWorks',
@s3_arn_to_restore_from='arn:aws:s3:::my-bucket/AdventureWorks2017.bak',
@type='FULL',
@with_norecovery=0;
-- get task_id from output (e.g. 1)
EXECUTE msdb.dbo.rds_task_status
@db_name='AdventureWorks',
@task_id=1;
Install Stored Procedures
For the demonstration, I have added four stored procedures to the AdventureWorks database to use in this post. To follow along, you will need to install these stored procedures, which are included in the GitHub project.

Data Sources, Connections, and Properties
Using the latest Microsoft JDBC Driver 8.4 for SQL Server (ver. 8.4.1.jre11), we create a SQL Server data source, com.microsoft.sqlserver.jdbc.SQLServerDataSource, and database connection, java.sql.Connection. There are several patterns for creating and working with JDBC data sources and connections. This post does not necessarily focus on the best practices for creating or using either. In this example, the application instantiates a connection class, SqlConnection.java, which in turn instantiates the java.sql.Connection and com.microsoft.sqlserver.jdbc.SQLServerDataSource objects. The data source’s properties are supplied from an instance of a singleton class, ProjectProperties.java. This properties class instantiates the java.util.Properties class, which reads values from a configuration properties file, config.properties. On startup, the application creates the database connection, calls each of the example methods, and then closes the connection.
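As a rough sketch of what that wiring might look like, the following example builds a connection directly from a SQLServerDataSource. The property keys (db.host, db.port, db.name, db.user, db.password) are hypothetical; the actual SqlConnection.java and ProjectProperties.java classes in the GitHub project may be organized differently.

import com.microsoft.sqlserver.jdbc.SQLServerDataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

public class ConnectionSketch {
    // Builds a SQL Server connection from externally supplied properties
    public static Connection connect(Properties props) throws SQLException {
        SQLServerDataSource ds = new SQLServerDataSource();
        ds.setServerName(props.getProperty("db.host"));   // hypothetical property keys
        ds.setPortNumber(Integer.parseInt(props.getProperty("db.port", "1433")));
        ds.setDatabaseName(props.getProperty("db.name"));
        ds.setUser(props.getProperty("db.user"));
        ds.setPassword(props.getProperty("db.password"));
        return ds.getConnection();
    }
}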
Examples
For each example, I will show the stored procedure, if applicable, followed by the Java method that calls the procedure or executes the static SQL statement. I have left out the data source and connection code in the article. Again, a complete copy of all the code for this article is available on GitHub, including Java source code, SQL statements, helper SQL scripts, and a set of basic JUnit tests.
To run the JUnit unit tests, use the ./gradlew cleanTest test --warning-mode none command with Gradle, on which the project is based.

To build and run the application, use the ./gradlew run --warning-mode none command.

Example 1: SQL Statement
Before jumping into stored procedures, we will start with a simple static SQL statement. This example’s method, getAverageProductWeightST, uses the java.sql.Statement class. According to Oracle’s JDBC documentation, the Statement object is used for executing a static SQL statement and returning the results it produces. This SQL statement calculates the average weight of all products in the AdventureWorks database. It returns a solitary double numeric value. This example demonstrates one of the simplest methods for returning data from SQL Server.
/**
 * Statement example, no parameters, returns Integer
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightST() {
    double averageWeight = 0;
    Statement stmt = null;
    ResultSet rs = null;
    try {
        stmt = connection.getConnection().createStatement();
        String sql = "WITH Weights_CTE(AverageWeight) AS" +
                "(" +
                " SELECT [Weight] AS [AverageWeight]" +
                " FROM [Production].[Product]" +
                " WHERE [Weight] > 0" +
                " AND [WeightUnitMeasureCode] = 'LB'" +
                " UNION" +
                " SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
                " FROM [Production].[Product]" +
                " WHERE [Weight] > 0" +
                " AND [WeightUnitMeasureCode] = 'G')" +
                "SELECT ROUND(AVG([AverageWeight]), 2)" +
                "FROM [Weights_CTE];";
        rs = stmt.executeQuery(sql);
        if (rs.next()) {
            averageWeight = rs.getDouble(1);
        }
    } catch (Exception ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
    }
    return averageWeight;
}
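As an aside, since the application targets Java 11, the same method could be written more compactly with try-with-resources, which closes the Statement and ResultSet automatically. The sketch below assumes the same connection field and the same SQL string as the method above; the query text is passed in as a parameter here only for brevity.

public double getAverageProductWeightST(String sql) {
    double averageWeight = 0;
    try (Statement stmt = connection.getConnection().createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
        if (rs.next()) {
            averageWeight = rs.getDouble(1); // single, unnamed column
        }
    } catch (SQLException ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    }
    return averageWeight;
}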
Example 2: Prepared Statement
Next, we will execute almost the same static SQL statement as in Example 1. The only change is the addition of the column name, averageWeight. This allows us to parse the results by column name, making the code easier to understand as opposed to using the numeric index of the column as in Example 1.
Also, instead of using the java.sql.Statement class, we use the java.sql.PreparedStatement class. According to Oracle’s documentation, a SQL statement is precompiled and stored in a PreparedStatement object. This object can then be used to execute this statement multiple times efficiently.
/**
 * PreparedStatement example, no parameters, returns Integer
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightPS() {
    double averageWeight = 0;
    PreparedStatement pstmt = null;
    ResultSet rs = null;
    try {
        String sql = "WITH Weights_CTE(averageWeight) AS" +
                "(" +
                " SELECT [Weight] AS [AverageWeight]" +
                " FROM [Production].[Product]" +
                " WHERE [Weight] > 0" +
                " AND [WeightUnitMeasureCode] = 'LB'" +
                " UNION" +
                " SELECT [Weight] * 0.00220462262185 AS [AverageWeight]" +
                " FROM [Production].[Product]" +
                " WHERE [Weight] > 0" +
                " AND [WeightUnitMeasureCode] = 'G')" +
                "SELECT ROUND(AVG([AverageWeight]), 2) AS [averageWeight]" +
                "FROM [Weights_CTE];";
        pstmt = connection.getConnection().prepareStatement(sql);
        rs = pstmt.executeQuery();
        if (rs.next()) {
            averageWeight = rs.getDouble("averageWeight");
        }
    } catch (Exception ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
        if (pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
    }
    return averageWeight;
}
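Although the example above has no parameters, the primary benefit of a PreparedStatement is binding values to placeholders. The following is a hypothetical parameterized query, not part of the GitHub project, that counts products heavier than a supplied weight; it assumes the same connection field as the examples above.

public int countProductsHeavierThan(double minWeight) {
    int count = 0;
    String sql = "SELECT COUNT(*) AS [ProductCount]" +
            " FROM [Production].[Product]" +
            " WHERE [Weight] > ?;";
    try (PreparedStatement pstmt = connection.getConnection().prepareStatement(sql)) {
        pstmt.setDouble(1, minWeight); // bind the single positional parameter
        try (ResultSet rs = pstmt.executeQuery()) {
            if (rs.next()) {
                count = rs.getInt("ProductCount");
            }
        }
    } catch (SQLException ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    }
    return count;
}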
Example 3: Callable Statement
In this example, the average product weight query has been moved into a stored procedure. The procedure is identical in functionality to the static statement in the first two examples. To call the stored procedure, we use the java.sql.CallableStatement class. According to Oracle’s documentation, the CallableStatement extends PreparedStatement. It is the interface used to execute SQL stored procedures. The CallableStatement accepts both input and output parameters; however, this simple example does not use either. Like the previous two examples, the procedure returns a double numeric value.
CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeight]
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO
The calling Java method is shown below.
/**
 * CallableStatement, no parameters, returns Integer
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightCS() {
    CallableStatement cstmt = null;
    double averageWeight = 0;
    ResultSet rs = null;
    try {
        cstmt = connection.getConnection().prepareCall(
                "{call [Production].[uspGetAverageProductWeight]}");
        cstmt.execute();
        rs = cstmt.getResultSet();
        if (rs.next()) {
            averageWeight = rs.getDouble(1);
        }
    } catch (Exception ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.SEVERE, null, ex);
            }
        }
        if (cstmt != null) {
            try {
                cstmt.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
    }
    return averageWeight;
}
Example 4: Calling a Stored Procedure with an Output Parameter
In this example, we use almost the same stored procedure as in Example 3. The only difference is the inclusion of an output parameter. This time, instead of returning a result set with a value in a single unnamed column, the column has a name, averageWeight. We can now call that column by name when retrieving the value.
The stored procedure patterns found in Examples 3 and 4 are both commonly used. One procedure uses an output parameter and one does not, but both return the same value(s). You can use a CallableStatement for either type.
CREATE OR
ALTER PROCEDURE [Production].[uspGetAverageProductWeightOUT]
    @averageWeight DECIMAL(8, 2) OUT
AS
BEGIN
SET NOCOUNT ON;
WITH
Weights_CTE(AverageWeight)
AS
(
SELECT [Weight] AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'LB'
UNION
SELECT [Weight] * 0.00220462262185 AS [AverageWeight]
FROM [Production].[Product]
WHERE [Weight] > 0
AND [WeightUnitMeasureCode] = 'G'
)
SELECT @averageWeight = ROUND(AVG([AverageWeight]), 2)
FROM [Weights_CTE];
END
GO
The calling Java method is shown below.
/**
 * CallableStatement example, (1) output parameter, returns Integer
 *
 * @return Average weight of all products
 */
public double getAverageProductWeightOutCS() {
    CallableStatement cstmt = null;
    double averageWeight = 0;
    try {
        cstmt = connection.getConnection().prepareCall(
                "{call [Production].[uspGetAverageProductWeightOUT](?)}");
        cstmt.registerOutParameter("averageWeight", Types.DECIMAL);
        cstmt.execute();
        averageWeight = cstmt.getDouble("averageWeight");
    } catch (Exception ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    } finally {
        if (cstmt != null) {
            try {
                cstmt.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
    }
    return averageWeight;
}
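If you prefer not to rely on the driver’s support for named parameters, the output parameter can also be registered and read by its ordinal position. The following variant is a minimal sketch, not part of the project, assuming the same connection field as above.

public double getAverageProductWeightOutByIndexCS() {
    double averageWeight = 0;
    try (CallableStatement cstmt = connection.getConnection().prepareCall(
            "{call [Production].[uspGetAverageProductWeightOUT](?)}")) {
        cstmt.registerOutParameter(1, Types.DECIMAL); // first (and only) parameter marker
        cstmt.execute();
        averageWeight = cstmt.getDouble(1);
    } catch (SQLException ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    }
    return averageWeight;
}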
Example 5: Calling a Stored Procedure with an Input Parameter
In this example, the procedure returns a result set, java.sql.ResultSet, of employees whose last name starts with a particular sequence of characters (e.g., ‘M’ or ‘Sa’). The sequence of characters is passed as an input parameter, lastNameStartsWith, to the stored procedure using the CallableStatement.
The method making the call iterates through the rows of the result set returned by the stored procedure, concatenating multiple columns to form the employee’s full name as a string. Each full name string is then added to an ordered collection of strings, a List<String> object. The List instance is returned by the method. You will notice this procedure takes a little longer to run because of the use of the LIKE operator. The database server has to perform pattern matching on each last name value in the table to determine the result set.
CREATE OR
ALTER PROCEDURE [HumanResources].[uspGetEmployeesByLastName]
@lastNameStartsWith VARCHAR(20) = 'A'
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[FirstName], p.[MiddleName], p.[LastName], p.[Suffix], e.[JobTitle], m.[EmailAddress]
FROM [HumanResources].[Employee] AS e
LEFT JOIN [Person].[Person] p ON e.[BusinessEntityID] = p.[BusinessEntityID]
LEFT JOIN [Person].[EmailAddress] m ON e.[BusinessEntityID] = m.[BusinessEntityID]
WHERE e.[CurrentFlag] = 1
AND p.[PersonType] = 'EM'
AND p.[LastName] LIKE @lastNameStartsWith + '%'
ORDER BY p.[LastName], p.[FirstName], p.[MiddleName]
END
GO
The calling Java method is shown below.
/**
 * CallableStatement example, (1) input parameter, returns ResultSet
 *
 * @param lastNameStartsWith
 * @return List of employee names
 */
public List<String> getEmployeesByLastNameCS(String lastNameStartsWith) {
    CallableStatement cstmt = null;
    ResultSet rs = null;
    List<String> employeeFullName = new ArrayList<>();
    try {
        cstmt = connection.getConnection().prepareCall(
                "{call [HumanResources].[uspGetEmployeesByLastName](?)}",
                ResultSet.TYPE_SCROLL_INSENSITIVE,
                ResultSet.CONCUR_READ_ONLY);
        cstmt.setString("lastNameStartsWith", lastNameStartsWith);
        boolean results = cstmt.execute();
        int rowsAffected = 0;
        // Protects against lack of SET NOCOUNT in stored procedure
        while (results || rowsAffected != -1) {
            if (results) {
                rs = cstmt.getResultSet();
                break;
            } else {
                rowsAffected = cstmt.getUpdateCount();
            }
            results = cstmt.getMoreResults();
        }
        while (rs.next()) {
            employeeFullName.add(
                    rs.getString("LastName") + ", "
                            + rs.getString("FirstName") + " "
                            + rs.getString("MiddleName"));
        }
    } catch (Exception ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
        if (cstmt != null) {
            try {
                cstmt.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
    }
    return employeeFullName;
}
Example 6: Converting a Result Set to an Ordered Collection of Objects
In this last example, we pass two input parameters, productColor and productSize, to a slightly more complex stored procedure. The stored procedure returns a result set containing several columns of product information. This time, the example’s method iterates through the result set returned by the procedure and constructs an ordered collection of products, a List<Product> object. The Product objects in the list are instances of the Product.java POJO class. The method converts each field value in each row of the result set into a Product property (e.g., Product.Size, Product.Model). Using a collection is a common method for persisting data from a result set in an application.
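For reference, below is a minimal sketch of what the Product POJO might look like, matching the constructor arguments used in the calling method shown later in this example; the actual Product.java class in the GitHub project may contain additional fields and methods.

public class Product {
    private final String product;
    private final String productNumber;
    private final String color;
    private final String size;
    private final String model;

    public Product(String product, String productNumber,
                   String color, String size, String model) {
        this.product = product;
        this.productNumber = productNumber;
        this.color = color;
        this.size = size;
        this.model = model;
    }

    public String getProduct() { return product; }
    public String getProductNumber() { return productNumber; }
    public String getColor() { return color; }
    public String getSize() { return size; }
    public String getModel() { return model; }
}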
CREATE OR
ALTER PROCEDURE [Production].[uspGetProductsByColorAndSize]
@productColor VARCHAR(20),
@productSize INTEGER
AS
BEGIN
SET NOCOUNT ON;
SELECT p.[ProductNumber], m.[Name] AS [Model], p.[Name] AS [Product], p.[Color], p.[Size]
FROM [Production].[ProductModel] AS m
INNER JOIN
[Production].[Product] AS p ON m.[ProductModelID] = p.[ProductModelID]
WHERE (p.[Color] = @productColor)
AND (p.[Size] = @productSize)
ORDER BY p.[ProductNumber], [Model], [Product]
END
GO
The calling Java method is shown below.
/**
 * CallableStatement example, (2) input parameters, returns ResultSet
 *
 * @param color
 * @param size
 * @return List of Product objects
 */
public List<Product> getProductsByColorAndSizeCS(String color, String size) {
    CallableStatement cstmt = null;
    ResultSet rs = null;
    List<Product> productList = new ArrayList<>();
    try {
        cstmt = connection.getConnection().prepareCall(
                "{call [Production].[uspGetProductsByColorAndSize](?, ?)}",
                ResultSet.TYPE_SCROLL_INSENSITIVE,
                ResultSet.CONCUR_READ_ONLY);
        cstmt.setString("productColor", color);
        cstmt.setString("productSize", size);
        boolean results = cstmt.execute();
        int rowsAffected = 0;
        // Protects against lack of SET NOCOUNT in stored procedure
        while (results || rowsAffected != -1) {
            if (results) {
                rs = cstmt.getResultSet();
                break;
            } else {
                rowsAffected = cstmt.getUpdateCount();
            }
            results = cstmt.getMoreResults();
        }
        while (rs.next()) {
            Product product = new Product(
                    rs.getString("Product"),
                    rs.getString("ProductNumber"),
                    rs.getString("Color"),
                    rs.getString("Size"),
                    rs.getString("Model"));
            productList.add(product);
        }
    } catch (Exception ex) {
        Logger.getLogger(RunExamples.class.getName()).
                log(Level.SEVERE, null, ex);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
        if (cstmt != null) {
            try {
                cstmt.close();
            } catch (SQLException ex) {
                Logger.getLogger(RunExamples.class.getName()).
                        log(Level.WARNING, null, ex);
            }
        }
    }
    return productList;
}
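A hypothetical invocation of this method, assuming an instance of the class containing the example methods and the getter names from the Product sketch above, might look like the following; the color and size values match the sample output later in the post.

List<Product> products = examples.getProductsByColorAndSizeCS("Red", "44");
products.forEach(p -> System.out.printf(
        "%s, %s (%s)%n", p.getProduct(), p.getSize(), p.getProductNumber()));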
Proper T-SQL: Schema Reference and Brackets
You will notice that in all the T-SQL statements, I refer to the schema as well as the table or stored procedure name (e.g., {call [Production].[uspGetAverageProductWeightOUT](?)}). According to Microsoft, it is always good practice to refer to database objects by a schema name and the object name, separated by a period; that even includes the default schema (e.g., dbo).
You will also notice I wrap the schema and object names in square brackets (e.g., SELECT [ProductNumber] FROM [Production].[ProductModel]). The square brackets indicate that the name represents an object and not a reserved word (e.g., CURRENT or NATIONAL). By default, SQL Server adds these to make sure the scripts it generates run correctly.
Running the Examples
The application will display the name of the method being called, a description, the duration of time it took to retrieve the data, and the results returned by the method.
Below, we see the results.

SQL Statement Performance
This post is certainly not about SQL performance, as demonstrated by the fact that I am only using Amazon RDS for SQL Server 2017 Express Edition on a single, very underpowered db.t2.micro Amazon RDS instance type. However, I have added a timer feature, the ProcessTimer.java class, to capture the duration of time each example takes to return data, measured in milliseconds. The ProcessTimer.java class is part of the project code. Using the timer, you should observe significant differences between the first run and subsequent runs of the application for several of the called methods. The time difference is a result of several factors, primarily pre-compilation of the SQL statements and SQL Server plan caching.
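The project’s ProcessTimer.java class is on GitHub; purely as an illustration, a bare-bones elapsed-time helper could be as simple as the sketch below, which is not the project’s actual implementation.

public class SimpleTimer {
    private long start;

    public void start() {
        start = System.nanoTime();
    }

    // Elapsed time since start(), in milliseconds
    public long elapsedMillis() {
        return (System.nanoTime() - start) / 1_000_000;
    }
}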
The effects of these two factors are easily demonstrated by clearing the SQL Server plan cache (see the SQL script below) using DBCC (Database Console Commands) statements, and then running the application twice in a row. The second time, pre-compilation and plan caching should result in significantly faster times for the prepared statements and callable statements in Examples 2–6. In the two random runs shown below, we see up to a 497% improvement in query time.
USE AdventureWorks;
DBCC FREESYSTEMCACHE('SQL Plans');
GO
CHECKPOINT;
GO
-- Impossible to run with Amazon RDS for Microsoft SQL Server on AWS
-- DBCC DROPCLEANBUFFERS;
-- GO
The first run results are shown below.
SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 122
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 146
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 72
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 623
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 830
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 427
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---
The second run results are shown below.
SQL SERVER STATEMENT EXAMPLES
======================================
Method: GetAverageProductWeightST
Description: Statement, no parameters, returns Integer
Duration (ms): 116
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightPS
Description: PreparedStatement, no parameters, returns Integer
Duration (ms): 89
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightCS
Description: CallableStatement, no parameters, returns Integer
Duration (ms): 80
Results: Average product weight (lb): 12.43
---
Method: GetAverageProductWeightOutCS
Description: CallableStatement, (1) output parameter, returns Integer
Duration (ms): 340
Results: Average product weight (lb): 12.43
---
Method: GetEmployeesByLastNameCS
Description: CallableStatement, (1) input parameter, returns ResultSet
Duration (ms): 139
Results: Last names starting with 'Sa': 7
Last employee found: Sandberg, Mikael Q
---
Method: GetProductsByColorAndSizeCS
Description: CallableStatement, (2) input parameter, returns ResultSet
Duration (ms): 208
Results: Products found (color: 'Red', size: '44'): 7
First product: Road-650 Red, 44 (BK-R50R-44)
---
Conclusion
This post has demonstrated several methods for querying and calling stored procedures from a SQL Server 2017 database using JDBC with the Microsoft JDBC Driver 8.4 for SQL Server. Although the examples are quite simple, the same patterns can be used with more complex stored procedures, with multiple input and output parameters, which not only select, but insert, update, and delete data.
There are some limitations of the Microsoft JDBC Driver for SQL Server, which you should be aware of by reviewing the documentation. However, for most tasks that require database interaction, the driver provides adequate functionality with SQL Server.
This blog represents my own viewpoints and not those of my employer, Amazon Web Services.
Automating Multi-Environment Kubernetes Virtual Clusters with Google Cloud DNS, Auth0, and Istio 1.0
Posted by Gary A. Stafford in Bash Scripting, Build Automation, Cloud, DevOps, Enterprise Software Development, GCP, Java Development, Kubernetes, Software Development on January 19, 2019
Kubernetes supports multiple virtual clusters within the same physical cluster. These virtual clusters are called Namespaces. Namespaces are a way to divide cluster resources between multiple users. Many enterprises use Namespaces to divide the same physical Kubernetes cluster into different virtual software development environments as part of their overall Software Development Lifecycle (SDLC). This practice is commonly used in ‘lower environments’ or ‘non-prod’ (not Production) environments. These environments commonly include Continuous Integration and Delivery (CI/CD), Development, Integration, Testing/Quality Assurance (QA), User Acceptance Testing (UAT), Staging, Demo, and Hotfix. Namespaces provide a basic form of what is referred to as soft multi-tenancy.
Generally, the security boundaries and performance requirements between non-prod environments, within the same enterprise, are less restrictive than Production or Disaster Recovery (DR) environments. This allows for multi-tenant environments, while Production and DR are normally single-tenant environments. In order to approximate the performance characteristics of Production, the Performance Testing environment is also often isolated to a single tenant. A typical enterprise would minimally have a non-prod, performance, production, and DR environment.
Using Namespaces to create virtual separation on the same physical Kubernetes cluster provides enterprises with more efficient use of virtual compute resources, reduces Cloud costs, eases the management burden, and often expedites and simplifies the release process.
Demonstration
In this post, we will re-examine the topic of virtual clusters, similar to the recent post, Managing Applications Across Multiple Kubernetes Environments with Istio: Part 1 and Part 2. We will focus specifically on automating the creation of the virtual clusters on GKE with Istio 1.0, managing the Google Cloud DNS records associated with the cluster’s environments, and enabling both HTTPS and token-based OAuth access to each environment. We will use the Storefront API for our demonstration, featured in the previous three posts, including Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine.
Source Code
The source code for this post may be found on the gke branch of the storefront-kafka-docker GitHub repository.
git clone --branch gke --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-kafka-docker.git
Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers, such as LinkedIn.
This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources.
To follow along, you will need to register your own domain, arrange for an Auth0, or alternative, authentication and authorization service, and obtain an SSL/TLS certificate.
SSL/TLS Wildcard Certificate
In the recent post, Securing Your Istio Ingress Gateway with HTTPS, we examined how to create and apply an SSL/TLS certificate to our GKE cluster, to secure communications. Although we are only creating a non-prod cluster, it is more and more common to use SSL/TLS everywhere, especially in the Cloud. For this post, I have registered a single wildcard certificate, *.api.storefront-demo.com. This certificate will cover the three second-level subdomains associated with the virtual clusters: dev.api.storefront-demo.com, test.api.storefront-demo.com, and uat.api.storefront-demo.com. Setting the environment name, such as dev.*, as the second-level subdomain of my storefront-demo domain, following the first-level api.* subdomain, makes the use of a wildcard certificate much easier.
As shown below, my wildcard certificate contains the Subject Name and Subject Alternative Name (SAN) of *.api.storefront-demo.com. For Production, api.storefront-demo.com, I prefer to use a separate certificate.
Create GKE Cluster
With your certificate in hand, create the non-prod Kubernetes cluster. Below, the script creates a minimally-sized, three-node, multi-zone GKE cluster, running on GCP, with Kubernetes Engine cluster version 1.11.5-gke.5 and Istio on GKE version 1.0.3-gke.0. I have enabled the master authorized networks option to secure my GKE cluster master endpoint. For the demo, you can add your own IP address CIDR on line 9 (i.e. 1.2.3.4/32), or remove lines 30–31 to remove the restriction (gist).
- Lines 16–39: Create a 3-node, multi-zone GKE cluster with Istio;
- Line 48: Creates three non-prod Namespaces: dev, test, and uat;
- Lines 51–53: Enable Istio automatic sidecar injection within each Namespace;
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE

# Constants - CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api-non-prod'
readonly REGION='us-central1'
readonly MASTER_AUTH_NETS='<your_ip_cidr>'
readonly NAMESPACES=( 'dev' 'test' 'uat' )

# Build a 3-node, single-region, multi-zone GKE cluster
time gcloud beta container \
  --project $PROJECT clusters create $CLUSTER \
  --region $REGION \
  --no-enable-basic-auth \
  --no-issue-client-certificate \
  --cluster-version "1.11.5-gke.5" \
  --machine-type "n1-standard-2" \
  --image-type "COS" \
  --disk-type "pd-standard" \
  --disk-size "100" \
  --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
  --num-nodes "1" \
  --enable-stackdriver-kubernetes \
  --enable-ip-alias \
  --enable-master-authorized-networks \
  --master-authorized-networks $MASTER_AUTH_NETS \
  --network "projects/${PROJECT}/global/networks/default" \
  --subnetwork "projects/${PROJECT}/regions/${REGION}/subnetworks/default" \
  --default-max-pods-per-node "110" \
  --addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
  --istio-config auth=MTLS_STRICT \
  --metadata disable-legacy-endpoints=true \
  --enable-autoupgrade \
  --enable-autorepair

# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
  --region $REGION --project $PROJECT
kubectl config current-context

# Create Namespaces
kubectl apply -f ./resources/other/namespaces.yaml

# Enable automatic Istio sidecar injection
for namespace in ${NAMESPACES[@]}; do
  kubectl label namespace $namespace istio-injection=enabled
done
If successful, the results should look similar to the output, below.
The cluster will contain a pool of three minimally-sized VMs, the Kubernetes nodes.
Deploying Resources
The Istio Gateway and three VirtualService resources are the primary resources responsible for routing the traffic from the ingress router to the Services, within the multiple Namespaces. Both of these resource types are new to Istio 1.0 (gist).
- Lines 9–16: Port config that only accepts HTTPS traffic on port 443 using TLS;
- Lines 18–20: The three subdomains being routed to the non-prod GKE cluster;
- Lines 28, 63, 98: The three subdomains being routed to the non-prod GKE cluster;
- Lines 39, 47, 65, 74, 82, 90, 109, 117, 125: Routing to FQDN of Storefront API Services within the three Namespaces;
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 443
      name: https
      protocol: HTTPS
    tls:
      mode: SIMPLE
      serverCertificate: /etc/istio/ingressgateway-certs/tls.crt
      privateKey: /etc/istio/ingressgateway-certs/tls.key
    hosts:
    - dev.api.storefront-demo.com
    - test.api.storefront-demo.com
    - uat.api.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - dev.api.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-test
spec:
  hosts:
  - test.api.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.test.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.test.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.test.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-uat
spec:
  hosts:
  - uat.api.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.uat.svc.cluster.local
Next, deploy the Istio and Kubernetes resources to the new GKE cluster. For the sake of brevity, we will deploy the same number of instances and the same version of each of the three Storefront API services (Accounts, Orders, Fulfillment) to each of the three non-prod environments (dev, test, uat). In reality, you would have varying numbers of instances of each service, and each environment would contain progressive versions of each service, as part of the SDLC of each microservice (gist).
- Lines 13–14: Deploy the SSL/TLS certificate and the private key;
- Line 17: Deploy the Istio Gateway and three VirtualService resources;
- Lines 20–22: Deploy the Istio Authentication Policy resources to each Namespace;
- Lines 26–37: Deploy the same set of resources to the dev, test, and uat Namespaces;
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources

# Constants - CHANGE ME!
readonly CERT_PATH=~/Documents/Articles/gke-kafka/sslforfree_non_prod
readonly NAMESPACES=( 'dev' 'test' 'uat' )

# Kubernetes Secret to hold the server's certificate and private key
kubectl create -n istio-system secret tls istio-ingressgateway-certs \
  --key $CERT_PATH/private.key --cert $CERT_PATH/certificate.crt

# Istio Gateway and three ServiceEntry resources
kubectl apply -f ./resources/other/istio-gateway.yaml

# End-user auth applied per environment
kubectl apply -f ./resources/other/auth-policy-dev.yaml
kubectl apply -f ./resources/other/auth-policy-test.yaml
kubectl apply -f ./resources/other/auth-policy-uat.yaml

# Loop through each non-prod Namespace (environment)
# Re-use same resources (incld. credentials) for all environments, just for the demo
for namespace in ${NAMESPACES[@]}; do
  kubectl apply -n $namespace -f ./resources/config/confluent-cloud-kafka-configmap.yaml
  kubectl apply -n $namespace -f ./resources/config/mongodb-atlas-secret.yaml
  kubectl apply -n $namespace -f ./resources/config/confluent-cloud-kafka-secret.yaml
  kubectl apply -n $namespace -f ./resources/other/mongodb-atlas-external-mesh.yaml
  kubectl apply -n $namespace -f ./resources/other/confluent-cloud-external-mesh.yaml
  kubectl apply -n $namespace -f ./resources/services/accounts.yaml
  kubectl apply -n $namespace -f ./resources/services/fulfillment.yaml
  kubectl apply -n $namespace -f ./resources/services/orders.yaml
done
The deployed Storefront API Services should look as follows.
Google Cloud DNS
Next, we need to enable DNS access to the GKE cluster using Google Cloud DNS. According to Google, Cloud DNS is a scalable, reliable and managed authoritative Domain Name System (DNS) service running on the same infrastructure as Google. It has low latency, high availability, and is a cost-effective way to make your applications and services available to your users.
Whenever a new GKE cluster is created, a new Network Load Balancer is also created. By default, the load balancer’s front-end is an external IP address.
Using a forwarding rule, traffic directed at the external IP address is redirected to the load balancer’s back-end. The load balancer’s back-end is comprised of three VM instances, which are the three Kubernetes nodes in the GKE cluster.
If you are following along with this post’s demonstration, we will assume you have a domain registered and configured with Google Cloud DNS. I am using the storefront-demo.com domain, which I have used in the last three posts to demonstrate Istio and GKE.
Google Cloud DNS has a fully functional web console, part of the Google Cloud Console. However, using the Cloud DNS web console is impractical in a DevOps CI/CD workflow, where Kubernetes clusters, Namespaces, and Workloads are ephemeral. Therefore, we will use the following script. Within the script, we reset the IP address associated with the A records for each of the non-prod subdomains associated with the storefront-demo.com domain (gist).
- Lines 23–25: Find the previous load balancer’s front-end IP address;
- Lines 27–29: Find the new load balancer’s front-end IP address;
- Line 35: Start the Cloud DNS transaction;
- Lines 37–47: Add the DNS record changes to the transaction;
- Line 49: Execute the Cloud DNS transaction;
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Update Cloud DNS A Records

# Constants - CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly DOMAIN='storefront-demo.com'
readonly ZONE='storefront-demo-com-zone'
readonly REGION='us-central1'
readonly TTL=300
readonly RECORDS=('dev' 'test' 'uat')

# Make sure any old load balancers were removed
if [ $(gcloud compute forwarding-rules list --filter "region:($REGION)" | wc -l | awk '{$1=$1};1') -gt 2 ]; then
  echo "More than one load balancer detected, exiting script."
  exit 1
fi

# Get load balancer IP address from first record
readonly OLD_IP=$(gcloud dns record-sets list \
  --filter "name=${RECORDS[0]}.api.${DOMAIN}." --zone $ZONE \
  | awk 'NR==2 {print $4}')

readonly NEW_IP=$(gcloud compute forwarding-rules list \
  --filter "region:($REGION)" \
  | awk 'NR==2 {print $3}')

echo "Old LB IP Address: ${OLD_IP}"
echo "New LB IP Address: ${NEW_IP}"

# Update DNS records
gcloud dns record-sets transaction start --zone $ZONE

for record in ${RECORDS[@]}; do
  echo "${record}.api.${DOMAIN}."
  gcloud dns record-sets transaction remove \
    --name "${record}.api.${DOMAIN}." --ttl $TTL \
    --type A --zone $ZONE "${OLD_IP}"
  gcloud dns record-sets transaction add \
    --name "${record}.api.${DOMAIN}." --ttl $TTL \
    --type A --zone $ZONE "${NEW_IP}"
done

gcloud dns record-sets transaction execute --zone $ZONE
The outcome of the script is shown below. Note how changes are executed as part of a transaction, by automatically creating a transaction.yaml file. The file contains the six DNS changes, three additions and three deletions. The command executes the transaction and then deletes the transaction.yaml file.
> sh ./part3_set_cloud_dns.sh
Old LB IP Address: 35.193.208.115
New LB IP Address: 35.238.196.231
Transaction started [transaction.yaml].
dev.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].
test.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].
uat.api.storefront-demo.com.
Record removal appended to transaction at [transaction.yaml].
Record addition appended to transaction at [transaction.yaml].
Executed transaction [transaction.yaml] for managed-zone [storefront-demo-com-zone].
Created [https://www.googleapis.com/dns/v1/projects/gke-confluent-atlas/managedZones/storefront-demo-com-zone/changes/53].
ID  START_TIME                STATUS
55  2019-01-16T04:54:14.984Z  pending
Based on my own domain and cluster details, the transaction.yaml file looks as follows. Again, note the six DNS changes, three additions, followed by three deletions (gist).
---
additions:
- kind: dns#resourceRecordSet
  name: storefront-demo.com.
  rrdatas:
  - ns-cloud-a1.googledomains.com. cloud-dns-hostmaster.google.com. 25 21600 3600 259200 300
  ttl: 21600
  type: SOA
- kind: dns#resourceRecordSet
  name: dev.api.storefront-demo.com.
  rrdatas:
  - 35.238.196.231
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: test.api.storefront-demo.com.
  rrdatas:
  - 35.238.196.231
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: uat.api.storefront-demo.com.
  rrdatas:
  - 35.238.196.231
  ttl: 300
  type: A
deletions:
- kind: dns#resourceRecordSet
  name: storefront-demo.com.
  rrdatas:
  - ns-cloud-a1.googledomains.com. cloud-dns-hostmaster.google.com. 24 21600 3600 259200 300
  ttl: 21600
  type: SOA
- kind: dns#resourceRecordSet
  name: dev.api.storefront-demo.com.
  rrdatas:
  - 35.193.208.115
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: test.api.storefront-demo.com.
  rrdatas:
  - 35.193.208.115
  ttl: 300
  type: A
- kind: dns#resourceRecordSet
  name: uat.api.storefront-demo.com.
  rrdatas:
  - 35.193.208.115
  ttl: 300
  type: A
Confirm DNS Changes
Use the dig command to confirm the DNS records are now correct and that DNS propagation has occurred. The IP address returned by dig should be the external IP address assigned to the front-end of the Google Cloud Load Balancer.
> dig dev.api.storefront-demo.com +short
35.238.196.231
Or, all three records.
echo \
  "dev.api.storefront-demo.com\n" \
  "test.api.storefront-demo.com\n" \
  "uat.api.storefront-demo.com" \
  > records.txt | dig -f records.txt +short
35.238.196.231
35.238.196.231
35.238.196.231
Optionally, get more verbose output by removing the +short option.
> dig +nocmd dev.api.storefront-demo.com

;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 30763
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 512
;; QUESTION SECTION:
;dev.api.storefront-demo.com.    IN    A

;; ANSWER SECTION:
dev.api.storefront-demo.com. 299 IN    A    35.238.196.231

;; Query time: 27 msec
;; SERVER: 8.8.8.8#53(8.8.8.8)
;; WHEN: Wed Jan 16 18:00:49 EST 2019
;; MSG SIZE  rcvd: 72
The resulting records in the Google Cloud DNS management console should look as follows.
JWT-based Authentication
As discussed in the previous post, Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0, it is typical to restrict access to the Kubernetes cluster, Namespaces within the cluster, or Services running within Namespaces, to end-users, whether they are humans or other applications. In that previous post, we saw an example of applying a machine-to-machine (M2M) Istio Authentication Policy to only the uat Namespace. This scenario is common when you want to control access to resources in non-production environments, such as UAT, to outside test teams, accessing the uat Namespace through an external application. To simulate this scenario, we will apply the following Istio Authentication Policy to the uat Namespace (gist).
apiVersion: authentication.istio.io/v1alpha1
kind: Policy
metadata:
  name: default
  namespace: uat
spec:
  peers:
  - mtls: {}
  origins:
  - jwt:
      audiences:
      - "storefront-api-uat"
      issuer: "https://storefront-demo.auth0.com/"
      jwksUri: "https://storefront-demo.auth0.com/.well-known/jwks.json"
  principalBinding: USE_ORIGIN
For the dev and test Namespaces, we will apply an additional, different Istio Authentication Policy. This policy will protect against the possibility of dev and test M2M API consumers interfering with uat M2M API consumers and vice-versa. Below is the dev and test version of the Policy (gist).
apiVersion: authentication.istio.io/v1alpha1
kind: Policy
metadata:
  name: default
  namespace: dev
spec:
  peers:
  - mtls: {}
  origins:
  - jwt:
      audiences:
      - "storefront-api-dev-test"
      issuer: "https://storefront-demo.auth0.com/"
      jwksUri: "https://storefront-demo.auth0.com/.well-known/jwks.json"
  principalBinding: USE_ORIGIN
Testing Authentication
Using Postman, with the ‘Bearer Token’ type authentication method, as detailed in the previous post, a call to a Storefront API resource in the uat Namespace should succeed. This also confirms DNS and HTTPS are working properly.
The dev and test Namespaces require different authentication. Trying to use no authentication, or authenticating as a UAT API consumer, will result in a 401 Unauthorized HTTP status, along with an ‘Origin authentication failed.’ error message.
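Outside of Postman, the same bearer-token call could be made programmatically. The following Java 11 HttpClient sketch is purely illustrative; the resource path and the token value are placeholders, not values taken from the project.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UatApiCheck {
    public static int callUatAccounts(String accessToken) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://uat.api.storefront-demo.com/accounts/actuator/health"))
                .header("Authorization", "Bearer " + accessToken) // JWT issued by Auth0
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode(); // 200 with a valid UAT token, 401 otherwise
    }
}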
Conclusion
In this brief post, we demonstrated how to create a GKE cluster with Istio 1.0.x, containing three virtual clusters, or Namespaces. Each Namespace represents an environment, which is part of an application’s SDLC. We enforced HTTP over TLS (HTTPS) using a wildcard SSL/TLS certificate. We also enforced end-user authentication using JWT-based OAuth 2.0 with Auth0. Lastly, we provided user-friendly DNS routing to each environment, using Google Cloud DNS. Short of a fully managed API Gateway, like Apigee, and automating the execution of the scripts with Jenkins or Spinnaker, this cluster is ready to provide a functional path to Production for developing our Storefront API.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Building a Microservices Platform with Confluent Cloud, MongoDB Atlas, Istio, and Google Kubernetes Engine
Posted by Gary A. Stafford in Bash Scripting, Cloud, DevOps, Enterprise Software Development, GCP, Java Development, Python, Software Development on December 28, 2018
Leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications. In previous posts, we have integrated other SaaS products, including MongoDB Atlas fully-managed MongoDB-as-a-service, ElephantSQL fully-managed PostgreSQL-as-a-service, and CloudAMQP RabbitMQ-as-a-service, into cloud-native applications on Azure, AWS, GCP, and PCF.
In this post, we will build and deploy an existing, Spring Framework, microservice-based, cloud-native API to Google Kubernetes Engine (GKE), replete with Istio 1.0, on Google Cloud Platform (GCP). The API will rely on Confluent Cloud to provide a fully-managed, Kafka-based messaging-as-a-service (MaaS). Similarly, the API will rely on MongoDB Atlas to provide a fully-managed, MongoDB-based Database-as-a-service (DBaaS).
Background
In a previous two-part post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1 and Part 2, we examined the role of Apache Kafka in an event-driven, eventually consistent, distributed system architecture. The system, an online storefront RESTful API simulation, was composed of multiple, Java Spring Boot microservices, each with their own MongoDB database. The microservices used a publish/subscribe model to communicate with each other using Kafka-based messaging. The Spring services were built using the Spring for Apache Kafka and Spring Data MongoDB projects.
Given the use case of placing an order through the Storefront API, we examined the interactions of three microservices, the Accounts, Fulfillment, and Orders service. We examined how the three services used Kafka to communicate state changes to each other, in a fully-decoupled manner.
The Storefront API’s microservices were managed behind an API Gateway, Netflix’s Zuul. Service discovery and load balancing were handled by Netflix’s Eureka. Both Zuul and Eureka are part of the Spring Cloud Netflix project. In that post, the entire containerized system was deployed to Docker Swarm.
Developing the services, not operationalizing the platform, was the primary objective of the previous post.
Featured Technologies
The following technologies are featured prominently in this post.
Confluent Cloud
In May 2018, Google announced a partnership with Confluent to provide Confluent Cloud on GCP, a managed Apache Kafka solution for the Google Cloud Platform. Confluent, founded by the creators of Kafka, Jay Kreps, Neha Narkhede, and Jun Rao, is known for its commercial, Kafka-based streaming platform for the Enterprise.
Confluent Cloud is a fully-managed, cloud-based streaming service based on Apache Kafka. Confluent Cloud delivers a low-latency, resilient, scalable streaming service, deployable in minutes. Confluent deploys, upgrades, and maintains your Kafka clusters. Confluent Cloud is currently available on both AWS and GCP.
Confluent Cloud offers two plans, Professional and Enterprise. The Professional plan is optimized for projects under development, and for smaller organizations and applications. Professional plan rates for Confluent Cloud start at $0.55/hour. The Enterprise plan adds full enterprise capabilities such as service-level agreements (SLAs) with a 99.95% uptime and virtual private cloud (VPC) peering. The limitations and supported features of both plans are detailed, here.
MongoDB Atlas
Similar to Confluent Cloud, MongoDB Atlas is a fully-managed MongoDB-as-a-Service, available on AWS, Azure, and GCP. Atlas, a mature SaaS product, offers high-availability, uptime SLAs, elastic scalability, cross-region replication, enterprise-grade security, LDAP integration, BI Connector, and much more.
MongoDB Atlas currently offers four pricing plans, Free, Basic, Pro, and Enterprise. Plans range from the smallest, M0-sized MongoDB cluster, with shared RAM and 512 MB storage, up to the massive M400 MongoDB cluster, with 488 GB of RAM and 3 TB of storage.
MongoDB Atlas has been featured in several past posts, including Deploying and Configuring Istio on Google Kubernetes Engine (GKE) and Developing Applications for the Cloud with Azure App Services and MongoDB Atlas.
Kubernetes Engine
According to Google, Google Kubernetes Engine (GKE) provides a fully-managed, production-ready Kubernetes environment for deploying, managing, and scaling your containerized applications using Google infrastructure. GKE consists of multiple Google Compute Engine instances, grouped together to form a cluster.
A forerunner to other managed Kubernetes platforms, like EKS (AWS), AKS (Azure), PKS (Pivotal), and IBM Cloud Kubernetes Service, GKE launched publicly in 2015. GKE was built on Google’s experience of running hyper-scale services like Gmail and YouTube in containers for over 12 years.
GKE’s pricing is based on a pay-as-you-go, per-second-billing plan, with no up-front or termination fees, similar to Confluent Cloud and MongoDB Atlas. Cluster sizes range from 1 – 1,000 nodes. Node machine types may be optimized for standard workloads, CPU, memory, GPU, or high-availability. Compute power ranges from 1 – 96 vCPUs and memory from 1 – 624 GB of RAM.
Demonstration
In this post, we will deploy the three Storefront API microservices to a GKE cluster on GCP. Confluent Cloud on GCP will replace the previous Docker-based Kafka implementation. Similarly, MongoDB Atlas will replace the previous Docker-based MongoDB implementation.
Kubernetes and Istio 1.0 will replace Netflix’s Zuul and Eureka for API management, load-balancing, routing, and service discovery. Google Stackdriver will provide logging and monitoring. Docker Images for the services will be stored in Google Container Registry. Although not fully operationalized, the Storefront API will be closer to a Production-like platform, than previously demonstrated on Docker Swarm.
For brevity, we will not enable standard API security features like HTTPS, OAuth for authentication, and request quotas and throttling, all of which are essential in Production. Nor, will we integrate a full lifecycle API management tool, like Google Apigee.
Source Code
The source code for this demonstration is contained in four separate GitHub repositories, storefront-kafka-docker, storefront-demo-accounts, storefront-demo-orders, and storefront-demo-fulfillment. However, since the Docker Images for the three storefront services are available on Docker Hub, it is only necessary to clone the storefront-kafka-docker project. This project contains all the code to deploy and configure the GKE cluster and Kubernetes resources (gist).
git clone --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-kafka-docker.git

# optional repositories
git clone --branch gke --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-demo-accounts.git
git clone --branch gke --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-demo-orders.git
git clone --branch gke --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/storefront-demo-fulfillment.git
Source code samples in this post are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.
Setup Process
The setup of the Storefront API platform is divided into a few logical steps:
- Create the MongoDB Atlas cluster;
- Create the Confluent Cloud Kafka cluster;
- Create Kafka topics;
- Modify the Kubernetes resources;
- Modify the microservices to support Confluent Cloud configuration;
- Create the GKE cluster with Istio on GCP;
- Apply the Kubernetes resources to the GKE cluster;
- Test the Storefront API, Kafka, and MongoDB are functioning properly;
MongoDB Atlas Cluster
This post assumes you already have a MongoDB Atlas account and an existing project created. MongoDB Atlas accounts are free to set up if you do not already have one. Account creation does require the use of a Credit Card.
For minimal latency, we will be creating the MongoDB Atlas, Confluent Cloud Kafka, and GKE clusters, all on the Google Cloud Platform’s us-central1 Region. Available GCP Regions and Zones for MongoDB Atlas, Confluent Cloud, and GKE, vary, based on multiple factors.
For this demo, I suggest creating a free, M0-sized MongoDB cluster. The M0-sized 3-data node cluster, with shared RAM and 512 MB of storage, and currently running MongoDB 4.0.4, is fine for individual development. The us-central1 Region is the only available US Region for the free-tier M0-cluster on GCP. An M0-sized Atlas cluster may take between 7-10 minutes to provision.
MongoDB Atlas’ Web-based management console provides convenient links to cluster details, metrics, alerts, and documentation.
Once the cluster is ready, you can review details about the cluster and each individual cluster node.
In addition to the account owner, create a demo_user account. This account will be used to authenticate and connect with the MongoDB databases from the storefront services. For this demo, we will use the same, single user account for all three services. In Production, you would most likely have individual users for each service.
Again, for security purposes, Atlas requires you to whitelist the IP address or CIDR block from which the storefront services will connect to the cluster. For now, open the access to your specific IP address using whatsmyip.com, or much less securely, to all IP addresses (0.0.0.0/0). Once the GKE cluster and external static IP addresses are created, make sure to come back and update this value; do not leave this wide open to the Internet.
The Java Spring Boot storefront services use a Spring Profile, gke
. According to Spring, Spring Profiles provide a way to segregate parts of your application configuration and make it available only in certain environments. The gke
Spring Profile’s configuration values may be set in a number of ways. For this demo, the majority of the values will be set using Kubernetes Deployment, ConfigMap and Secret resources, shown later.
The first two Spring configuration values we will need are the MongoDB Atlas cluster’s connection string and the demo_user account password. Note both of these for later use.
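As a quick sanity check, you can verify the connection string and the demo_user credentials from your workstation with the mongo shell before moving on (a minimal sketch; the cluster address and password are placeholders for your own values):

# verify the demo_user account and connection string (placeholders only)
mongo "mongodb+srv://your_cluster_address/accounts" \
  --username demo_user \
  --password your_password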
Confluent Cloud Kafka Cluster
Similar to MongoDB Atlas, this post assumes you already have a Confluent Cloud account and an existing project. It is free to set up a Professional account and a new project if you do not already have one. Account creation does require the use of a Credit Card.
The Confluent Cloud web-based management console is shown below. Experienced users of other SaaS platforms may find the Confluent Cloud web-based console a bit sparse on features. In my opinion, the console lacks some necessary features, like cluster observability, individual Kafka topic management, detailed billing history (always says $0?), and a persistent history of cluster activities that survives cluster deletion. It seems like Confluent prefers users to download and configure their Confluent Control Center to get the functionality you might normally expect from a web-based SaaS management tool.
As explained earlier, for minimal latency, I suggest creating the MongoDB Atlas cluster, Confluent Cloud Kafka cluster, and the GKE cluster, all on the Google Cloud Platform’s us-central1 Region. For this demo, choose the smallest cluster size available on GCP, in the us-central1 Region, with 1 MB/s R/W throughput and 500 MB of storage. As shown below, the cost will be approximately $0.55/hour. Don’t forget to delete this cluster when you are done with the demonstration, or you will continue to be charged.
Cluster creation of the minimally-sized Confluent Cloud cluster is pretty quick.
Once the cluster is ready, Confluent provides instructions on how to interact with the cluster via the Confluent Cloud CLI. Install the Confluent Cloud CLI, locally, for use later.
As explained earlier, the Java Spring Boot storefront services use a Spring Profile, gke
. Like MongoDB Atlas, the Confluent Cloud Kafka cluster configuration values will be set using Kubernetes ConfigMap and Secret resources, shown later. There are several Confluent Cloud Java configuration values shown in the Client Config Java tab; we will need these for later use.
SASL and JAAS
Some users may not be familiar with the terms SASL and JAAS. According to Wikipedia, Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols. According to Confluent, Kafka brokers support client authentication via SASL. SASL authentication can be enabled concurrently with SSL encryption (SSL client authentication will be disabled).
There are numerous SASL mechanisms. The PLAIN SASL mechanism (SASL/PLAIN), used by Confluent, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use. The SASL/PLAIN mechanism should only be used with SSL as a transport layer to ensure that clear passwords are not transmitted on the wire without encryption.
According to Wikipedia, Java Authentication and Authorization Service (JAAS) is the Java implementation of the standard Pluggable Authentication Module (PAM) information security framework. According to Confluent, Kafka uses JAAS for SASL configuration. You must provide JAAS configurations for all SASL authentication mechanisms.
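For reference, the sasl.jaas.config value used with Confluent Cloud’s SASL/PLAIN mechanism typically looks similar to the following sketch, where the API Key and Secret are placeholders for the values created in the next section:

# example sasl.jaas.config value for SASL/PLAIN (placeholders only)
SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="<your_api_key>" password="<your_api_secret>";'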
Cluster Authentication
Similar to MongoDB Atlas, we need to authenticate with the Confluent Cloud cluster from the storefront services. The authentication to Confluent Cloud is done with an API Key. Create a new API Key, and note the Key and Secret; these two additional pieces of configuration will be needed later.
Confluent Cloud API Keys can be created and deleted as necessary. For security in Production, API Keys should be created for each service and regularly rotated.
Kafka Topics
With the cluster created, create the storefront service’s three Kafka topics manually, using the Confluent Cloud’s ccloud
CLI tool. First, configure the Confluent Cloud CLI using the ccloud init
command, using your new cluster’s Bootstrap Servers address, API Key, and API Secret. The instructions are shown on the cluster’s Client Config tab of the Confluent Cloud web-based management interface.
Create the storefront service’s three Kafka topics using the ccloud topic create
command. Use the list
command to confirm they are created.
# manually create kafka topics
ccloud topic create accounts.customer.change
ccloud topic create fulfillment.order.change
ccloud topic create orders.order.fulfill

# list kafka topics
ccloud topic list

accounts.customer.change
fulfillment.order.change
orders.order.fulfill
Another useful ccloud
command, topic describe
, displays topic replication details. The new topics will have a replication factor of 3 and a partition count of 12.
Adding the --verbose
flag to the command, ccloud --verbose topic describe
, displays low-level topic and cluster configuration details, as well as a log of all topic-related activities.
Kubernetes Resources
The deployment of the three storefront microservices to the dev
Namespace will minimally require the following Kubernetes configuration resources.
- (1) Kubernetes Namespace;
- (3) Kubernetes Deployments;
- (3) Kubernetes Services;
- (1) Kubernetes ConfigMap;
- (2) Kubernetes Secrets;
- (1) Istio 1.0 Gateway;
- (1) Istio 1.0 VirtualService;
- (2) Istio 1.0 ServiceEntry;
The Istio networking.istio.io
v1alpha3
API introduced the last three configuration resources in the list to control traffic routing into, within, and out of the mesh. There are a total of four new networking.istio.io v1alpha3 API routing resources: Gateway, VirtualService, DestinationRule, and ServiceEntry.
Creating and managing such a large number of resources is a common complaint regarding the complexity of Kubernetes. Imagine the resource sprawl when you have dozens of microservices replicated across several namespaces. Fortunately, all resource files for this post are included in the storefront-kafka-docker project’s gke directory.
To follow along with the demo, you will need to make minor modifications to a few of these resources, including the Istio Gateway, Istio VirtualService, two Istio ServiceEntry resources, and two Kubernetes Secret resources.
Istio Gateway & VirtualService
Both the Istio Gateway and VirtualService configuration resources are contained in a single file, istio-gateway.yaml. For the demo, I am using a personal domain, storefront-demo.com
, along with the sub-domain, api.dev
, to host the Storefront API. The domain’s primary A record (‘@’) and sub-domain A record are both associated with the external IP address on the frontend of the load balancer. In the file, this host is configured for the Gateway and VirtualService resources. You can choose to replace the host with your own domain, or simply remove the host block altogether on lines 13–14 and 21–22. If you remove the host blocks, you would then use the external IP address on the frontend of the load balancer (explained later in the post) to access the Storefront API (gist).
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.dev.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - api.dev.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local
Istio ServiceEntry
There are two Istio ServiceEntry configuration resources. Both ServiceEntry resources control egress traffic from the Storefront API services; both of their ServiceEntry location items are set to MESH_EXTERNAL
. The first ServiceEntry, mongodb-atlas-external-mesh.yaml, defines MongoDB Atlas cluster egress traffic from the Storefront API (gist).
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: mongdb-atlas-external-mesh
spec:
  hosts:
  - <your_atlas_url.gcp.mongodb.net>
  ports:
  - name: mongo
    number: 27017
    protocol: MONGO
  location: MESH_EXTERNAL
  resolution: NONE
The other ServiceEntry, confluent-cloud-external-mesh.yaml, defines Confluent Cloud Kafka cluster egress traffic from the Storefront API (gist).
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: confluent-cloud-external-mesh
spec:
  hosts:
  - <your_cluster_url.us-central1.gcp.confluent.cloud>
  ports:
  - name: kafka
    number: 9092
    protocol: TLS
  location: MESH_EXTERNAL
  resolution: NONE
Both need to have their host
items replaced with the appropriate Atlas and Confluent URLs.
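One quick way to make those substitutions is with sed, assuming your copies of the two files still contain the placeholder hosts shown above (a sketch; the example hostnames are hypothetical):

# substitute your own Atlas and Confluent Cloud hostnames (example values are hypothetical)
sed -i.bak 's|<your_atlas_url.gcp.mongodb.net>|cluster0-abcde.gcp.mongodb.net|' \
  ./resources/other/mongodb-atlas-external-mesh.yaml
sed -i.bak 's|<your_cluster_url.us-central1.gcp.confluent.cloud>|pkc-abcde.us-central1.gcp.confluent.cloud|' \
  ./resources/other/confluent-cloud-external-mesh.yaml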
Inspecting Istio Resources
The easiest way to view Istio resources is from the command line using the istioctl
and kubectl
CLI tools.
istioctl get gateway
istioctl get virtualservices
istioctl get serviceentry

kubectl describe gateway
kubectl describe virtualservices
kubectl describe serviceentry
Multiple Namespaces
In this demo, we are only deploying to a single Kubernetes Namespace, dev
. However, Istio will also support routing traffic to multiple namespaces. For example, a typical non-prod Kubernetes cluster might support dev
, test
, and uat
, each associated with a different sub-domain. One way to support multiple Namespaces with Istio 1.0 is to add each host to the Istio Gateway (lines 14–16, below), then create a separate Istio VirtualService for each Namespace. All the VirtualServices are associated with the single Gateway. In the VirtualService, each service’s host address is the fully qualified domain name (FQDN) of the service. Part of the FQDN is the Namespace, which we change for each VirtualService (gist).
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: storefront-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.dev.storefront-demo.com
    - api.test.storefront-demo.com
    - api.uat.storefront-demo.com
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-dev
spec:
  hosts:
  - api.dev.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.dev.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.dev.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-test
spec:
  hosts:
  - api.test.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.test.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.test.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.test.svc.cluster.local
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: storefront-uat
spec:
  hosts:
  - api.uat.storefront-demo.com
  gateways:
  - storefront-gateway
  http:
  - match:
    - uri:
        prefix: /accounts
    route:
    - destination:
        port:
          number: 8080
        host: accounts.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /fulfillment
    route:
    - destination:
        port:
          number: 8080
        host: fulfillment.uat.svc.cluster.local
  - match:
    - uri:
        prefix: /orders
    route:
    - destination:
        port:
          number: 8080
        host: orders.uat.svc.cluster.local
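Each additional Namespace would also need to be created and labeled for automatic Istio sidecar injection, just as we do for the dev Namespace later in the post (a minimal sketch):

# create and label the additional non-prod Namespaces
for NS in test uat; do
  kubectl create namespace $NS
  kubectl label namespace $NS istio-injection=enabled
done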
MongoDB Atlas Secret
There is one Kubernetes Secret for the sensitive MongoDB configuration and one Secret for the sensitive Confluent Cloud configuration. The Kubernetes Secret object type is intended to hold sensitive information, such as passwords, OAuth tokens, and SSH keys.
The mongodb-atlas-secret.yaml file contains the MongoDB Atlas cluster connection string, with the demo_user
username and password, one for each of the storefront service’s databases (gist).
apiVersion: v1
kind: Secret
metadata:
  name: mongodb-atlas
  namespace: dev
type: Opaque
data:
  mongodb.uri.accounts: your_base64_encoded_value
  mongodb.uri.fulfillment: your_base64_encoded_value
  mongodb.uri.orders: your_base64_encoded_value
Kubernetes Secrets are Base64 encoded. The easiest way to encode the secret values is using the Linux base64
program. The base64
program encodes and decodes Base64 data, as specified in RFC 4648. Pass each MongoDB URI string to the base64
program using echo -n
.
MONGODB_URI=mongodb+srv://demo_user:your_password@your_cluster_address/accounts?retryWrites=true

echo -n $MONGODB_URI | base64
bW9uZ29kYitzcnY6Ly9kZW1vX3VzZXI6eW91cl9wYXNzd29yZEB5b3VyX2NsdXN0ZXJfYWRkcmVzcy9hY2NvdW50cz9yZXRyeVdyaXRlcz10cnVl
Repeat this process for the three MongoDB connection strings.
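A small shell loop makes this less error-prone (a sketch; the password and cluster address are placeholders):

# base64-encode the connection string for each of the three databases
for DB in accounts fulfillment orders; do
  echo -n "mongodb+srv://demo_user:your_password@your_cluster_address/${DB}?retryWrites=true" | base64
done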
Confluent Cloud Secret
The confluent-cloud-kafka-secret.yaml file contains two data fields in the Secret’s data map, bootstrap.servers
and sasl.jaas.config
. These configuration items were both listed in the Client Config Java tab of the Confluent Cloud web-based management console, as shown previously. The sasl.jaas.config
data field requires the Confluent Cloud cluster API Key and Secret you created earlier. Again, use the base64 encoding process for these two data fields (gist).
apiVersion: v1
kind: Secret
metadata:
  name: confluent-cloud-kafka
  namespace: dev
type: Opaque
data:
  bootstrap.servers: your_base64_encoded_value
  sasl.jaas.config: your_base64_encoded_value
Confluent Cloud ConfigMap
The remaining five Confluent Cloud Kafka cluster configuration values are not sensitive, and therefore, may be placed in a Kubernetes ConfigMap, confluent-cloud-kafka-configmap.yaml (gist).
apiVersion: v1
kind: ConfigMap
metadata:
  name: confluent-cloud-kafka
data:
  ssl.endpoint.identification.algorithm: "https"
  sasl.mechanism: "PLAIN"
  request.timeout.ms: "20000"
  retry.backoff.ms: "500"
  security.protocol: "SASL_SSL"
Accounts Deployment Resource
To see how the services consume the ConfigMap and Secret values, review the Accounts Deployment resource, shown below. Note the environment variables section, on lines 44–90, is a mix of hard-coded values and values referenced from the ConfigMap and two Secrets, shown above (gist).
apiVersion: v1
kind: Service
metadata:
  name: accounts
  labels:
    app: accounts
spec:
  ports:
  - name: http
    port: 8080
  selector:
    app: accounts
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: accounts
  labels:
    app: accounts
spec:
  replicas: 2
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: accounts
  template:
    metadata:
      labels:
        app: accounts
      annotations:
        sidecar.istio.io/inject: "true"
    spec:
      containers:
      - name: accounts
        image: garystafford/storefront-accounts:gke-2.2.0
        resources:
          requests:
            memory: "250M"
            cpu: "100m"
          limits:
            memory: "400M"
            cpu: "250m"
        env:
        - name: SPRING_PROFILES_ACTIVE
          value: "gke"
        - name: SERVER_SERVLET_CONTEXT-PATH
          value: "/accounts"
        - name: LOGGING_LEVEL_ROOT
          value: "INFO"
        - name: SPRING_DATA_MONGODB_URI
          valueFrom:
            secretKeyRef:
              name: mongodb-atlas
              key: mongodb.uri.accounts
        - name: SPRING_KAFKA_BOOTSTRAP-SERVERS
          valueFrom:
            secretKeyRef:
              name: confluent-cloud-kafka
              key: bootstrap.servers
        - name: SPRING_KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: ssl.endpoint.identification.algorithm
        - name: SPRING_KAFKA_PROPERTIES_SASL_MECHANISM
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: sasl.mechanism
        - name: SPRING_KAFKA_PROPERTIES_REQUEST_TIMEOUT_MS
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: request.timeout.ms
        - name: SPRING_KAFKA_PROPERTIES_RETRY_BACKOFF_MS
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: retry.backoff.ms
        - name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
          valueFrom:
            secretKeyRef:
              name: confluent-cloud-kafka
              key: sasl.jaas.config
        - name: SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
          valueFrom:
            configMapKeyRef:
              name: confluent-cloud-kafka
              key: security.protocol
        ports:
        - containerPort: 8080
        imagePullPolicy: IfNotPresent
Modify Microservices for Confluent Cloud
As explained earlier, Confluent Cloud’s Kafka cluster requires some very specific configuration, based largely on the security features of Confluent Cloud. Connecting to Confluent Cloud requires some minor modifications to the existing storefront service source code. The changes are identical for all three services. To understand the service’s code, I suggest reviewing the previous post, Using Eventual Consistency and Spring for Kafka to Manage a Distributed Data Model: Part 1. Note the following changes are already made to the source code in the gke
git branch and are not necessary for this demo.
The previous Kafka SenderConfig
and ReceiverConfig
Java classes have been converted to Java interfaces. There are four new SenderConfigConfluent
, SenderConfigNonConfluent
, ReceiverConfigConfluent
, and ReceiverConfigNonConfluent
classes, which implement one of the new interfaces. The new classes contain the Spring Boot Profile class-level annotation. One set of Sender and Receiver classes are assigned the @Profile("gke")
annotation, and the others, the @Profile("!gke")
annotation. When the services start, one of the two class implementations is loaded, depending on the Active Spring Profile, gke
or not gke
. To understand the changes better, examine the Account service’s SenderConfigConfluent.java file (gist).
Line 20: Designates this class as belonging to the gke
Spring Profile.
Line 23: The class now implements an interface.
Lines 25–44: Reference the Confluent Cloud Kafka cluster configuration. The values for these variables will come from the Kubernetes ConfigMap and Secret, described previously, when the services are deployed to GKE.
Lines 55–59: Additional properties that have been added to the Kafka Sender configuration properties, specifically for Confluent Cloud.
package com.storefront.config;

import com.storefront.kafka.Sender;
import com.storefront.model.CustomerChangeEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Profile("gke")
@Configuration
@EnableKafka
public class SenderConfigConfluent implements SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm}")
    private String sslEndpointIdentificationAlgorithm;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.properties.request.timeout.ms}")
    private String requestTimeoutMs;

    @Value("${spring.kafka.properties.retry.backoff.ms}")
    private String retryBackoffMs;

    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    @Override
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        props.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
        props.put("sasl.mechanism", saslMechanism);
        props.put("request.timeout.ms", requestTimeoutMs);
        props.put("retry.backoff.ms", retryBackoffMs);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.jaas.config", saslJaasConfig);

        return props;
    }

    @Override
    @Bean
    public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Override
    @Bean
    public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Override
    @Bean
    public Sender sender() {
        return new Sender();
    }
}
Once code changes were completed and tested, the Docker Image for each service was rebuilt and uploaded to Docker Hub for public access. When recreating the images, the version of the Java Docker base image was upgraded from the previous post to Alpine OpenJDK 12 (openjdk:12-jdk-alpine
).
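If you prefer to build and host your own images rather than use the ones on Docker Hub, the process is a conventional build-and-push, sketched below for the Accounts service and assuming a Dockerfile in the project root and your own Docker Hub account (the account name is hypothetical):

# build and push the Accounts service image (run from the storefront-demo-accounts project root)
docker build -t your_dockerhub_account/storefront-accounts:gke-2.2.0 .
docker push your_dockerhub_account/storefront-accounts:gke-2.2.0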
Google Kubernetes Engine (GKE) with Istio
Having created the MongoDB Atlas and Confluent Cloud clusters, built the Kubernetes and Istio resources, modified the service’s source code, and pushed the new Docker Images to Docker Hub, the GKE cluster may now be built.
For the sake of brevity, we will manually create the cluster and deploy the resources, using the Google Cloud SDK gcloud and Kubernetes kubectl CLI tools, as opposed to automating with CI/CD tools, like Jenkins or Spinnaker. For this demonstration, I suggest a minimally-sized two-node GKE cluster using n1-standard-2 machine-type instances. The latest available release of Kubernetes on GKE at the time of this post was 1.11.5-gke.5, with Istio 1.0.3 (Istio on GKE was still considered beta). Note Kubernetes and Istio are evolving rapidly, so the configuration flags often change with newer versions. Check the GKE Clusters tab for the latest clusters create
command format (gist).
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Create non-prod Kubernetes cluster on GKE

# Constants - CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'

# Create GKE cluster (time in foreground)
time \
  gcloud beta container \
  --project $PROJECT clusters create $CLUSTER \
  --zone $ZONE \
  --username "admin" \
  --cluster-version "1.11.5-gke.5" \
  --machine-type "n1-standard-2" \
  --image-type "COS" \
  --disk-type "pd-standard" \
  --disk-size "100" \
  --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" \
  --num-nodes "2" \
  --enable-stackdriver-kubernetes \
  --enable-ip-alias \
  --network "projects/$PROJECT/global/networks/default" \
  --subnetwork "projects/$PROJECT/regions/$REGION/subnetworks/default" \
  --default-max-pods-per-node "110" \
  --addons HorizontalPodAutoscaling,HttpLoadBalancing,Istio \
  --istio-config auth=MTLS_PERMISSIVE \
  --issue-client-certificate \
  --metadata disable-legacy-endpoints=true \
  --enable-autoupgrade \
  --enable-autorepair

# Get cluster creds
gcloud container clusters get-credentials $CLUSTER \
  --zone $ZONE --project $PROJECT
kubectl config current-context

# Create dev Namespace
kubectl apply -f ./resources/other/namespaces.yaml

# Enable Istio automatic sidecar injection in Dev Namespace
kubectl label namespace $NAMESPACE istio-injection=enabled
Executing these commands successfully will build the cluster and the dev
Namespace, into which all the resources will be deployed. The two-node cluster creation process takes about three minutes on average.
We can also observe the new GKE cluster from the GKE Clusters Details tab.
Creating the GKE cluster also creates several other GCP resources, including a TCP load balancer and three external IP addresses. Shown below in the VPC network External IP addresses tab, there is one IP address associated with each of the GKE cluster’s two VM instances, and one IP address associated with the frontend of the load balancer.
While the TCP load balancer’s frontend is associated with the external IP address, the load balancer’s backend is a target pool, containing the two GKE cluster node machine instances.
A forwarding rule associates the load balancer’s frontend IP address with the backend target pool. External requests to the frontend IP address will be routed to the GKE cluster. From there, requests will be routed by Kubernetes and Istio to the individual storefront service Pods, and through the Istio sidecar (Envoy) proxies. There is an Istio sidecar proxy deployed to each Storefront service Pod.
Below, we see the details of the load balancer’s target pool, containing the GKE cluster’s two VMs.
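These same networking resources can also be listed from the command line (a sketch using standard gcloud compute commands):

# inspect the networking resources created alongside the GKE cluster
gcloud compute instances list
gcloud compute forwarding-rules list
gcloud compute target-pools list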
As shown at the start of the post, a simplified view of the GCP/GKE network routing looks as follows. For brevity, firewall rules and routes are not illustrated in the diagram.
Apply Kubernetes Resources
Again, using kubectl, deploy the three services and associated Kubernetes and Istio resources. Note the Istio Gateway and VirtualService(s) are not deployed to the dev
Namespace since their role is to control ingress and route traffic to the dev
Namespace and the services within it (gist).
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Deploy Kubernetes/Istio resources

# Constants - CHANGE ME!
readonly NAMESPACE='dev'
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'

kubectl apply -f ./resources/other/istio-gateway.yaml

kubectl apply -n $NAMESPACE -f ./resources/other/mongodb-atlas-external-mesh.yaml
kubectl apply -n $NAMESPACE -f ./resources/other/confluent-cloud-external-mesh.yaml

kubectl apply -n $NAMESPACE -f ./resources/config/confluent-cloud-kafka-configmap.yaml

kubectl apply -f ./resources/config/mongodb-atlas-secret.yaml
kubectl apply -f ./resources/config/confluent-cloud-kafka-secret.yaml

kubectl apply -n $NAMESPACE -f ./resources/services/accounts.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/fulfillment.yaml
kubectl apply -n $NAMESPACE -f ./resources/services/orders.yaml
Once these commands complete successfully, on the Workloads tab, we should observe two Pods of each of the three storefront service Kubernetes Deployments deployed to the dev
Namespace, all six Pods with a Status of ‘OK’. A Deployment controller provides declarative updates for Pods and ReplicaSets.
On the Services tab, we should observe the three storefront service’s Kubernetes Services. A Service in Kubernetes is a REST object.
On the Configuration Tab, we should observe the Kubernetes ConfigMap and two Secrets also deployed to the dev Environment.
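The same resources can be confirmed from the command line with kubectl (a minimal sketch):

# confirm the Deployments, Pods, Services, ConfigMap, and Secrets in the dev Namespace
kubectl get deployments,pods,services -n dev
kubectl get configmaps,secrets -n dev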
Below, we see the confluent-cloud-kafka ConfigMap resource with its data map of Confluent Cloud configuration.
Below, we see the confluent-cloud-kafka Secret with its data map of sensitive Confluent Cloud configuration.
Test the Storefront API
If you recall from part two of the previous post, there are a set of seven Storefront API endpoints that can be called to create sample data and test the API. The HTTP GET Requests hit each service, generate test data, populate the three MongoDB databases, and produce and consume Kafka messages across all three topics. Making these requests is the easiest way to confirm the Storefront API is working properly.
- Sample Customer: accounts/customers/sample
- Sample Orders: orders/customers/sample/orders
- Sample Fulfillment Requests: orders/customers/sample/fulfill
- Sample Processed Order Event: fulfillment/fulfillment/sample/process
- Sample Shipped Order Event: fulfillment/fulfillment/sample/ship
- Sample In-Transit Order Event: fulfillment/fulfillment/sample/in-transit
- Sample Received Order Event: fulfillment/fulfillment/sample/receive
There are a wide variety of tools to interact with the Storefront API. The project includes a simple Python script, sample_data.py, which will make HTTP GET requests to each of the above endpoints, after confirming their health, and return a success message.
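If you prefer not to use the Python script, a simple shell loop with curl accomplishes much the same thing (a sketch; replace the host with your own domain or the load balancer’s external IP address):

# generate sample data by calling each of the seven endpoints in order
API_HOST="http://api.dev.storefront-demo.com"
for ENDPOINT in \
  accounts/customers/sample \
  orders/customers/sample/orders \
  orders/customers/sample/fulfill \
  fulfillment/fulfillment/sample/process \
  fulfillment/fulfillment/sample/ship \
  fulfillment/fulfillment/sample/in-transit \
  fulfillment/fulfillment/sample/receive; do
    curl -s -o /dev/null -w "%{http_code} ${ENDPOINT}\n" "${API_HOST}/${ENDPOINT}"
done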
Postman
Postman, my personal favorite, is also an excellent tool to explore the Storefront API resources. I have the above set of the HTTP GET requests saved in a Postman Collection. Using Postman, below, we see the response from an HTTP GET request to the /accounts/customers
endpoint.
Postman also allows us to create integration tests and run Collections of Requests in batches using Postman’s Collection Runner. To test the Storefront API, below, I used Collection Runner to run a single series of integration tests, intended to confirm the API’s functionality, by checking for expected HTTP response codes and expected values in the response payloads. Postman also shows the response times from the Storefront API. Since this platform was not built to meet Production SLAs, measuring response times is less critical in the Development environment.
Google Stackdriver
If you recall, the GKE cluster had the Stackdriver Kubernetes option enabled, which gives us, amongst other observability features, access to all cluster, node, pod, and container logs. To confirm data is flowing to the MongoDB databases and Kafka topics, we can check the logs from any of the containers. Below we see the logs from the two Accounts Pod containers. Observe the AfterSaveListener
handler firing on an onAfterSave
event, which sends a CustomerChangeEvent
payload to the accounts.customer.change
Kafka topic, without error. These entries confirm that both Atlas and Confluent Cloud are reachable by the GKE-based workloads, and appear to be functioning properly.
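The same container logs are also available directly from kubectl (a sketch):

# tail the Accounts service container logs in the dev Namespace
kubectl logs -n dev -l app=accounts -c accounts --tail=50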
MongoDB Atlas Collection View
Review the MongoDB Atlas Clusters Collections tab. In this Development environment, the MongoDB databases and collections are created the first time a service tries to connect to them. In Production, the databases would be created and secured in advance of deploying resources. Once the sample data requests are completed successfully, you should now observe the three Storefront API databases, each with collections of documents.
MongoDB Compass
In addition to the Atlas web-based management console, MongoDB Compass is an excellent desktop tool to explore and manage MongoDB databases. Compass is available for Mac, Linux, and Windows. One of the many great features of Compass is the ability to visualize collection schemas and interactively filter documents. Below we see the fulfillment.requests
collection schema.
Confluent Control Center
Confluent Control Center is a downloadable, web browser-based tool for managing and monitoring Apache Kafka, including your Confluent Cloud clusters. Confluent Control Center provides rich functionality for building and monitoring production data pipelines and streaming applications. Confluent offers a free 30-day trial of Confluent Control Center. Since the Control Center is provided at an additional fee, and I found it difficult to configure for Confluent Cloud clusters based on Confluent’s documentation, I chose not to cover it in detail in this post.
Tear Down Cluster
Delete your Confluent Cloud and MongoDB clusters using their web-based management consoles. To delete the GKE cluster and all deployed Kubernetes resources, use the cluster delete
command. Also, double-check that the external IP addresses and load balancer, associated with the cluster, were also deleted as part of the cluster deletion (gist).
#!/bin/bash
#
# author: Gary A. Stafford
# site: https://programmaticponderings.com
# license: MIT License
# purpose: Tear down GKE cluster and associated resources

# Constants - CHANGE ME!
readonly PROJECT='gke-confluent-atlas'
readonly CLUSTER='storefront-api'
readonly REGION='us-central1'
readonly ZONE='us-central1-a'

# Delete GKE cluster (time in foreground)
time yes | gcloud beta container clusters delete $CLUSTER --zone $ZONE

# Confirm network resources are also deleted
gcloud compute forwarding-rules list
gcloud compute target-pools list
gcloud compute firewall-rules list

# In case target-pool associated with Cluster is not deleted
yes | gcloud compute target-pools delete \
  $(gcloud compute target-pools list \
  --filter="region:($REGION)" --project $PROJECT \
  | awk 'NR==2 {print $1}')
Conclusion
In this post, we have seen how easy it is to integrate Cloud-based DBaaS and MaaS products with the managed Kubernetes services from GCP, AWS, and Azure. As this post demonstrated, leading SaaS providers have sufficiently matured the integration capabilities of their product offerings to a point where it is now reasonable for enterprises to architect multi-vendor, single- and multi-cloud Production platforms, without re-engineering existing cloud-native applications.
In future posts, we will revisit this Storefront API example, further demonstrating how to enable HTTPS (Securing Your Istio Ingress Gateway with HTTPS) and end-user authentication (Istio End-User Authentication for Kubernetes using JSON Web Tokens (JWT) and Auth0).
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Using the Google Cloud Dataproc WorkflowTemplates API to Automate Spark and Hadoop Workloads on GCP
Posted by Gary A. Stafford in Bash Scripting, Big Data, Cloud, Continuous Delivery, DevOps, GCP, Java Development, Python, Software Development on December 16, 2018
In the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we explored Google Cloud Dataproc using the Google Cloud Console as well as the Google Cloud SDK and Cloud Dataproc API. We created clusters, then uploaded and ran Spark and PySpark jobs, then deleted clusters, each as discrete tasks. Although each task could be done via the Dataproc API and therefore automatable, they were independent tasks, without awareness of the previous task’s state.
In this brief follow-up post, we will examine the Cloud Dataproc WorkflowTemplates API to more efficiently and effectively automate Spark and Hadoop workloads. According to Google, the Cloud Dataproc WorkflowTemplates API provides a flexible and easy-to-use mechanism for managing and executing Dataproc workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster. Shown below, we see one of the Workflows that will be demonstrated in this post, displayed in Spark History Server Web UI.
Here we see a four-stage DAG of one of the three jobs in the workflow, displayed in Spark History Server Web UI.
Workflows are ideal for automating large batches of dynamic Spark and Hadoop jobs, and for long-running and unattended job execution, such as overnight.
Demonstration
Using the Python and Java projects from the previous post, we will first create workflow templates using just the WorkflowTemplates API. We will create the template, set a managed cluster, add jobs to the template, and instantiate the workflow. Next, we will further optimize and simplify our workflow by using a YAML-based workflow template file. The YAML-based template file eliminates the need to make API calls to set the template’s cluster and add the jobs to the template. Finally, to further enhance the workflow and promote re-use of the template, we will incorporate parameterization. Parameters will allow us to pass parameter (key/value) pairs from the command line to the workflow template, and on to the Python script as input arguments.
It is not necessary to use the Google Cloud Console for this post. All steps will be done using Google Cloud SDK shell commands. This means all steps may be automated using CI/CD DevOps tools, like Jenkins and Spinnaker on GKE.
Source Code
All open-sourced code for this post can be found on GitHub within three repositories: dataproc-java-demo, dataproc-python-demo, and dataproc-workflow-templates. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.
WorkflowTemplates API
Always start by ensuring you have the latest Google Cloud SDK updates and are working within the correct Google Cloud project.
gcloud components update

export PROJECT_ID=your-project-id
gcloud config set project $PROJECT_ID
Set the following variables based on your Google environment. The variables will be reused throughout the post for multiple commands.
export REGION=your-region
export ZONE=your-zone
export BUCKET_NAME=your-bucket
The post assumes you still have the Cloud Storage bucket we created in the previous post. In the bucket, you will need the two IBRD CSV files, available on Kaggle, the compiled Java JAR file from the dataproc-java-demo project, and a new Python script, international_loans_dataproc.py, from the dataproc-python-demo project.
Use gsutil
with the copy (cp
) command to upload the four files to your Storage bucket.
gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc.py $BUCKET_NAME
Following Google’s suggested process, we create a workflow template using the workflow-templates create
command.
export TEMPLATE_ID=template-demo-1

gcloud dataproc workflow-templates create \
  $TEMPLATE_ID --region $REGION
Adding a Cluster
Next, we need to set a cluster for the workflow to use, in order to run the jobs. Cloud Dataproc will create and use a Managed Cluster for your workflow or use an existing cluster. If the workflow uses a managed cluster, it creates the cluster, runs the jobs, and then deletes the cluster when the jobs are finished. This means, for many use cases, there is no need to maintain long-lived clusters; they become just an ephemeral part of the workflow.
We set a managed cluster for our Workflow using the workflow-templates set-managed-cluster
command. We will re-use the same cluster specifications we used in the previous post, the Standard, 1 master node and 2 worker nodes, cluster type.
gcloud dataproc workflow-templates set-managed-cluster \
  $TEMPLATE_ID \
  --region $REGION \
  --zone $ZONE \
  --cluster-name three-node-cluster \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 \
  --worker-machine-type n1-standard-4 \
  --worker-boot-disk-size 500 \
  --num-workers 2 \
  --image-version 1.3-deb9
Alternatively, if we already had an existing cluster, we would use the workflow-templates set-cluster-selector
command, to associate that cluster with the workflow template.
gcloud dataproc workflow-templates set-cluster-selector \
  $TEMPLATE_ID \
  --region $REGION \
  --cluster-labels goog-dataproc-cluster-uuid=$CLUSTER_UUID
To get the existing cluster’s UUID label value, you could use a command similar to the following.
CLUSTER_UUID=$(gcloud dataproc clusters describe $CLUSTER_2 \
  --region $REGION \
  | grep 'goog-dataproc-cluster-uuid:' \
  | sed 's/.* //')

echo $CLUSTER_UUID
1c27efd2-f296-466e-b14e-c4263d0d7e19
Adding Jobs
Next, we add the jobs we want to run to the template. Each job is considered a step in the template, and each step requires a unique step id. We will add three jobs to the template: two Java-based Spark jobs from the previous post and a new Python-based PySpark job.
First, we add the two Java-based Spark jobs, using the workflow-templates add-job spark
command. This command’s flags are nearly identical to the dataproc jobs submit spark
command, used in the previous post.
export STEP_ID=ibrd-small-spark

gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocSmall \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar

export STEP_ID=ibrd-large-spark

gcloud dataproc workflow-templates add-job spark \
  --region $REGION \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar
Next, we add the Python-based PySpark job, international_loans_dataproc.py, as the third job in the template. This Python script requires three input arguments, on lines 15–17, which are the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed (gist).
#!/usr/bin/python

# Author: Gary A. Stafford
# License: MIT
# Arguments Example:
# gs://dataproc-demo-bucket
# ibrd-statement-of-loans-historical-data.csv
# ibrd-summary-large-python

from pyspark.sql import SparkSession
import sys


def main(argv):
    storage_bucket = argv[0]
    data_file = argv[1]
    results_directory = argv[2]

    print "Number of arguments: {0} arguments.".format(len(sys.argv))
    print "Argument List: {0}".format(str(sys.argv))

    spark = SparkSession \
        .builder \
        .master("yarn") \
        .appName('dataproc-python-demo') \
        .getOrCreate()

    # Defaults to INFO
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Loads CSV file from Google Storage Bucket
    df_loans = spark \
        .read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(storage_bucket + "/" + data_file)

    # Creates temporary view using DataFrame
    df_loans.withColumnRenamed("Country", "country") \
        .withColumnRenamed("Country Code", "country_code") \
        .withColumnRenamed("Disbursed Amount", "disbursed") \
        .withColumnRenamed("Borrower's Obligation", "obligation") \
        .withColumnRenamed("Interest Rate", "interest_rate") \
        .createOrReplaceTempView("loans")

    # Performs basic analysis of dataset
    df_disbursement = spark.sql("""
        SELECT country, country_code,
            format_number(total_disbursement, 0) AS total_disbursement,
            format_number(ABS(total_obligation), 0) AS total_obligation,
            format_number(avg_interest_rate, 2) AS avg_interest_rate
        FROM (
            SELECT country, country_code,
                SUM(disbursed) AS total_disbursement,
                SUM(obligation) AS total_obligation,
                AVG(interest_rate) AS avg_interest_rate
            FROM loans
            GROUP BY country, country_code
            ORDER BY total_disbursement DESC
            LIMIT 25)
        """).cache()

    print "Results:"
    df_disbursement.show(25, True)

    # Saves results to single CSV file in Google Storage Bucket
    df_disbursement.write \
        .mode("overwrite") \
        .format("parquet") \
        .save(storage_bucket + "/" + results_directory)

    spark.stop()


if __name__ == "__main__":
    main(sys.argv[1:])
We pass the arguments to the Python script as part of the PySpark job, using the workflow-templates add-job pyspark
command.
export STEP_ID=ibrd-large-pyspark

gcloud dataproc workflow-templates add-job pyspark \
  $BUCKET_NAME/international_loans_dataproc.py \
  --step-id $STEP_ID \
  --workflow-template $TEMPLATE_ID \
  --region $REGION \
  -- $BUCKET_NAME \
  ibrd-statement-of-loans-historical-data.csv \
  ibrd-summary-large-python
That’s it; we have created our first Cloud Dataproc Workflow Template using the Dataproc WorkflowTemplate API. To view our template, we can use the following two commands. First, use the workflow-templates list
command to display a list of available templates. The list
command output displays the version of the workflow template and how many jobs are in the template.
gcloud dataproc workflow-templates list --region $REGION

ID               JOBS  UPDATE_TIME               VERSION
template-demo-1  3     2018-12-15T16:32:06.508Z  5
Then, we use the workflow-templates describe
command to show the details of a specific template.
gcloud dataproc workflow-templates describe \
  $TEMPLATE_ID --region $REGION
Using the workflow-templates describe
command, we should see output similar to the following (gist).
createTime: '2018-12-15T16:31:21.779Z'
id: template-demo-1
jobs:
- sparkJob:
    jarFileUris:
    - gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
    mainClass: org.example.dataproc.InternationalLoansAppDataprocSmall
  stepId: ibrd-small-spark
- sparkJob:
    jarFileUris:
    - gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
    mainClass: org.example.dataproc.InternationalLoansAppDataprocLarge
  stepId: ibrd-large-spark
- pysparkJob:
    args:
    - gs://dataproc-demo-bucket
    - ibrd-statement-of-loans-historical-data.csv
    - ibrd-summary-large-python
    mainPythonFileUri: gs://dataproc-demo-bucket/international_loans_dataproc.py
  stepId: ibrd-large-pyspark
name: projects/dataproc-demo-224523/regions/us-east1/workflowTemplates/template-demo-1
placement:
  managedCluster:
    clusterName: three-node-cluster
    config:
      gceClusterConfig:
        zoneUri: us-east1-b
      masterConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-4
      softwareConfig:
        imageVersion: 1.3-deb9
      workerConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-4
        numInstances: 2
updateTime: '2018-12-15T16:32:06.508Z'
version: 5
In the template description, notice the template’s id, the managed cluster in the placement section, and the three jobs, all of which we added using the above series of workflow-templates
commands. Also, notice the creation and update timestamps and version number, which were automatically generated by Dataproc. Lastly, notice the name, which refers to the GCP project and region where this copy of the template is located. Had we used an existing cluster with our workflow, as opposed to a managed cluster, the placement section would have looked as follows.
placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-uuid: your_clusters_uuid_label_value
To instantiate the workflow, we use the workflow-templates instantiate
command. This command will create the managed cluster, run all the steps (jobs), then delete the cluster. I have added the time
command to see how fast the workflow will take to complete.
time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION #--async
We can observe the progress from the Google Cloud Dataproc Console, or from the command line by omitting the --async
flag. Below we see the three jobs completed successfully on the managed cluster.
Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/e720bb96-9c87-330e-b1cd-efa4612b3c57].
WorkflowTemplate [template-demo-1] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/e1fe53de-92f2-4f8c-8b3a-fda5e13829b6].
Created cluster: three-node-cluster-ugdo4ygpl52bo.
Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING
Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING
Job ID ibrd-small-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-spark-ugdo4ygpl52bo COMPLETED
Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/f2a40c33-3cdf-47f5-92d6-345463fbd404].
WorkflowTemplate [template-demo-1] DONE
Deleted cluster: three-node-cluster-ugdo4ygpl52bo.

1.02s user 0.35s system 0% cpu 5:03.55 total
In the output, you see the creation of the cluster, the three jobs running and completing successfully, and finally the cluster deletion. The entire workflow took approximately 5 minutes to complete. Below is the view of the workflow’s results from the Dataproc Clusters Console Jobs tab.
Below we see the output from the PySpark job, run as part of the workflow template, shown in the Dataproc Clusters Console Output tab. Notice the three input arguments we passed to the Python script from the workflow template, listed in the output.
We see the arguments passed to the job, from the Jobs Configuration tab.
Examining the Google Cloud Dataproc Jobs Console, we will observe that the WorkflowTemplate API automatically adds a unique alphanumeric extension to both the name of the managed clusters we create, as well as to the name of each job that is run. The extension on the cluster name matches the extension on the jobs run on that cluster.
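The jobs, with their matching extensions, can also be listed from the command line (a minimal sketch):

# list recent Dataproc jobs; note the shared alphanumeric suffix per workflow run
gcloud dataproc jobs list --region $REGION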
YAML-based Workflow Template
The above WorkflowTemplates API-based workflow was certainly more convenient than using the individual Cloud Dataproc API commands; at a minimum, we don’t have to remember to delete our cluster when the jobs are complete, as I often forget to do. To further optimize the workflow, we will introduce the YAML-based Workflow Template. According to Google, you can define a workflow template in a YAML file, then instantiate the template to run the workflow. You can also import and export a workflow template YAML file to create and update a Cloud Dataproc workflow template resource.
We can export our first workflow template to create our YAML-based template file.
gcloud dataproc workflow-templates export template-demo-1 \
  --destination template-demo-2.yaml \
  --region $REGION
Below is our first YAML-based template, template-demo-2.yaml. You will need to replace the values in the template with your own values, based on your environment (gist).
jobs:
- sparkJob:
    jarFileUris:
    - gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
    mainClass: org.example.dataproc.InternationalLoansAppDataprocSmall
  stepId: ibrd-small-spark
- sparkJob:
    jarFileUris:
    - gs://dataproc-demo-bucket/dataprocJavaDemo-1.0-SNAPSHOT.jar
    mainClass: org.example.dataproc.InternationalLoansAppDataprocLarge
  stepId: ibrd-large-spark
- pysparkJob:
    args:
    - gs://dataproc-demo-bucket
    - ibrd-statement-of-loans-historical-data.csv
    - ibrd-summary-large-python
    mainPythonFileUri: gs://dataproc-demo-bucket/international_loans_dataproc.py
  stepId: ibrd-large-pyspark
placement:
  managedCluster:
    clusterName: three-node-cluster
    config:
      gceClusterConfig:
        zoneUri: us-east1-b
      masterConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-4
      softwareConfig:
        imageVersion: 1.3-deb9
      workerConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-4
        numInstances: 2
Note the template looks almost identical to the template we just created using the WorkflowTemplates API. The YAML-based template requires the placement and jobs fields. All the available fields are detailed here.
To run the template we use the workflow-templates instantiate-from-file
command. Again, I will use the time
command to measure performance.
time gcloud dataproc workflow-templates instantiate-from-file \
  --file template-demo-2.yaml \
  --region $REGION
Running the workflow-templates instantiate-from-file
command will run a workflow, nearly identical to the workflow we ran in the previous example, with a similar timing. Below we see the three jobs completed successfully on the managed cluster, in approximately the same time as the previous workflow.
Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/7ba3c28e-ebfa-32e7-9dd6-d938a1cfe23b].
WorkflowTemplate RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/8d05199f-ed36-4787-8a28-ae784c5bc8ae].
Created cluster: three-node-cluster-5k3bdmmvnna2y.
Job ID ibrd-small-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-spark-5k3bdmmvnna2y RUNNING
Job ID ibrd-large-pyspark-5k3bdmmvnna2y RUNNING
Job ID ibrd-small-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-spark-5k3bdmmvnna2y COMPLETED
Job ID ibrd-large-pyspark-5k3bdmmvnna2y COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/a436ae82-f171-4b0a-9b36-5e16406c75d5].
WorkflowTemplate DONE
Deleted cluster: three-node-cluster-5k3bdmmvnna2y.

1.16s user 0.44s system 0% cpu 4:48.84 total
Parameterization of Templates
To further optimize the workflow template process for re-use, we have the option of passing parameters to our template. Imagine you now receive new loan snapshot data files every night. Imagine you need to run the same data analysis on the financial transactions of thousands of your customers, nightly. Parameterizing templates makes them more flexible and reusable. By removing hard-coded values, such as Storage bucket paths and data file names, a single template may be re-used for multiple variations of the same job. Parameterization allows you to automate hundreds or thousands of Spark and Hadoop jobs in a workflow or workflows, each with different parameters, programmatically.
To demonstrate the parameterization of a workflow template, we create another YAML-based template with just the Python/PySpark job, template-demo-3.yaml. If you recall from our first example, the Python script, international_loans_dataproc.py, requires three input arguments: the bucket where the data is located and the results are placed, the name of the data file, and the directory in the bucket where the results will be placed.
We will replace four of the values in the template with parameters. We will inject those parameters’ values when we instantiate the workflow. Below is the new parameterized template. The template now has a parameters section from lines 26–46. They define parameters that will be used to replace the four values on lines 3–7 (gist).
jobs:
- pysparkJob:
    args:
    - storage_bucket_parameter
    - data_file_parameter
    - results_directory_parameter
    mainPythonFileUri: main_python_file_parameter
  stepId: ibrd-pyspark
placement:
  managedCluster:
    clusterName: three-node-cluster
    config:
      gceClusterConfig:
        zoneUri: us-east1-b
      masterConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-4
      softwareConfig:
        imageVersion: 1.3-deb9
      workerConfig:
        diskConfig:
          bootDiskSizeGb: 500
        machineTypeUri: n1-standard-4
        numInstances: 2
parameters:
- description: Python script to run
  fields:
  - jobs['ibrd-pyspark'].pysparkJob.mainPythonFileUri
  name: MAIN_PYTHON_FILE
- description: Storage bucket location of data file and results
  fields:
  - jobs['ibrd-pyspark'].pysparkJob.args[0]
  name: STORAGE_BUCKET
  validation:
    regex:
      regexes:
      - gs://.*
- description: IBRD data file
  fields:
  - jobs['ibrd-pyspark'].pysparkJob.args[1]
  name: IBRD_DATA_FILE
- description: Result directory
  fields:
  - jobs['ibrd-pyspark'].pysparkJob.args[2]
  name: RESULTS_DIRECTORY
Note the PySpark job’s three arguments and the location of the Python script have been parameterized. Parameters may include validation. As an example of validation, the template uses regex to validate the format of the Storage bucket path. The regex follows Google’s RE2 regular expression library syntax. If you need help with regex, the Regex Tester – Golang website is a convenient way to test your parameter’s regex validations.
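As a quick, local sanity check before importing the template, the validation pattern itself can be exercised against a few candidate values. The short Python sketch below is purely illustrative; note that Dataproc evaluates the pattern with RE2, while Python's built-in re module is a different engine, although the two agree on a simple pattern such as gs://.*.

import re

# Pattern from the STORAGE_BUCKET parameter's validation block above (RE2 syntax)
BUCKET_PATTERN = r"^gs://.*$"

# Example values: the first should pass validation, the others should fail
candidates = ["gs://dataproc-demo-bucket", "s3://dataproc-demo-bucket", "dataproc-demo-bucket"]

for value in candidates:
    result = "PASS" if re.match(BUCKET_PATTERN, value) else "FAIL"
    print("{0} -> {1}".format(value, result))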
First, we import the new parameterized YAML-based workflow template using the workflow-templates import command. Then, we instantiate the template using the workflow-templates instantiate command, which will run the single PySpark job, analyzing the smaller IBRD data file and placing the resulting Parquet-format file in a directory within the Storage bucket. We pass the Python script location, the bucket link, the smaller IBRD data file name, and the output directory as parameters to the template, and therefore indirectly, three of them as input arguments to the Python script.
export TEMPLATE_ID=template-demo-3

gcloud dataproc workflow-templates import $TEMPLATE_ID \
  --region $REGION --source template-demo-3.yaml

gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION --async \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-latest-available-snapshot.csv",RESULTS_DIRECTORY="ibrd-summary-small-python"
Next, we will analyze the larger historic data file, using the same parameterized YAML-based workflow template, but changing two of the four parameters we are passing to the template with the workflow-templates instantiate
command. This will run a single PySpark job on the larger IBRD data file and place the resulting Parquet-format file in a different directory within the Storage bucket.
time gcloud dataproc workflow-templates instantiate \
  $TEMPLATE_ID --region $REGION \
  --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-historical-data.csv",RESULTS_DIRECTORY="ibrd-summary-large-python"
This is the power of parameterization—one workflow template and one job script, but two different datasets and two different results.
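Taken a step further, the nightly scenario described earlier could be scripted: loop over the incoming data files and instantiate the same template once per file. The Python sketch below simply shells out to the gcloud command shown above; the data file and results directory names are hypothetical placeholders.

#!/usr/bin/python
# Hypothetical nightly driver: one parameterized template, many instantiations
import subprocess

REGION = "us-east1"
TEMPLATE_ID = "template-demo-3"
BUCKET_NAME = "gs://your_bucket_name"  # replace with your bucket

# (data file, results directory) pairs -- placeholder names for illustration only
nightly_runs = [
    ("ibrd-snapshot-2018-12-14.csv", "ibrd-summary-2018-12-14"),
    ("ibrd-snapshot-2018-12-15.csv", "ibrd-summary-2018-12-15"),
]

for data_file, results_dir in nightly_runs:
    parameters = ",".join([
        "MAIN_PYTHON_FILE={0}/international_loans_dataproc.py".format(BUCKET_NAME),
        "STORAGE_BUCKET={0}".format(BUCKET_NAME),
        "IBRD_DATA_FILE={0}".format(data_file),
        "RESULTS_DIRECTORY={0}".format(results_dir),
    ])
    subprocess.check_call([
        "gcloud", "dataproc", "workflow-templates", "instantiate", TEMPLATE_ID,
        "--region", REGION, "--async", "--parameters", parameters,
    ])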
Below we see the single PySpark job ran on the managed cluster.
Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/b3c5063f-e3cf-3833-b613-83db12b82f32].
WorkflowTemplate [template-demo-3] RUNNING
Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105].
Created cluster: three-node-cluster-j6q2al2mkkqck.
Job ID ibrd-pyspark-j6q2al2mkkqck RUNNING
Job ID ibrd-pyspark-j6q2al2mkkqck COMPLETED
Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/fe4a263e-7c6d-466e-a6e2-52292cbbdc9b].
WorkflowTemplate [template-demo-3] DONE
Deleted cluster: three-node-cluster-j6q2al2mkkqck.
0.98s user 0.40s system 0% cpu 4:19.42 total
Running the workflow-templates list command again should now display a list of two workflow templates.
gcloud dataproc workflow-templates list --region $REGION

ID               JOBS  UPDATE_TIME               VERSION
template-demo-3  1     2018-12-15T17:04:39.064Z  2
template-demo-1  3     2018-12-15T16:32:06.508Z  5
Looking within the Google Cloud Storage bucket, we should now see four different folders, the results of the workflows.
Job Results and Testing
To check on the status of a job, we use the dataproc jobs wait
command. This returns the standard output (stdout) and standard error (stderr) for that specific job.
export SET_ID=ibrd-large-dataset-pyspark-cxzzhr2ro3i54

gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION
The dataproc jobs wait command is frequently used for automated testing of jobs, often within a CI/CD pipeline. Assume part of the job output is expected to indicate success, such as a particular string, boolean, or numeric value. We could use any number of test frameworks or other methods to confirm the existence of that expected value or values. Below is a simple example of using the grep command to check for the expected line ' state: FINISHED' in the standard output of the dataproc jobs wait command.
command=$(gcloud dataproc jobs wait $SET_ID \
  --project $PROJECT_ID \
  --region $REGION) &>/dev/null

if grep -Fqx " state: FINISHED" <<< $command &>/dev/null; then
  echo "Job Success!"
else
  echo "Job Failure?"
fi

# single line alternative
if grep -Fqx " state: FINISHED" <<< $command &>/dev/null;then echo "Job Success!";else echo "Job Failure?";fi

Job Success!
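The same check can also be wrapped in a Python test for use with a framework such as pytest. Below is a minimal sketch, assuming the gcloud CLI is installed and authenticated, and that the SET_ID, PROJECT_ID, and REGION environment variables are set as shown above.

import os
import subprocess

def test_dataproc_job_finished():
    """Fail the test if the Dataproc job never reached the FINISHED state."""
    output = subprocess.check_output(
        ["gcloud", "dataproc", "jobs", "wait", os.environ["SET_ID"],
         "--project", os.environ["PROJECT_ID"],
         "--region", os.environ["REGION"]],
        stderr=subprocess.STDOUT,
    )
    assert "state: FINISHED" in output.decode("utf-8")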
Individual Operations
To view individual workflow operations, use the operations list
and operations describe
commands. The operations list
command will list all operations.
Notice the three distinct series of operations within each workflow, shown with the operations list
command: WORKFLOW, CREATE, and DELETE. In the example below, I’ve separated the operations by workflow, for better clarity.
gcloud dataproc operations list --region $REGION

NAME                                  TIMESTAMP                 TYPE      STATE  ERROR  WARNINGS
fe4a263e-7c6d-466e-a6e2-52292cbbdc9b  2018-12-15T17:11:45.178Z  DELETE    DONE
896b7922-da8e-49a9-bd80-b1ac3fda5105  2018-12-15T17:08:38.322Z  CREATE    DONE
b3c5063f-e3cf-3833-b613-83db12b82f32  2018-12-15T17:08:37.497Z  WORKFLOW  DONE
---
be0e5293-275f-46ad-b1f4-696ba44c222e  2018-12-15T17:07:26.305Z  DELETE    DONE
6784078c-cbe3-4c1e-a56e-217149f555a4  2018-12-15T17:04:40.613Z  CREATE    DONE
fcd8039e-a260-3ab3-ad31-01abc1a524b4  2018-12-15T17:04:40.007Z  WORKFLOW  DONE
---
b4b23ca6-9442-4ffb-8aaf-460bac144dd8  2018-12-15T17:02:16.744Z  DELETE    DONE
89ef9c7c-f3c9-4d01-9091-61ed9e1f085d  2018-12-15T17:01:45.514Z  CREATE    DONE
243fa7c1-502d-3d7a-aaee-b372fe317570  2018-12-15T17:01:44.895Z  WORKFLOW  DONE
We use the results of the operations list
command to execute the operations describe
command to describe a specific operation.
gcloud dataproc operations describe \
  projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105
Each type of operation contains different details. Note the fine-grained detail we get from Dataproc using the operations describe command for a CREATE operation (gist).
projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105 | |
done: true | |
metadata: | |
'@type': type.googleapis.com/google.cloud.dataproc.v1beta2.ClusterOperationMetadata | |
clusterName: three-node-cluster-j6q2al2mkkqck | |
clusterUuid: 10656c6e-ef49-4264-805b-463e1e819626 | |
description: Create cluster with 2 workers | |
operationType: CREATE | |
status: | |
innerState: DONE | |
state: DONE | |
stateStartTime: '2018-12-15T17:10:12.722Z' | |
statusHistory: | |
– state: PENDING | |
stateStartTime: '2018-12-15T17:08:38.322Z' | |
– state: RUNNING | |
stateStartTime: '2018-12-15T17:08:38.380Z' | |
name: projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105 | |
response: | |
'@type': type.googleapis.com/google.cloud.dataproc.v1beta2.Cluster | |
clusterName: three-node-cluster-j6q2al2mkkqck | |
clusterUuid: 10656c6e-ef49-4264-805b-463e1e819626 | |
config: | |
configBucket: dataproc-5214e13c-d3ea-400b-9c70-11ee08fac5ab-us-east1 | |
gceClusterConfig: | |
networkUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/global/networks/default | |
serviceAccountScopes: | |
– https://www.googleapis.com/auth/bigquery | |
– https://www.googleapis.com/auth/bigtable.admin.table | |
– https://www.googleapis.com/auth/bigtable.data | |
– https://www.googleapis.com/auth/cloud.useraccounts.readonly | |
– https://www.googleapis.com/auth/devstorage.full_control | |
– https://www.googleapis.com/auth/devstorage.read_write | |
– https://www.googleapis.com/auth/logging.write | |
zoneUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/zones/us-east1-b | |
masterConfig: | |
diskConfig: | |
bootDiskSizeGb: 500 | |
bootDiskType: pd-standard | |
imageUri: https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-1-3-deb9-20181206-000000-rc01 | |
machineTypeUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/zones/us-east1-b/machineTypes/n1-standard-4 | |
minCpuPlatform: AUTOMATIC | |
numInstances: 1 | |
softwareConfig: | |
imageVersion: 1.3.19-deb9 | |
properties: | |
capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy: fair | |
core:fs.gs.block.size: '134217728' | |
core:fs.gs.metadata.cache.enable: 'false' | |
distcp:mapreduce.map.java.opts: -Xmx768m | |
distcp:mapreduce.map.memory.mb: '1024' | |
distcp:mapreduce.reduce.java.opts: -Xmx768m | |
distcp:mapreduce.reduce.memory.mb: '1024' | |
hdfs:dfs.datanode.address: 0.0.0.0:9866 | |
hdfs:dfs.datanode.http.address: 0.0.0.0:9864 | |
hdfs:dfs.datanode.https.address: 0.0.0.0:9865 | |
hdfs:dfs.datanode.ipc.address: 0.0.0.0:9867 | |
hdfs:dfs.namenode.handler.count: '20' | |
hdfs:dfs.namenode.http-address: 0.0.0.0:9870 | |
hdfs:dfs.namenode.https-address: 0.0.0.0:9871 | |
hdfs:dfs.namenode.lifeline.rpc-address: three-node-cluster-j6q2al2mkkqck-m:8050 | |
hdfs:dfs.namenode.secondary.http-address: 0.0.0.0:9868 | |
hdfs:dfs.namenode.secondary.https-address: 0.0.0.0:9869 | |
hdfs:dfs.namenode.service.handler.count: '10' | |
hdfs:dfs.namenode.servicerpc-address: three-node-cluster-j6q2al2mkkqck-m:8051 | |
mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE: '3840' | |
mapred:mapreduce.job.maps: '21' | |
mapred:mapreduce.job.reduce.slowstart.completedmaps: '0.95' | |
mapred:mapreduce.job.reduces: '7' | |
mapred:mapreduce.map.cpu.vcores: '1' | |
mapred:mapreduce.map.java.opts: -Xmx2457m | |
mapred:mapreduce.map.memory.mb: '3072' | |
mapred:mapreduce.reduce.cpu.vcores: '1' | |
mapred:mapreduce.reduce.java.opts: -Xmx2457m | |
mapred:mapreduce.reduce.memory.mb: '3072' | |
mapred:mapreduce.task.io.sort.mb: '256' | |
mapred:yarn.app.mapreduce.am.command-opts: -Xmx2457m | |
mapred:yarn.app.mapreduce.am.resource.cpu-vcores: '1' | |
mapred:yarn.app.mapreduce.am.resource.mb: '3072' | |
presto-jvm:MaxHeapSize: 12288m | |
presto:query.max-memory-per-node: 7372MB | |
presto:query.max-total-memory-per-node: 7372MB | |
spark-env:SPARK_DAEMON_MEMORY: 3840m | |
spark:spark.driver.maxResultSize: 1920m | |
spark:spark.driver.memory: 3840m | |
spark:spark.executor.cores: '2' | |
spark:spark.executor.instances: '2' | |
spark:spark.executor.memory: 5586m | |
spark:spark.executorEnv.OPENBLAS_NUM_THREADS: '1' | |
spark:spark.scheduler.mode: FAIR | |
spark:spark.sql.cbo.enabled: 'true' | |
spark:spark.yarn.am.memory: 640m | |
yarn-env:YARN_TIMELINESERVER_HEAPSIZE: '3840' | |
yarn:yarn.nodemanager.resource.memory-mb: '12288' | |
yarn:yarn.scheduler.maximum-allocation-mb: '12288' | |
yarn:yarn.scheduler.minimum-allocation-mb: '1024' | |
workerConfig: | |
diskConfig: | |
bootDiskSizeGb: 500 | |
bootDiskType: pd-standard | |
imageUri: https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-1-3-deb9-20181206-000000-rc01 | |
machineTypeUri: https://www.googleapis.com/compute/v1/projects/dataproc-demo-224523/zones/us-east1-b/machineTypes/n1-standard-4 | |
minCpuPlatform: AUTOMATIC | |
numInstances: 2 | |
labels: | |
goog-dataproc-cluster-name: three-node-cluster-j6q2al2mkkqck | |
goog-dataproc-cluster-uuid: 10656c6e-ef49-4264-805b-463e1e819626 | |
goog-dataproc-location: us-east1 | |
goog-dataproc-workflow-instance-id: b3c5063f-e3cf-3833-b613-83db12b82f32 | |
goog-dataproc-workflow-template-id: template-demo-3 | |
projectId: dataproc-demo-224523 |
Conclusion
In this brief follow-up to the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we have seen how easy the WorkflowTemplates API and YAML-based workflow templates make it to automate our analytics jobs. This post only scratched the surface of the complete functionality of the WorkflowTemplates API and the parameterization of templates.
In a future post, we will leverage the automation capabilities of the Google Cloud Platform, the WorkflowTemplates API, YAML-based workflow templates, and parameterization to develop a fully-automated DevOps for Big Data workflow, capable of running hundreds of Spark and Hadoop jobs.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.
Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service
Posted by Gary A. Stafford in Big Data, Cloud, GCP, Java Development, Python, Software Development, Technology Consulting on December 11, 2018
There is little question that big data analytics, data science, artificial intelligence (AI), and machine learning (ML), a subcategory of AI, have all experienced a tremendous surge in popularity over the last few years. Behind the hype curves and marketing buzz, these technologies are having a significant influence on all aspects of our modern lives.
However, installing, configuring, and managing the technologies that support big data analytics, data science, ML, and AI, at scale and in Production, often demands an advanced level of familiarity with Linux, distributed systems, cloud- and container-based platforms, databases, and data-streaming applications. The mere ability to manage terabytes and petabytes of transient data is beyond the capability of many enterprises, let alone performing analysis of that data.
To ease the burden of implementing these technologies, the three major cloud providers, AWS, Azure, and Google Cloud, all have multiple Big Data Analytics-, AI-, and ML-as-a-Service offerings. In this post, we will explore one such cloud-based service offering in the field of big data analytics, Google Cloud Dataproc. We will focus on Cloud Dataproc’s ability to quickly and efficiently run Spark jobs written in Java and Python, two widely adopted enterprise programming languages.
Featured Technologies
The following technologies are featured prominently in this post.
Google Cloud Dataproc
According to Google, Cloud Dataproc is a fast, easy-to-use, fully-managed cloud service for running the Apache Spark and Apache Hadoop ecosystem on Google Cloud Platform. Dataproc is a complete platform for data processing, analytics, and machine learning. Dataproc offers per-second billing, so you only pay for exactly the resources you consume. Dataproc offers frequently updated and native versions of Apache Spark, Hadoop, Pig, and Hive, as well as other related applications. Dataproc has built-in integrations with other Google Cloud Platform (GCP) services, such as Cloud Storage, BigQuery, Bigtable, Stackdriver Logging, and Stackdriver Monitoring. Dataproc’s clusters are configurable and resizable from three to hundreds of nodes, and each cluster action takes less than 90 seconds on average.
Similar Platform as a Service (PaaS) offerings to Dataproc include Amazon Elastic MapReduce (EMR), Microsoft Azure HDInsight, and Qubole Data Service. Qubole is offered on AWS, Azure, and Oracle Cloud Infrastructure (Oracle OCI).
According to Google, Cloud Dataproc and Cloud Dataflow, both part of GCP’s Data Analytics/Big Data Product offerings, can both be used for data processing, and there’s overlap in their batch and streaming capabilities. Cloud Dataflow is a fully-managed service for transforming and enriching data in stream and batch modes. Dataflow uses the Apache Beam SDK to provide developers with Java and Python APIs, similar to Spark.
Apache Spark
According to Apache, Spark is a unified analytics engine for large-scale data processing, used by well-known, modern enterprises, such as Netflix, Yahoo, and eBay. With in-memory speeds up to 100x faster than Hadoop, Apache Spark achieves high performance for static, batch, and streaming data, using a state-of-the-art DAG (Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.
According to a post by DataFlair, ‘the DAG in Apache Spark is a set of Vertices and Edges, where vertices represent the RDDs and the edges represent the Operation to be applied on RDD (Resilient Distributed Dataset). In Spark DAG, every edge directs from earlier to later in the sequence. On the calling of Action, the created DAG submits to DAG Scheduler which further splits the graph into the stages of the task.’ Below, we see a three-stage DAG visualization, displayed using the Spark History Server Web UI, from a job demonstrated in this post.
Spark’s polyglot programming model allows users to write applications in Scala, Java, Python, R, and SQL. Spark includes libraries for Spark SQL (DataFrames and Datasets), MLlib (Machine Learning), GraphX (Graph Processing), and DStreams (Spark Streaming). Spark may be run using its standalone cluster mode or on Apache Hadoop YARN, Mesos, and Kubernetes.
PySpark
The Spark Python API, PySpark, exposes the Spark programming model to Python. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached and shuffled in the JVM. According to Apache, Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.
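Below is a minimal, self-contained illustration of that arrangement, unrelated to the IBRD jobs used later in this post. The Python code only orchestrates; the DataFrame lives in, and is processed by, the JVM that PySpark launches via Py4J.

from pyspark.sql import SparkSession

# Start a local, JVM-backed Spark runtime; Py4J bridges the Python and Java sides
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("pyspark-py4j-example") \
    .getOrCreate()

# The small Python list is converted into a JVM-resident, distributed DataFrame
df = spark.createDataFrame([("IBRD", 1944), ("IDA", 1960)], ["institution", "founded"])

# filter() and count() run as Spark tasks in the JVM; only the result returns to Python
print(df.filter(df.founded < 1950).count())

spark.stop()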
Apache Hadoop
According to Apache, the Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. This is a rather modest description of such a significant and transformative project. When we talk about Hadoop, it is often in the context of the project’s well-known modules, which include:
- Hadoop Common: The common utilities that support the other Hadoop modules
- Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data
- Hadoop YARN (Yet Another Resource Negotiator): A framework for job scheduling and cluster resource management, also known as ‘Hadoop NextGen’
- Hadoop MapReduce: A YARN-based system for parallel processing of large datasets
- Hadoop Ozone: An object store for Hadoop
Based on the Powered by Apache Hadoop list, there are many well-known enterprises and academic institutions using Apache Hadoop, including Adobe, eBay, Facebook, Hulu, LinkedIn, and The New York Times.
Spark vs. Hadoop
There are many articles and posts that delve into the Spark versus Hadoop debate; this post is not one of them. Although both are mature technologies, Spark, the new kid on the block, reached version 1.0.0 in May 2014, whereas Hadoop reached version 1.0.0 earlier, in December 2011. According to Google Trends, interest in both technologies has remained relatively high over the last three years. However, interest in Spark, based on the volume of searches, has been steadily outpacing Hadoop for well over a year now. The in-memory speed of Spark over HDFS-based Hadoop and the ease of Spark SQL for working with structured data are likely big differentiators for many users coming from a traditional relational database background and for users with large or streaming datasets requiring near real-time processing.
In this post, all examples are built to run on Spark. This is not meant to suggest Spark is necessarily superior or that Spark runs better on Dataproc than Hadoop. In fact, Dataproc’s implementation of Spark relies on Hadoop’s core HDFS and YARN technologies to run.
Demonstration
To show the capabilities of Cloud Dataproc, we will create both a single-node Dataproc cluster and three-node cluster, upload Java- and Python-based analytics jobs and data to Google Cloud Storage, and execute the jobs on the Spark cluster. Finally, we will enable monitoring and notifications for the Dataproc clusters and the jobs running on the clusters with Stackdriver. The post will demonstrate the use of the Google Cloud Console, as well as Google’s Cloud SDK’s command line tools, for all tasks.
In this post, we will be uploading and running individual jobs on the Dataproc Spark cluster, as opposed to using the Cloud Dataproc Workflow Templates. According to Google, a Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. Workflow Templates are useful for automating your Dataproc workflows; however, automation is not the primary topic of this post.
Source Code
All open-sourced code for this post can be found on GitHub in two repositories, one for Java with Spark and one for Python with PySpark. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.
Cost
Of course, there is a cost associated with provisioning cloud services. However, if you manage the Google Cloud Dataproc resources prudently, the costs are negligible. Regarding pricing, according to Google, Cloud Dataproc pricing is based on the size of Cloud Dataproc clusters and the duration of time that they run. The size of a cluster is based on the aggregate number of virtual CPUs (vCPUs) across the entire cluster, including the master and worker nodes. The duration of a cluster is the length of time, measured in minutes, between cluster creation and cluster deletion.
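As a rough, worked example of that pricing model, consider only the Dataproc surcharge for the three-node cluster used later in this post. This is a sketch: the $0.010 per vCPU-hour rate is an assumption based on Google's published Dataproc pricing at the time of writing, and it excludes the underlying Compute Engine, disk, and storage charges.

# Dataproc surcharge = total vCPUs x hours x rate (Compute Engine is billed separately)
DATAPROC_RATE_PER_VCPU_HOUR = 0.010  # assumed rate, USD

def dataproc_surcharge(total_vcpus, minutes):
    return total_vcpus * (minutes / 60.0) * DATAPROC_RATE_PER_VCPU_HOUR

# Three-node cluster: 3 x n1-standard-4 = 12 vCPUs, running for roughly 10 minutes
print(round(dataproc_surcharge(total_vcpus=12, minutes=10), 4))  # ~0.02 USD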
Over the course of writing the code for this post, as well as writing the post itself, the entire cost of all the related resources was a minuscule US$7.50. The cost includes creating, running, and deleting more than a dozen Dataproc clusters and uploading and executing approximately 75-100 Spark and PySpark jobs. Given the quick creation time of a cluster, 2 minutes on average or less in this demonstration, there is no reason to leave a cluster running longer than it takes to complete your workloads.
Kaggle Datasets
To explore the features of Dataproc, we will use a publicly-available dataset from Kaggle. Kaggle is a popular open-source resource for datasets used for big-data and ML applications. Their tagline is ‘Kaggle is the place to do data science projects’.
For this demonstration, I chose the IBRD Statement Of Loans Data dataset, from World Bank Financial Open Data, and available on Kaggle. The International Bank for Reconstruction and Development (IBRD) loans are public and publicly guaranteed debt extended by the World Bank Group. IBRD loans are made to, or guaranteed by, countries that are members of IBRD. This dataset contains historical snapshots of the Statement of Loans including the latest available snapshots.
There are two data files available. The ‘Statement of Loans’ latest available snapshots data file contains 8,713 rows of loan data (~3 MB), ideal for development and testing. The ‘Statement of Loans’ historic data file contains approximately 750,000 rows of data (~265 MB). Although not exactly ‘big data’, the historic dataset is large enough to sufficiently explore Dataproc. Both IBRD files have an identical schema with 33 columns of data (gist).
End of Period | Loan Number | Region | Country Code | Country | Borrower | Guarantor Country Code | Guarantor | Loan Type | Loan Status | Interest Rate | Currency of Commitment | Project ID | Project Name | Original Principal Amount | Cancelled Amount | Undisbursed Amount | Disbursed Amount | Repaid to IBRD | Due to IBRD | Exchange Adjustment | Borrower's Obligation | Sold 3rd Party | Repaid 3rd Party | Due 3rd Party | Loans Held | First Repayment Date | Last Repayment Date | Agreement Signing Date | Board Approval Date | Effective Date (Most Recent) | Closed Date (Most Recent) | Last Disbursement Date | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2018-10-31T00:00:00 | IBRD00010 | EUROPE AND CENTRAL ASIA | FR | France | CREDIT NATIONAL | FR | France | NON POOL | Fully Repaid | 4.2500 | P037383 | RECONSTRUCTION | 250000000.00 | 0.00 | 0.00 | 250000000.00 | 38000.00 | 0.00 | 0.00 | 0.00 | 249962000.00 | 249962000.00 | 0.00 | 0.00 | 1952-11-01T00:00:00 | 1977-05-01T00:00:00 | 1947-05-09T00:00:00 | 1947-05-09T00:00:00 | 1947-06-09T00:00:00 | 1947-12-31T00:00:00 | |||
2018-10-31T00:00:00 | IBRD00020 | EUROPE AND CENTRAL ASIA | NL | Netherlands | NON POOL | Fully Repaid | 4.2500 | P037452 | RECONSTRUCTION | 191044211.75 | 0.00 | 0.00 | 191044211.75 | 103372211.75 | 0.00 | 0.00 | 0.00 | 87672000.00 | 87672000.00 | 0.00 | 0.00 | 1952-04-01T00:00:00 | 1972-10-01T00:00:00 | 1947-08-07T00:00:00 | 1947-08-07T00:00:00 | 1947-09-11T00:00:00 | 1948-03-31T00:00:00 | ||||||
2018-10-31T00:00:00 | IBRD00021 | EUROPE AND CENTRAL ASIA | NL | Netherlands | NON POOL | Fully Repaid | 4.2500 | P037452 | RECONSTRUCTION | 3955788.25 | 0.00 | 0.00 | 3955788.25 | 0.00 | 0.00 | 0.00 | 0.00 | 3955788.25 | 3955788.25 | 0.00 | 0.00 | 1953-04-01T00:00:00 | 1954-04-01T00:00:00 | 1948-05-25T00:00:00 | 1947-08-07T00:00:00 | 1948-06-01T00:00:00 | 1948-06-30T00:00:00 | ||||||
2018-10-31T00:00:00 | IBRD00030 | EUROPE AND CENTRAL ASIA | DK | Denmark | NON POOL | Fully Repaid | 4.2500 | P037362 | RECONSTRUCTION | 40000000.00 | 0.00 | 0.00 | 40000000.00 | 17771000.00 | 0.00 | 0.00 | 0.00 | 22229000.00 | 22229000.00 | 0.00 | 0.00 | 1953-02-01T00:00:00 | 1972-08-01T00:00:00 | 1947-08-22T00:00:00 | 1947-08-22T00:00:00 | 1947-10-17T00:00:00 | 1949-03-31T00:00:00 | ||||||
2018-10-31T00:00:00 | IBRD00040 | EUROPE AND CENTRAL ASIA | LU | Luxembourg | NON POOL | Fully Repaid | 4.2500 | P037451 | RECONSTRUCTION | 12000000.00 | 238016.98 | 0.00 | 11761983.02 | 1619983.02 | 0.00 | 0.00 | 0.00 | 10142000.00 | 10142000.00 | 0.00 | 0.00 | 1949-07-15T00:00:00 | 1972-07-15T00:00:00 | 1947-08-28T00:00:00 | 1947-08-28T00:00:00 | 1947-10-24T00:00:00 | 1949-03-31T00:00:00 | ||||||
2018-10-31T00:00:00 | IBRD00050 | LATIN AMERICA AND CARIBBEAN | CL | Chile | Ministry of Finance | CL | Chile | NON POOL | Fully Repaid | 4.5000 | P006578 | POWER | 13500000.00 | 0.00 | 0.00 | 13500000.00 | 12167000.00 | 0.00 | 0.00 | 0.00 | 1333000.00 | 1333000.00 | 0.00 | 0.00 | 1953-07-01T00:00:00 | 1968-07-01T00:00:00 | 1948-03-25T00:00:00 | 1948-03-25T00:00:00 | 1949-04-07T00:00:00 | 1954-12-31T00:00:00 | |||
2018-10-31T00:00:00 | IBRD00060 | LATIN AMERICA AND CARIBBEAN | CL | Chile | Ministry of Finance | CL | Chile | NON POOL | Fully Repaid | 3.7500 | P006577 | FOMENTO AGRIC CREDIT | 2500000.00 | 0.00 | 0.00 | 2500000.00 | 755000.00 | 0.00 | 0.00 | 0.00 | 1745000.00 | 1745000.00 | 0.00 | 0.00 | 1950-07-01T00:00:00 | 1955-01-01T00:00:00 | 1948-03-25T00:00:00 | 1948-03-25T00:00:00 | 1949-04-07T00:00:00 | 1950-01-01T00:00:00 | |||
2018-10-31T00:00:00 | IBRD00070 | EUROPE AND CENTRAL ASIA | NL | Netherlands | NON POOL | Fully Repaid | 3.5625 | P037453 | SHIPPING I | 2000000.00 | 0.00 | 0.00 | 2000000.00 | 0.00 | 0.00 | 0.00 | 0.00 | 2000000.00 | 2000000.00 | 0.00 | 0.00 | 1949-01-15T00:00:00 | 1958-07-15T00:00:00 | 1948-07-15T00:00:00 | 1948-05-21T00:00:00 | 1948-08-03T00:00:00 | 1948-08-03T00:00:00 | ||||||
2018-10-31T00:00:00 | IBRD00071 | EUROPE AND CENTRAL ASIA | NL | Netherlands | NON POOL | Fully Repaid | 3.5625 | P037453 | SHIPPING I | 2000000.00 | 0.00 | 0.00 | 2000000.00 | 0.00 | 0.00 | 0.00 | 0.00 | 2000000.00 | 2000000.00 | 0.00 | 0.00 | 1949-01-15T00:00:00 | 1958-07-15T00:00:00 | 1948-07-15T00:00:00 | 1948-05-21T00:00:00 | 1948-08-03T00:00:00 | 1948-08-03T00:00:00 |
In this demonstration, both the Java and Python jobs will perform the same simple analysis of the larger historic dataset. For the analysis, we will ascertain the top 25 historic IBRD borrowers, determine their total loan disbursements and current loan obligations, and calculate the average interest rates they were charged for all loans. This simple analysis will be performed using Spark’s SQL capabilities. The results of the analysis, a Spark DataFrame containing 25 rows, will be saved as a CSV-format data file.
SELECT country, country_code,
       Format_number(total_disbursement, 0) AS total_disbursement,
       Format_number(total_obligation, 0) AS total_obligation,
       Format_number(avg_interest_rate, 2) AS avg_interest_rate
FROM (SELECT country, country_code,
             Sum(disbursed) AS total_disbursement,
             Sum(obligation) AS total_obligation,
             Avg(interest_rate) AS avg_interest_rate
      FROM loans
      GROUP BY country, country_code
      ORDER BY total_disbursement DESC
      LIMIT 25)
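The same aggregation can also be expressed with Spark's DataFrame API rather than a SQL string. The jobs in both projects use spark.sql, as shown later; the function below is simply an equivalent sketch, assuming a loans DataFrame that already has the renamed columns (country, country_code, disbursed, obligation, interest_rate).

from pyspark.sql import functions as F

def top_borrowers(loans):
    # DataFrame-API equivalent of the SQL query above
    return (loans
            .groupBy("country", "country_code")
            .agg(F.sum("disbursed").alias("total_disbursement"),
                 F.sum("obligation").alias("total_obligation"),
                 F.avg("interest_rate").alias("avg_interest_rate"))
            .orderBy(F.desc("total_disbursement"))
            .limit(25)
            .select("country", "country_code",
                    F.format_number("total_disbursement", 0).alias("total_disbursement"),
                    F.format_number("total_obligation", 0).alias("total_obligation"),
                    F.format_number("avg_interest_rate", 2).alias("avg_interest_rate")))

Calling top_borrowers(loans).show(25, False) would display the same 25-row result as the SQL version.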
Google Cloud Storage
First, we need a location to store our Spark jobs, data files, and results, which will be accessible to Dataproc. Although there are a number of choices, the simplest and most convenient location for Dataproc is a Google Cloud Storage bucket. According to Google, Cloud Storage offers the highest level of availability and performance within a single region and is ideal for compute, analytics, and ML workloads in a particular region. Cloud Storage buckets are nearly identical to Amazon Simple Storage Service (Amazon S3), their object storage service.
Using the Google Cloud Console, Google’s Web Admin UI, create a new, uniquely named Cloud Storage bucket. Our Dataproc clusters will eventually be created in a single regional location. We need to ensure our new bucket is created in the same regional location as the clusters; I chose us-east1.
We will need the new bucket’s link, to use within the Java and Python code as well from the command line with gsutil
. The gsutil
tool is a Python application that lets you access Cloud Storage from the command line. The bucket’s link may be found on the Storage Browser Console’s Overview tab. A bucket’s link is always in the format, gs://bucket-name
.
Alternatively, we may also create the Cloud Storage bucket using gsutil
with the make buckets (mb
) command, as follows:
# Always best practice since features are updated frequently
gcloud components update

export PROJECT=your_project_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name

# Make sure you are creating resources in the correct project
gcloud config set project $PROJECT

gsutil mb -p $PROJECT -c regional -l $REGION $BUCKET_NAME
Cloud Dataproc Cluster
Next, we will create two different Cloud Dataproc clusters for demonstration purposes. If you have not used Cloud Dataproc previously in your GCP Project, you will first need to enable the API for Cloud Dataproc.
Single Node Cluster
We will start with a single node cluster with no worker nodes, suitable for development and testing Spark and Hadoop jobs, using small datasets. Create a single-node Dataproc cluster using the Single Node Cluster mode option. Create the cluster in the same region as the new Cloud Storage bucket. This will allow the Dataproc cluster access to the bucket without additional security or IAM configuration. I used the n1-standard-1
machine type, with 1 vCPU and 3.75 GB of memory. Observe the resources assigned to Hadoop YARN for Spark job scheduling and cluster resource management.
The new cluster, consisting of a single node and no worker nodes, should be ready for use in a few minutes or less.
Note the Image version, 1.3.16-deb9
. According to Google, Dataproc uses image versions to bundle operating system, big data components, and Google Cloud Platform connectors into one package that is deployed on a cluster. This image, released in November 2018, is the latest available version at the time of this post. The image contains:
- Apache Spark 2.3.1
- Apache Hadoop 2.9.0
- Apache Pig 0.17.0
- Apache Hive 2.3.2
- Apache Tez 0.9.0
- Cloud Storage connector 1.9.9-hadoop2
- Scala 2.11.8
- Python 2.7
To avoid lots of troubleshooting, make sure your code is compatible with the image’s versions. It is important to note the image does not contain a version of Python 3, so you will need to ensure your Python code is built to run with Python 2.7. Alternatively, use Dataproc’s --initialization-actions flag along with bootstrap and setup shell scripts to install Python 3 on the cluster using pip or conda. Tips for installing Python 3 on Dataproc can be found on Stack Overflow and elsewhere on the Internet.
As an alternative to the Google Cloud Console, we are able to create the cluster using a REST command. Google provides the Google Cloud Console’s equivalent REST request, as shown in the example below.
Additionally, we have the option of using the gcloud
command line tool. This tool provides the primary command-line interface to Google Cloud Platform and is part of Google’s Cloud SDK, which also includes the aforementioned gsutil
. Here again, Google provides the Google Cloud Console’s equivalent gcloud
command. This is a great way to learn to use the command line.
Using the dataproc clusters create
command, we are able to create the same cluster as shown above from the command line, as follows:
export PROJECT=your_project_name
export CLUSTER_1=your_single_node_cluster_name
export REGION=us-east1
export ZONE=us-east1-b
export MACHINE_TYPE_SMALL=n1-standard-1

gcloud dataproc clusters create $CLUSTER_1 \
  --region $REGION \
  --zone $ZONE \
  --single-node \
  --master-machine-type $MACHINE_TYPE_SMALL \
  --master-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT
There are a few useful commands to inspect your running Dataproc clusters. The dataproc clusters describe
command, in particular, provides detailed information about all aspects of the cluster’s configuration and current state.
gcloud dataproc clusters list --region $REGION

gcloud dataproc clusters describe $CLUSTER_2 \
  --region $REGION --format json
Standard Cluster
In addition to the single node cluster, we will create a second three-node Dataproc cluster. We will compare the speed of a single-node cluster to that of a true cluster with multiple worker nodes. Create a new Dataproc cluster using the Standard Cluster mode option. Again, make sure to create the cluster in the same region as the new Storage bucket.
The second cluster contains a single master node and two worker nodes. All three nodes use the n1-standard-4
machine type, with 4 vCPU and 15 GB of memory. Although still considered a minimally-sized cluster, this cluster represents a significant increase in compute power over the first single-node cluster, which had a total of 2 vCPU, 3.75 GB of memory, and no worker nodes on which to distribute processing. Between the two workers in the second cluster, we have 8 vCPU and 30 GB of memory for computation.
Again, we have the option of using the gcloud
command line tool to create the cluster:
export PROJECT=your_project_name
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export ZONE=us-east1-b
export NUM_WORKERS=2
export MACHINE_TYPE_LARGE=n1-standard-4

gcloud dataproc clusters create $CLUSTER_2 \
  --region $REGION \
  --zone $ZONE \
  --master-machine-type $MACHINE_TYPE_LARGE \
  --master-boot-disk-size 500 \
  --num-workers $NUM_WORKERS \
  --worker-machine-type $MACHINE_TYPE_LARGE \
  --worker-boot-disk-size 500 \
  --image-version 1.3-deb9 \
  --project $PROJECT
Cluster Creation Speed: Cloud Dataproc versus Amazon EMR?
In a series of rather unscientific tests, I found the three-node Dataproc cluster took less than two minutes on average to be created. Compare that time to a similar three-node cluster built with Amazon’s EMR service using their general purpose m4.4xlarge Amazon EC2 instance type. In a similar series of tests, I found the EMR cluster took seven minutes on average to be created. The EMR cluster took 3.5 times longer to create than the comparable Dataproc cluster. Again, although not a totally accurate comparison, since both services offer different features, it gives you a sense of the speed of Dataproc as compared to Amazon EMR.
Staging Buckets
According to Google, when you create a cluster, Cloud Dataproc creates a Cloud Storage staging bucket in your project or reuses an existing Cloud Dataproc-created bucket from a previous cluster creation request. Staging buckets are used to stage miscellaneous configuration and control files that are needed by your cluster. Below, we see the staging buckets created for the two Dataproc clusters.
Project Files
Before uploading the jobs and running them on the Cloud Dataproc clusters, we need to understand what is included in the two GitHub projects. If you recall from the Kaggle section of the post, both projects are basically the same, but written in different languages, Java and Python. The jobs they contain all perform the same basic analysis on the dataset.
Java Project
The dataproc-java-demo Java-based GitHub project contains three classes, each of which is a job to be run by Spark. The InternationalLoansApp Java class is intended only to be run locally with the smaller 8.7K rows of data in the snapshot CSV file (gist).
package org.example.dataproc; | |
import org.apache.spark.sql.Dataset; | |
import org.apache.spark.sql.Row; | |
import org.apache.spark.sql.SaveMode; | |
import org.apache.spark.sql.SparkSession; | |
public class InternationalLoansApp { | |
public static void main(String[] args) { | |
InternationalLoansApp app = new InternationalLoansApp(); | |
app.start(); | |
} | |
private void start() { | |
SparkSession spark = SparkSession.builder() | |
.appName("dataproc-java-demo") | |
.master("local[*]") | |
.getOrCreate(); | |
spark.sparkContext().setLogLevel("INFO"); // INFO by default | |
// Loads CSV file from local directory | |
Dataset<Row> dfLoans = spark.read() | |
.format("csv") | |
.option("header", "true") | |
.option("inferSchema", true) | |
.load("data/ibrd-statement-of-loans-latest-available-snapshot.csv"); | |
// Basic stats | |
System.out.printf("Rows of data:%d%n", dfLoans.count()); | |
System.out.println("Inferred Schema:"); | |
dfLoans.printSchema(); | |
// Creates temporary view using DataFrame | |
dfLoans.withColumnRenamed("Country", "country") | |
.withColumnRenamed("Country Code", "country_code") | |
.withColumnRenamed("Disbursed Amount", "disbursed") | |
.withColumnRenamed("Borrower's Obligation", "obligation") | |
.withColumnRenamed("Interest Rate", "interest_rate") | |
.createOrReplaceTempView("loans"); | |
// Performs basic analysis of dataset | |
Dataset<Row> dfDisbursement = spark.sql( | |
"SELECT country, country_code, " | |
+ "format_number(total_disbursement, 0) AS total_disbursement, " | |
+ "format_number(ABS(total_obligation), 0) AS total_obligation, " | |
+ "format_number(avg_interest_rate, 2) AS avg_interest_rate " | |
+ "FROM ( " | |
+ "SELECT country, country_code, " | |
+ "SUM(disbursed) AS total_disbursement, " | |
+ "SUM(obligation) AS total_obligation, " | |
+ "AVG(interest_rate) AS avg_interest_rate " | |
+ "FROM loans " | |
+ "GROUP BY country, country_code " | |
+ "ORDER BY total_disbursement DESC " | |
+ "LIMIT 25)" | |
); | |
dfDisbursement.show(25, 100); | |
// Calculates and displays the grand total disbursed amount | |
Dataset<Row> dfGrandTotalDisbursement = spark.sql( | |
"SELECT format_number(SUM(disbursed),0) AS grand_total_disbursement FROM loans" | |
); | |
dfGrandTotalDisbursement.show(); | |
// Calculates and displays the grand total remaining obligation amount | |
Dataset<Row> dfGrandTotalObligation = spark.sql( | |
"SELECT format_number(SUM(obligation),0) AS grand_total_obligation FROM loans" | |
); | |
dfGrandTotalObligation.show(); | |
// Saves results to a locally CSV file | |
dfDisbursement.repartition(1) | |
.write() | |
.mode(SaveMode.Overwrite) | |
.format("csv") | |
.option("header", "true") | |
.save("data/ibrd-summary-small-java"); | |
System.out.println("Results successfully written to CSV file"); | |
} | |
} |
On line 20, the Spark Session’s Master URL, .master("local[*]")
, directs Spark to run locally with as many worker threads as logical cores on the machine. There are several options for setting the Master URL, detailed here.
On line 30, the path to the data file, and on line 84, the output path for the data file, are local relative file paths.
On lines 38–42, we do a bit of clean up on the column names, for only those columns we are interested in for the analysis. Be warned, the column names of the IBRD data are less than ideal for SQL-based analysis, containing mixed-cased characters, word spaces, and brackets.
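The renames on these lines, and the identical ones in the PySpark scripts later, handle only the five columns needed for the analysis. A more generic alternative, not used in either project, is to normalize every column name programmatically; here is a PySpark sketch, assuming a df_loans DataFrame like the one loaded above.

import re

def sanitize(name):
    # Lower-case, replace spaces and punctuation with underscores, trim stray underscores
    return re.sub(r"[^0-9a-z]+", "_", name.lower()).strip("_")

# toDF(*names) returns a new DataFrame with every column renamed in a single pass
df_clean = df_loans.toDF(*[sanitize(c) for c in df_loans.columns])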
On line 79, we call Spark DataFrame’s repartition method, dfDisbursement.repartition(1)
. The repartition method allows us to recombine the results of our analysis and output a single CSV file to the bucket. Ordinarily, Spark splits the data into partitions and executes computations on the partitions in parallel. Each partition’s data is written to separate CSV files when a DataFrame is written back to the bucket.
Using coalesce(1)
or repartition(1)
to recombine the resulting 25-Row DataFrame on a single node is okay for the sake of this demonstration, but is not practical for recombining partitions from larger DataFrames. There are more efficient and less costly ways to manage the results of computations, depending on the intended use of the resulting data.
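To make the difference concrete, here is a PySpark sketch of the two write patterns, using the df_disbursement DataFrame produced by the PySpark scripts shown later and placeholder output paths.

# Default behaviour: Spark writes one part-*.csv file per partition of the DataFrame
df_disbursement.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("gs://your_bucket_name/ibrd-summary-partitioned")

# Recombining first yields a single CSV file: fine for a 25-row result,
# but potentially a bottleneck for a large DataFrame
df_disbursement.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("gs://your_bucket_name/ibrd-summary-single-file")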
The InternationalLoansAppDataprocSmall
class is intended to be run on the Dataproc clusters, analyzing the same smaller CSV data file. The InternationalLoansAppDataprocLarge
class is also intended to be run on the Dataproc clusters, however, it analyzes the larger 750K rows of data in the IRBD historic CSV file (gist).
package org.example.dataproc; | |
import org.apache.spark.sql.Dataset; | |
import org.apache.spark.sql.Row; | |
import org.apache.spark.sql.SaveMode; | |
import org.apache.spark.sql.SparkSession; | |
public class InternationalLoansAppDataprocLarge { | |
public static void main(String[] args) { | |
InternationalLoansAppDataprocLarge app = new InternationalLoansAppDataprocLarge(); | |
app.start(); | |
} | |
private void start() { | |
SparkSession spark = SparkSession.builder() | |
.appName("dataproc-java-demo") | |
.master("yarn") | |
.getOrCreate(); | |
spark.sparkContext().setLogLevel("WARN"); // INFO by default | |
// Loads CSV file from Google Storage Bucket | |
Dataset<Row> dfLoans = spark.read() | |
.format("csv") | |
.option("header", "true") | |
.option("inferSchema", true) | |
.load("gs://dataproc-demo-bucket/ibrd-statement-of-loans-historical-data.csv"); | |
// Creates temporary view using DataFrame | |
dfLoans.withColumnRenamed("Country", "country") | |
.withColumnRenamed("Country Code", "country_code") | |
.withColumnRenamed("Disbursed Amount", "disbursed") | |
.withColumnRenamed("Borrower's Obligation", "obligation") | |
.withColumnRenamed("Interest Rate", "interest_rate") | |
.createOrReplaceTempView("loans"); | |
// Performs basic analysis of dataset | |
Dataset<Row> dfDisbursement = spark.sql( | |
"SELECT country, country_code, " | |
+ "format_number(total_disbursement, 0) AS total_disbursement, " | |
+ "format_number(ABS(total_obligation), 0) AS total_obligation, " | |
+ "format_number(avg_interest_rate, 2) AS avg_interest_rate " | |
+ "FROM ( " | |
+ "SELECT country, country_code, " | |
+ "SUM(disbursed) AS total_disbursement, " | |
+ "SUM(obligation) AS total_obligation, " | |
+ "AVG(interest_rate) AS avg_interest_rate " | |
+ "FROM loans " | |
+ "GROUP BY country, country_code " | |
+ "ORDER BY total_disbursement DESC " | |
+ "LIMIT 25)" | |
); | |
// Saves results to single CSV file in Google Storage Bucket | |
dfDisbursement.repartition(1) | |
.write() | |
.mode(SaveMode.Overwrite) | |
.format("csv") | |
.option("header", "true") | |
.save("gs://dataproc-demo-bucket/ibrd-summary-large-java"); | |
System.out.println("Results successfully written to CSV file"); | |
} | |
} |
On line 20, note the Spark Session’s Master URL, .master("yarn")
, directs Spark to connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode
when submitting the job. The cluster location will be found based on the HADOOP_CONF_DIR
or YARN_CONF_DIR
variable. Recall, the Dataproc cluster runs Spark on YARN.
Also, note on line 30, the path to the data file, and on line 63, the output path for the data file, is to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv"
). Cloud Dataproc clusters automatically install the Cloud Storage connector. According to Google, there are a number of benefits to choosing Cloud Storage over traditional HDFS including data persistence, reliability, and performance.
These are the only two differences between the local version of the Spark job and the version of the Spark job intended for Dataproc. To build the project’s JAR file, which you will later upload to the Cloud Storage bucket, compile the Java project using the gradle build
command from the root of the project. For convenience, the JAR file is also included in the GitHub repository.
Python Project
The dataproc-python-demo
Python-based GitHub project contains two Python scripts to be run using PySpark for this post. The international_loans_local.py
Python script is intended only to be run locally with the smaller 8.7K rows of data in the snapshot CSV file. It performs a few different analyses of the smaller dataset (gist).
#!/usr/bin/python | |
# Author: Gary A. Stafford | |
# License: MIT | |
from pyspark.sql import SparkSession | |
def main(): | |
spark = SparkSession \ | |
.builder \ | |
.master("local[*]") \ | |
.appName('dataproc-python-demo') \ | |
.getOrCreate() | |
# Defaults to INFO | |
sc = spark.sparkContext | |
sc.setLogLevel("INFO") | |
# Loads CSV file from local directory | |
df_loans = spark \ | |
.read \ | |
.format("csv") \ | |
.option("header", "true") \ | |
.option("inferSchema", "true") \ | |
.load("data/ibrd-statement-of-loans-latest-available-snapshot.csv") | |
# Prints basic stats | |
print "Rows of data:" + str(df_loans.count()) | |
print "Inferred Schema:" | |
df_loans.printSchema() | |
# Creates temporary view using DataFrame | |
df_loans.withColumnRenamed("Country", "country") \ | |
.withColumnRenamed("Country Code", "country_code") \ | |
.withColumnRenamed("Disbursed Amount", "disbursed") \ | |
.withColumnRenamed("Borrower's Obligation", "obligation") \ | |
.withColumnRenamed("Interest Rate", "interest_rate") \ | |
.createOrReplaceTempView("loans") | |
# Performs basic analysis of dataset | |
df_disbursement = spark.sql(""" | |
SELECT country, country_code, | |
format_number(total_disbursement, 0) AS total_disbursement, | |
format_number(ABS(total_obligation), 0) AS total_obligation, | |
format_number(avg_interest_rate, 2) AS avg_interest_rate | |
FROM ( | |
SELECT country, country_code, | |
SUM(disbursed) AS total_disbursement, | |
SUM(obligation) AS total_obligation, | |
AVG(interest_rate) AS avg_interest_rate | |
FROM loans | |
GROUP BY country, country_code | |
ORDER BY total_disbursement DESC | |
LIMIT 25) | |
""").cache() | |
df_disbursement.show(25, True) | |
# Saves results to a locally CSV file | |
df_disbursement.repartition(1) \ | |
.write \ | |
.mode("overwrite") \ | |
.format("csv") \ | |
.option("header", "true") \ | |
.save("data/ibrd-summary-small-python") | |
print "Results successfully written to CSV file" | |
spark.stop() | |
if __name__ == "__main__": | |
main() |
Identical to the corresponding Java class, note on line 12, the Spark Session’s Master URL, .master("local[*]")
, directs Spark to run locally with as many worker threads as logical cores on the machine.
Also identical to the corresponding Java class, note on line 26, the path to the data file, and on line 66, the output path for the resulting data file, is a local relative file path.
The international_loans_dataproc-large.py
Python script is intended to be run on the Dataproc clusters, analyzing the larger 750K rows of data in the IRBD historic CSV file (gist).
#!/usr/bin/python | |
# Author: Gary A. Stafford | |
# License: MIT | |
from pyspark.sql import SparkSession | |
def main(): | |
spark = SparkSession \ | |
.builder \ | |
.master("yarn") \ | |
.appName('dataproc-python-demo') \ | |
.getOrCreate() | |
# Defaults to INFO | |
sc = spark.sparkContext | |
sc.setLogLevel("WARN") | |
# Loads CSV file from Google Storage Bucket | |
df_loans = spark \ | |
.read \ | |
.format("csv") \ | |
.option("header", "true") \ | |
.option("inferSchema", "true") \ | |
.load("gs://dataproc-demo-bucket/ibrd-statement-of-loans-historical-data.csv") | |
# Creates temporary view using DataFrame | |
df_loans.withColumnRenamed("Country", "country") \ | |
.withColumnRenamed("Country Code", "country_code") \ | |
.withColumnRenamed("Disbursed Amount", "disbursed") \ | |
.withColumnRenamed("Borrower's Obligation", "obligation") \ | |
.withColumnRenamed("Interest Rate", "interest_rate") \ | |
.createOrReplaceTempView("loans") | |
# Performs basic analysis of dataset | |
df_disbursement = spark.sql(""" | |
SELECT country, country_code, | |
format_number(total_disbursement, 0) AS total_disbursement, | |
format_number(ABS(total_obligation), 0) AS total_obligation, | |
format_number(avg_interest_rate, 2) AS avg_interest_rate | |
FROM ( | |
SELECT country, country_code, | |
SUM(disbursed) AS total_disbursement, | |
SUM(obligation) AS total_obligation, | |
AVG(interest_rate) AS avg_interest_rate | |
FROM loans | |
GROUP BY country, country_code | |
ORDER BY total_disbursement DESC | |
LIMIT 25) | |
""").cache() | |
# Saves results to single CSV file in Google Storage Bucket | |
df_disbursement.repartition(1) \ | |
.write \ | |
.mode("overwrite") \ | |
.format("csv") \ | |
.option("header", "true") \ | |
.save("gs://dataproc-demo-bucket/ibrd-summary-large-python") | |
spark.stop() | |
if __name__ == "__main__": | |
main() |
On line 12, note the Spark Session’s Master URL, .master("yarn")
, directs Spark to connect to a YARN cluster.
Again, note on line 26, the path to the data file, and on line 59, the output path for the data file, is to the Cloud Storage bucket we created earlier (.load("gs://your-bucket-name/your-data-file.csv"
).
These are the only two differences between the local version of the PySpark job and the version of the PySpark job intended for Dataproc. With Python, there is no pre-compilation necessary. We will upload the second script, directly.
Uploading Job Resources to Cloud Storage
In total, we need to upload four items to the new Cloud Storage bucket we created previously. The items include the two Kaggle IBRD CSV files, the compiled Java JAR file from the dataproc-java-demo
project, and the Python script from the dataproc-python-demo
project. Using the Google Cloud Console, upload the four files to the new Google Storage bucket, as shown below. Make sure you unzip the two Kaggle IBRD CSV data files before uploading.
Like before, we also have the option of using gsutil
with the copy (cp
) command to upload the four files. The cp
command accepts wildcards, as shown below.
export PROJECT=your_project_name
export BUCKET_NAME=gs://your_bucket_name

gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME
gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME
gsutil cp international_loans_dataproc_large.py $BUCKET_NAME
If our Java or Python jobs were larger, or more complex and required multiple files to run, we could also choose to upload ZIP or other common compression formatted archives using the --archives
flag.
Running Jobs on Dataproc
The easiest way to run a job on the Dataproc cluster is by submitting a job through the Dataproc Jobs UI, part of the Google Cloud Console.
Dataproc has the capability of running multiple types of jobs, including:
- Hadoop
- Spark
- SparkR
- PySpark
- Hive
- SparkSql
- Pig
We will be running both Spark and PySpark jobs as part of this demonstration.
Spark Jobs
To run a Spark job using the JAR file, select Job type Spark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters in your chosen region. Run both jobs at least twice, once on each cluster, for a total of four jobs.
Lastly, you will need to input the main class and the path to the JAR file. The JAR location will be:
gs://your_bucket_name/dataprocJavaDemo-1.0-SNAPSHOT.jar
The main class for the smaller dataset will be:
org.example.dataproc.InternationalLoansAppDataprocSmall
The main class for the larger dataset will be:
org.example.dataproc.InternationalLoansAppDataprocLarge
During or after job execution, you may view details in the Output tab of the Dataproc Jobs console.
Like every other step in this demonstration, we can also use the gcloud
command line tool, instead of the web console, to submit our Spark jobs to each cluster. Here, I am submitting the larger dataset Spark job to the three-node cluster.
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name

gcloud dataproc jobs submit spark \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --class org.example.dataproc.InternationalLoansAppDataprocLarge \
  --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar \
  --async
PySpark Jobs
To run a Spark job using the Python script, select Job type PySpark. The Region will match your Dataproc cluster and bucket locations, us-east1 in my case. You should have a choice of both clusters. Run the job at least twice, once on each cluster.
Lastly, you will need to input the main Python file path. There is only one Dataproc Python script, which analyzes the larger dataset. The script location will be:
gs://your_bucket_name/international_loans_dataproc_large.py
Like every other step in this demonstration, we can also use the gcloud
command line tool instead of the web console to submit our PySpark jobs to each cluster. Below, I am submitting the PySpark job to the three-node cluster.
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1
export BUCKET_NAME=gs://your_bucket_name

gcloud dataproc jobs submit pyspark \
  $BUCKET_NAME/international_loans_dataproc_large.py \
  --region $REGION \
  --cluster $CLUSTER_2 \
  --async
Including the optional --async flag with any of the dataproc jobs submit commands sends the job to the Dataproc cluster and immediately releases the terminal back to the user. If you do not use the --async flag, the terminal will be unavailable until the job is finished.
However, without the flag, we will get the standard output (stdout) and standard error (stderr) from Dataproc. The output includes some useful information, including different stages of the job execution lifecycle and execution times.
File Output
During development and testing, outputting results to the console is useful. However, in Production, the output from jobs is most often written to Apache Parquet, Apache Avro, CSV, JSON, or XML format files, persisted to an Apache Hive, SQL, or NoSQL database, or streamed to another system for post-processing, using technologies such as Apache Kafka.
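For example, switching the PySpark job's final write from CSV to Parquet is a small change. Below is a sketch against the df_disbursement DataFrame from the script above, with an illustrative output path.

# Columnar, compressed, and schema-preserving: generally a better fit than CSV
# for downstream processing
df_disbursement.repartition(1) \
    .write \
    .mode("overwrite") \
    .parquet("gs://your_bucket_name/ibrd-summary-large-python-parquet")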
Once both the Java and Python jobs have run successfully on the Dataproc cluster, you should observe the results have been saved back to the Storage bucket. Each script saves its results to a single CSV file in separate directories, as shown below.
The final dataset, written to the CSV file, contains the results of the analysis (gist).
country | country_code | total_disbursement | total_obligation | avg_interest_rate | |
---|---|---|---|---|---|
Brazil | BR | 4,302,455,404,056 | 1,253,228,385,979 | 4.08 | |
Mexico | MX | 4,219,081,270,927 | 1,297,489,060,082 | 4.94 | |
Indonesia | ID | 3,270,346,860,046 | 1,162,592,633,450 | 4.67 | |
China | CN | 3,065,658,803,841 | 1,178,177,730,111 | 3.01 | |
India | IN | 3,052,082,309,937 | 1,101,910,589,590 | 3.81 | |
Turkey | TR | 2,797,634,959,120 | 1,111,562,740,520 | 4.85 | |
Argentina | AR | 2,241,512,056,786 | 524,815,800,115 | 3.38 | |
Colombia | CO | 1,701,021,819,054 | 758,168,606,621 | 4.48 | |
Korea, Republic of | KR | 1,349,701,565,303 | 9,609,765,857 | 6.81 | |
Philippines | PH | 1,166,976,603,303 | 365,840,981,818 | 5.38 | |
Poland | PL | 1,157,181,357,135 | 671,373,801,971 | 2.89 | |
Morocco | MA | 1,045,267,705,436 | 365,073,667,924 | 4.39 | |
Russian Federation | RU | 915,318,843,306 | 98,207,276,721 | 1.70 | |
Romania | RO | 902,736,599,033 | 368,321,253,522 | 4.36 | |
Egypt, Arab Republic of | EG | 736,945,143,568 | 431,086,774,867 | 4.43 | |
Thailand | TH | 714,203,701,665 | 70,485,749,749 | 5.93 | |
Peru | PE | 655,818,700,812 | 191,464,347,544 | 3.83 | |
Ukraine | UA | 644,031,278,339 | 394,273,593,116 | 1.47 | |
Pakistan | PK | 628,853,154,827 | 121,673,028,048 | 3.82 | |
Tunisia | TN | 625,648,381,742 | 202,230,595,005 | 4.56 | |
Nigeria | NG | 484,529,279,526 | 2,351,912,541 | 5.86 | |
Kazakhstan | KZ | 453,938,975,114 | 292,590,991,287 | 2.81 | |
Algeria | DZ | 390,644,588,386 | 251,720,881 | 5.30 | |
Chile | CL | 337,041,916,083 | 11,479,003,904 | 4.86 | |
Serbia | YF | 331,975,671,975 | 173,516,517,964 | 5.30 |
Cleaning Up
When you are finished, make sure to delete your running clusters. This may be done through the Google Cloud Console. Deletion of the three-node cluster took, on average, slightly more than one minute.
As usual, we can also use the gcloud
command line tool instead of the web console to delete the Dataproc clusters.
export CLUSTER_1=your_single_node_cluster_name
export CLUSTER_2=your_three_node_cluster_name
export REGION=us-east1

yes | gcloud dataproc clusters delete $CLUSTER_1 --region $REGION
yes | gcloud dataproc clusters delete $CLUSTER_2 --region $REGION
Results
Here are some observations, based on approximately 75 successful jobs. First, both the Python and Java jobs ran in nearly the same amount of time on the single-node cluster and, likewise, on the three-node cluster. This is beneficial since, although a lot of big data analysis is performed with Python, Java is still the lingua franca of many large enterprises.
Consecutive Execution
Below are the average times for running the larger dataset on both clusters, in both Java and Python. The jobs were all run consecutively, as opposed to concurrently. The best time was 59 seconds on the three-node cluster, compared to the best time of 150 seconds on the single-node cluster, a difference of 256%. Given the differences between the two clusters, this large variation is expected. The average difference between the two clusters for running the large dataset was 254%.
Concurrent Execution
It is important to understand the impact of concurrently running multiple jobs on the same Dataproc cluster. To demonstrate this, both the Java and Python jobs were also run concurrently. In one such test, ten copies of the Python job were run concurrently on the three-node cluster.
Observe that the execution times of the concurrent jobs increase nearly linearly. The first job completes in roughly the same time as the consecutively executed jobs, shown above, but each subsequent job's execution time increases linearly.
According to Apache, when running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application. Each application is given a maximum amount of resources it can use and holds onto them for its whole duration. Note that no tuning was done to the Dataproc clusters to optimize for concurrent execution.
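Per-application resource limits are controlled through standard Spark properties. Below is a minimal, hypothetical sketch of how a PySpark job might cap its own executor footprint when building its SparkSession, leaving headroom for other concurrently running applications; the property values are illustrative and were not used in this demonstration.

from pyspark.sql import SparkSession

# Illustrative caps on a single application's share of the cluster;
# these values are assumptions, not settings from the demonstration jobs.
spark = SparkSession.builder \
    .appName('international_loans_concurrent') \
    .config('spark.dynamicAllocation.enabled', 'false') \
    .config('spark.executor.instances', '2') \
    .config('spark.executor.cores', '2') \
    .config('spark.executor.memory', '4g') \
    .getOrCreate()

Alternatively, the same Spark properties can be supplied at submission time, using the --properties flag of the gcloud dataproc jobs submit command.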
Really Big Data?
Although there is no exact definition of 'big data', 750K rows of data at 265 MB is probably not generally considered big data. Likewise, the three-node cluster used in this demonstration is still pretty diminutive. Lastly, the SQL query was not especially complex. To really test the capabilities of Dataproc would require a multi-gigabyte or multi-terabyte dataset, divided amongst multiple files, computed on a much beefier cluster with more worker nodes and more compute resources.
Monitoring and Instrumentation
In addition to viewing the results of running and completed jobs, there are a number of additional monitoring resources, including the Hadoop Yarn Resource Manager, HDFS NameNode, and Spark History Server Web UIs, and Google Stackdriver. I will only briefly introduce these resources, and not examine any of these interfaces in detail. You’re welcome to investigate the resources for your own clusters. Apache lists other Spark monitoring and instrumentation resources in their documentation.
To access the Hadoop Yarn Resource Manager, HDFS NameNode, and Spark History Server Web UIs, you must create an SSH tunnel and run Chrome through a proxy. Google Dataproc provides both the commands and a link to documentation on the Dataproc Cluster tab to help you connect.
Hadoop Yarn Resource Manager Web UI
Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the Hadoop Yarn Resource Manager Web UI is accessed on port 8088. The UI allows you to view all aspects of the YARN cluster and the distributed applications running on the YARN system.
HDFS NameNode Web UI
Once you are connected to the Dataproc cluster, via the SSH tunnel and proxy, the HDFS NameNode Web UI may be accessed on port 9870. According to the Hadoop Wiki, the NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system and tracks where across the cluster the file data is kept. It does not store the data of these files itself.
Spark History Server Web UI
We can view the details of all completed jobs using the Spark History Server Web UI. Once you are connected to the cluster, via the SSH tunnel and proxy, the Spark History Server Web UI is accessed on port 18080. Of all the methods of reviewing aspects of a completed Spark job, the History Server provides the most detail.
Using the History Server UI, we can drill into fine-grained details of each job, including the event timeline.
Also, using the History Server UI, we can see a visualization of the Spark job's DAG (Directed Acyclic Graph). Databricks published an excellent post on how to interpret the information visualized in the Spark UI.
From the UI, we can not only view the DAG but also drill into each Stage of the DAG.
Stackdriver
We can also enable Google Stackdriver for monitoring and management of services, containers, applications, and infrastructure. Stackdriver offers an impressive array of services, including debugging, error reporting, monitoring, alerting, tracing, logging, and dashboards, to mention only a few Stackdriver features.
There are dozens of metrics available, which collectively reflect the health of the Dataproc clusters. Below we see the status of one such metric, the YARN virtual cores (vcores). A YARN vcore, introduced in Hadoop 2.4, is a usage share of a host CPU. The number of YARN virtual cores is equivalent to the number of worker nodes (2) times the number of vCPUs per node (4), for a total of eight YARN virtual cores. Below, we see that at one point in time, 5 of the 8 vcores have been allocated, with 2 more available.
Next, we see the status of the YARN memory size. YARN memory size is calculated as the number of worker nodes (2) times the amount of memory on each node (15 GB) times the fraction given to YARN (0.8), for a total of 24 GB (2 x 15 GB x 0.8). Below, we see that at one point in time, 20 GB of RAM is allocated with 4 GB available. At that instant in time, the workload does not appear to be exhausting the cluster's memory.
Notifications
Since no one actually watches dashboards all day, waiting for something to fail, how do we know when we have an issue with Dataproc? Stackdriver offers integrations with most popular notification channels, including email, SMS, Slack, PagerDuty, HipChat, and Webhooks. With Stackdriver, we define a condition that describes when a service is considered unhealthy. When triggered, Stackdriver sends a notification to one or more channels.
Below is a preview of two alert notifications in Slack. I enabled Slack as a notification channel and created an alert which is triggered each time a Dataproc job fails. Whenever a job fails, such as the two examples below, I receive a Slack notification through the Slack Channel defined in Stackdriver.
Slack notifications contain a link, which routes you back to Stackdriver, to an incident which was opened on your behalf, due to the job failure.
For convenience, the incident also includes a pre-filtered link directly to the log entries at the time of the policy violation. Stackdriver logging offers advanced filtering capabilities to quickly find log entries, as shown below.
With Stackdriver, you get monitoring, logging, alerting, notification, and incident management as a service, with minimal cost and upfront configuration. Think about how much time and effort it takes the average enterprise to achieve this level of infrastructure observability on their own; most never do.
Conclusion
In this post, we have seen the ease of use, extensive feature set, out-of-the-box integration with other cloud services, low cost, and speed of Google Cloud Dataproc for running big data analytics workloads. Couple this with the ability of Stackdriver to provide monitoring, logging, alerting, notification, and incident management for Dataproc with minimal up-front configuration. In my opinion, based on these features, Google Cloud Dataproc leads other cloud competitors for fully-managed Spark and Hadoop cluster management.
In future posts, we will examine the use of Cloud Dataproc Workflow Templates for process automation, the integration capabilities of Dataproc with services such as BigQuery, Bigtable, Cloud Dataflow, and Google Cloud Pub/Sub, and finally, DevOps for Big Data with Dataproc and tools like Spinnaker and Jenkins on GKE.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers, their clients, nor Apache or Google.
Integrating Search Capabilities with Actions for Google Assistant, using GKE and Elasticsearch: Part 2
Posted by Gary A. Stafford in Cloud, GCP, Java Development, JavaScript, Serverless, Software Development on September 24, 2018
Voice and text-based conversational interfaces, such as chatbots, have recently seen tremendous growth in popularity. Much of this growth can be attributed to leading Cloud providers, such as Google, Amazon, and Microsoft, who now provide affordable, end-to-end development, machine learning-based training, and hosting platforms for conversational interfaces.
Cloud-based machine learning services greatly improve a conversational interface's ability to interpret user intent accurately. However, returning relevant responses to user inquiries also requires that interfaces have access to rich informational datastores, as well as the ability to quickly and efficiently query and analyze that data.
In this two-part post, we will enhance the capabilities of a voice and text-based conversational interface by integrating it with a search and analytics engine. By interfacing an Action for Google Assistant conversational interface with Elasticsearch, we will improve the Action’s ability to provide relevant results to the end-user. Instead of querying a traditional database for static responses to user intent, our Action will access a Near Real-time (NRT) Elasticsearch index of searchable documents. The Action will leverage Elasticsearch’s advanced search and analytics capabilities to optimize and shape user responses, based on their intent.
Action Preview
Here is a brief YouTube video preview of the final Action for Google Assistant, integrated with Elasticsearch, running on an Apple iPhone.
Architecture
If you recall from part one of this post, the high-level architecture of our search engine-enhanced Action for Google Assistant resembles the following. Most of the components are running on Google Cloud.
Source Code
All open-sourced code for this post can be found on GitHub in two repositories, one for the Spring Boot Service and one for the Action for Google Assistant. Code samples in this post are displayed as GitHub Gists, which may not display correctly on some mobile and social media browsers. Links to gists are also provided.
Development Process
In part two of this post, we will tie everything together by creating and integrating our Action for Google Assistant:
- Create the new Actions for Google Assistant project using the Actions on Google console;
- Develop the Action’s Intents and Entities using the Dialogflow console;
- Develop, deploy, and test the Cloud Function on GCP.
Let’s explore each step in more detail.
New ‘Actions on Google’ Project
With Elasticsearch running and the Spring Boot Service deployed to our GKE cluster, we can start building our Actions for Google Assistant. Using the Actions on Google web console, we first create a new Actions project.
The Directory Information tab is where we define metadata about the project. This information determines how it will look in the Actions directory and is required to publish your project. The Actions directory is where users discover published Actions on the web and mobile devices.
The Directory Information tab also includes sample invocations, which may be used to invoke our Actions.
Actions and Intents
Our project will contain a series of related Actions. According to Google, an Action is ‘an interaction you build for the Assistant that supports a specific intent and has a corresponding fulfillment that processes the intent.’ To build our Actions, we first want to create our Intents. To do so, we will want to switch from the Actions on Google console to the Dialogflow console. Actions on Google provides a link for switching to Dialogflow in the Actions tab.
We will build our Action's Intents in Dialogflow. The term Intent, used by Dialogflow, is standard terminology across other voice-assistant platforms, such as Amazon's Alexa and Microsoft's Azure Bot Service and LUIS. In Dialogflow, we will be building several Intents: the Find Multiple Posts Intent, Find Post Intent, Find By ID Intent, and so forth.
Below, we see the Find Post Intent. The Find Post Intent is responsible for handling our user's requests for a single post about a topic, for example, 'Find a post about Docker.' The Intent shown below contains a fair number of training phrases, though certainly not an exhaustive list. These represent possible ways a user might express intent when invoking the Action.
Below, we see the Find Multiple Posts Intent. The Find Multiple Posts Intent is responsible for handling our user’s requests for a list of posts about a topic, for example, ‘I’m interested in Docker.’ Similar to the Find Post Intent above, the Find Multiple Posts Intent contains a list of training phrases.
Dialog Model Training
According to Google, the greater the number of natural language examples in the Training Phrases section of Intents, the better the classification accuracy. Every time a user interacts with our Action, the user’s utterances are logged. Using the Training tab in the Dialogflow console, we can train our model by reviewing and approving or correcting how the Action handled the user’s utterances.
Below we see the user's utterances, part of an interaction with the Action. We have the option to review and approve the Intent that was called to handle the utterance, re-assign it, or delete it. This helps improve the accuracy of our dialog model.
Dialogflow Entities
Each of the highlighted words in the training phrases maps to the facts parameter, which maps to a collection of @topic Entities. Entities represent the list of values the Action is trained to understand. According to Google, there are three types of entities: 'system' (defined by Dialogflow), 'developer' (defined by a developer), and 'user' (built for each individual end-user in every request) objects. We will be creating 'developer' type entities for our Action's Intents.
Automated Expansion
We do not have to define every possible topic a user might search for as an entity. By enabling the Allow Automated Expansion option, the Agent will recognize values that have not been explicitly listed in the entity list. Google describes Agents as NLU (Natural Language Understanding) modules.
Entity Synonyms
An entity may contain synonyms. Multiple synonyms are mapped to a single reference value. The reference value is the value passed to the Cloud Function by the Action. For example, take the reference value of 'GCP.' The user might ask Google about 'GCP'. However, the user might also substitute the words 'Google Cloud' or 'Google Cloud Platform.' Using synonyms, if the user utters any of these three synonymous words or phrases in their intent, the reference value, 'GCP', is passed in the request.
But, what if the post contains the phrase, ‘Google Cloud Platform’ more frequently than, or instead of, ‘GCP’? If the acronym, ‘GCP’, is defined as the entity reference value, then it is the value passed to the function, even if you ask for ‘Google Cloud Platform’. In the use case of searching blog posts by topic, entity synonyms are not an effective search strategy.
Elasticsearch Synonyms
A better way to solve for synonyms is by using the synonyms feature of Elasticsearch. Take, for example, the topic of 'Istio'; Istio is also considered a Service Mesh. If I ask for posts about 'Service Mesh', I would like to get back posts that contain the phrase 'Service Mesh', but also the word 'Istio'. To accomplish this, you would define an association between 'Istio' and 'Service Mesh', as part of the Elasticsearch WordPress posts index.
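To make this concrete, here is a minimal sketch, using the official Elasticsearch Python client, of creating an index whose analyzer includes a custom synonym token filter associating 'istio' with 'service mesh'. The client setup, index name, and field names are assumptions for illustration only; they are not the project's actual configuration, which applies the filter to its WordPress posts index.

from elasticsearch import Elasticsearch

# Hypothetical client and index name, for illustration only
es = Elasticsearch(['http://localhost:9200'])

index_body = {
    'settings': {
        'analysis': {
            'filter': {
                # Custom synonym filter: treat 'istio' and 'service mesh' as equivalent
                'posts_synonym_filter': {
                    'type': 'synonym',
                    'synonyms': ['istio, service mesh']
                }
            },
            'analyzer': {
                # Analyzer that applies the synonym filter after lowercasing
                'posts_synonym_analyzer': {
                    'tokenizer': 'standard',
                    'filter': ['lowercase', 'posts_synonym_filter']
                }
            }
        }
    },
    'mappings': {
        'properties': {
            # Hypothetical field; the real index maps the WordPress post fields
            'post_content': {
                'type': 'text',
                'analyzer': 'posts_synonym_analyzer'
            }
        }
    }
}

es.indices.create(index='posts', body=index_body)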
Searches for ‘Istio’ against that index would return results that contain ‘Istio’ and/or contain ‘Service Mesh’; the reverse is also true. Having created and applied a custom synonyms filter to the index, we see how Elasticsearch responds to an analysis of the natural language style phrase, ‘What is a Service Mesh?’. As shown by the tokens output in Kibana’s Dev Tools Console, Elasticsearch understands that ‘service mesh’ is synonymous with ‘istio’.
If we query the same five fields as our Action, for the topic of ‘service mesh’, we get four hits for posts (indexed documents) that contain ‘service mesh’ and/or ‘istio’.
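For reference, a query of that shape might resemble the following sketch, reusing the es client from the previous snippet; the five field names shown here are placeholders, since the fields actually queried are defined in the Spring Boot Service's code.

# Hypothetical multi_match query across five post fields; field names are placeholders
query_body = {
    'query': {
        'multi_match': {
            'query': 'service mesh',
            'fields': ['post_title', 'post_excerpt', 'post_content', 'tags', 'categories']
        }
    }
}

response = es.search(index='posts', body=query_body)
for hit in response['hits']['hits']:
    print(hit['_score'], hit['_source'].get('post_title'))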
Actions on Google Integration
Another configuration item in Dialogflow that needs to be completed is Dialogflow's Actions on Google integration. This will integrate our Action with Google Assistant. Google currently provides more than fifteen different integrations, including Google Assistant, Slack, Facebook Messenger, Twitter, and Twilio, as shown below.
To configure the Google Assistant integration, choose the Welcome Intent as our Action’s Explicit Invocation intent. Then we designate our other Intents as Implicit Invocation intents. According to Google, this Google Assistant Integration allows our Action to reach users on every device where the Google Assistant is available.
Action Fulfillment
When a user's intent is received, it is fulfilled by the Action. In the Dialogflow Fulfillment console, we see the Action has two fulfillment options: a Webhook or a Cloud Function, edited inline. A Webhook allows us to pass information from a matched intent into a web service and get a result back from the service. Our Action's Webhook will call our Cloud Function on GCP, using the Cloud Function's URL endpoint (we'll get this URL in the next section).
Google Cloud Functions
Our Cloud Function, called by our Action, is written in Node.js. Our function, index.js, is divided into four sections, which are: constants and environment variables, intent handlers, helper functions, and the function’s entry point. The helper functions are part of the Helper module, contained in the helper.js file.
Constants and Environment Variables
This section, in both index.js and helper.js, defines the global constants and environment variables used within the function. Values that reference environment variables, such as SEARCH_API_HOSTNAME, are defined in the .env.yaml file. All environment variables in the .env.yaml file will be set during the Cloud Function's deployment, described later in this post. Environment variables were recently released and are still considered beta functionality (gist).
// author: Gary A. Stafford
// site: https://programmaticponderings.com
// license: MIT License

'use strict';

/* CONSTANTS AND GLOBAL VARIABLES */

const Helper = require('./helper');
let helper = new Helper();

const {
  dialogflow,
  Button,
  Suggestions,
  BasicCard,
  SimpleResponse,
  List
} = require('actions-on-google');
const functions = require('firebase-functions');

const app = dialogflow({debug: true});

app.middleware(conv => {
  conv.hasScreen =
    conv.surface.capabilities.has('actions.capability.SCREEN_OUTPUT');
  conv.hasAudioPlayback =
    conv.surface.capabilities.has('actions.capability.AUDIO_OUTPUT');
});

const SUGGESTION_1 = 'tell me about Docker';
const SUGGESTION_2 = 'help';
const SUGGESTION_3 = 'cancel';
The npm module dependencies declared in this section are defined in the dependencies section of the package.json
file. Function dependencies include Actions on Google, Firebase Functions, Winston, and Request (gist).
{
  "name": "functionBlogSearchAction",
  "description": "Programmatic Ponderings Search Action for Google Assistant",
  "version": "1.0.0",
  "private": true,
  "license": "MIT License",
  "author": "Gary A. Stafford",
  "engines": {
    "node": ">=8"
  },
  "scripts": {
    "deploy": "sh ./deploy-cloud-function.sh"
  },
  "dependencies": {
    "@google-cloud/logging-winston": "^0.9.0",
    "actions-on-google": "^2.2.0",
    "dialogflow": "^0.6.0",
    "dialogflow-fulfillment": "^0.5.0",
    "firebase-admin": "^6.0.0",
    "firebase-functions": "^2.0.2",
    "request": "^2.88.0",
    "request-promise-native": "^1.0.5",
    "winston": "2.4.4"
  }
}
Intent Handlers
The intent handlers in this section correspond to the intents in the Dialogflow console. Each handler responds with SimpleResponse, BasicCard, and Suggestion Chip response types, or with SimpleResponse, List, and Suggestion Chip response types. These response types were covered in part one of this post (gist).
/* INTENT HANDLERS */

app.intent('Welcome Intent', conv => {
  const WELCOME_TEXT_SHORT = 'What topic are you interested in reading about?';
  const WELCOME_TEXT_LONG = `You can say things like: \n` +
    ` _'Find a post about GCP'_ \n` +
    ` _'I'd like to read about Kubernetes'_ \n` +
    ` _'I'm interested in Docker'_`;

  conv.ask(new SimpleResponse({
    speech: WELCOME_TEXT_SHORT,
    text: WELCOME_TEXT_SHORT,
  }));

  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      text: WELCOME_TEXT_LONG,
      title: 'Programmatic Ponderings Search',
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});

app.intent('Fallback Intent', conv => {
  const FACTS_LIST = "Kubernetes, Docker, Cloud, DevOps, AWS, Spring, Azure, Messaging, and GCP";
  const HELP_TEXT_SHORT = 'Need a little help?';
  const HELP_TEXT_LONG = `Some popular topics include: ${FACTS_LIST}.`;

  conv.ask(new SimpleResponse({
    speech: HELP_TEXT_LONG,
    text: HELP_TEXT_SHORT,
  }));

  if (conv.hasScreen) {
    conv.ask(new BasicCard({
      text: HELP_TEXT_LONG,
      title: 'Programmatic Ponderings Search Help',
    }));
    conv.ask(new Suggestions([SUGGESTION_1, SUGGESTION_2, SUGGESTION_3]));
  }
});

app.intent('Find Post Intent', async (conv, {topic}) => {
  let postTopic = topic.toString();
  let posts = await helper.getPostsByTopic(postTopic, 1);

  if (posts !== undefined && posts.length < 1) {
    helper.topicNotFound(conv, postTopic);
    return;