In the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we explored Google Cloud Dataproc using the Google Cloud Console as well as the Google Cloud SDK and Cloud Dataproc API. We created clusters, then uploaded and ran Spark and PySpark jobs, then deleted clusters, each as discrete tasks. Although each task could be done via the Dataproc API and therefore automatable, they were independent tasks, without awareness of the previous task’s state.
In this brief follow-up post, we will examine the Cloud Dataproc WorkflowTemplates API to more efficiently and effectively automate Spark and Hadoop workloads. According to Google, the Cloud Dataproc WorkflowTemplates API provides a flexible and easy-to-use mechanism for managing and executing Dataproc workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster. Shown below, we see one of the Workflows that will be demonstrated in this post, displayed in Spark History Server Web UI.
Here we see a four-stage DAG of one of the three jobs in the workflow, displayed in Spark History Server Web UI.
Workflows are ideal for automating large batches of dynamic Spark and Hadoop jobs, and for long-running and unattended job execution, such as overnight.
Using the Python and Java projects from the previous post, we will first create workflow templates using the just the WorkflowTemplates API. We will create the template, set a managed cluster, add jobs to the template, and instantiate the workflow. Next, we will further optimize and simplify our workflow by using a YAML-based workflow template file. The YAML-based template file eliminates the need to make API calls to set the template’s cluster and add the jobs to the template. Finally, to further enhance the workflow and promote re-use of the template, we will incorporate parameterization. Parameters will allow us to pass parameters (key/value) pairs from the command line to workflow template, and on to the Python script as input arguments.
It is not necessary to use the Google Cloud Console for this post. All steps will be done using Google Cloud SDK shell commands. This means all steps may be automated using CI/CD DevOps tools, like Jenkins and Spinnaker on GKE.
All open-sourced code for this post can be found on GitHub within three repositories: dataproc-java-demo, dataproc-python-demo, and dataproc-workflow-templates. Source code samples are displayed as GitHub Gists, which may not display correctly on all mobile and social media browsers.
Always start by ensuring you have the latest Google Cloud SDK updates and are working within the correct Google Cloud project.
gcloud components update export PROJECT_ID=your-project-id gcloud config set project $PROJECT
Set the following variables based on your Google environment. The variables will be reused throughout the post for multiple commands.
export REGION=your-region export ZONE=your-zone export BUCKET_NAME=your-bucket
The post assumes you still have the Cloud Storage bucket we created in the previous post. In the bucket, you will need the two Kaggle IBRD CSV files, available on Kaggle, the compiled Java JAR file from the dataproc-java-demo project, and a new Python script, international_loans_dataproc.py, from the dataproc-python-demo project.
gsutil with the copy (
cp) command to upload the four files to your Storage bucket.
gsutil cp data/ibrd-statement-of-loans-*.csv $BUCKET_NAME gsutil cp build/libs/dataprocJavaDemo-1.0-SNAPSHOT.jar $BUCKET_NAME gsutil cp international_loans_dataproc.py $BUCKET_NAME
export TEMPLATE_ID=template-demo-1 gcloud dataproc workflow-templates create \ $TEMPLATE_ID --region $REGION
Adding a Cluster
Next, we need to set a cluster for the workflow to use, in order to run the jobs. Cloud Dataproc will create and use a Managed Cluster for your workflow or use an existing cluster. If the workflow uses a managed cluster, it creates the cluster, runs the jobs, and then deletes the cluster when the jobs are finished. This means, for many use cases, there is no need to maintain long-lived clusters, they become just an ephemeral part of the workflow.
We set a managed cluster for our Workflow using the
workflow-templates set-managed-cluster command. We will re-use the same cluster specifications we used in the previous post, the Standard, 1 master node and 2 worker nodes, cluster type.
gcloud dataproc workflow-templates set-managed-cluster \ $TEMPLATE_ID \ --region $REGION \ --zone $ZONE \ --cluster-name three-node-cluster \ --master-machine-type n1-standard-4 \ --master-boot-disk-size 500 \ --worker-machine-type n1-standard-4 \ --worker-boot-disk-size 500 \ --num-workers 2 \ --image-version 1.3-deb9
Alternatively, if we already had an existing cluster, we would use the
workflow-templates set-cluster-selector command, to associate that cluster with the workflow template.
gcloud dataproc workflow-templates set-cluster-selector \ $TEMPLATE_ID \ --region $REGION \ --cluster-labels goog-dataproc-cluster-uuid=$CLUSTER_UUID
To get the existing cluster’s UUID label value, you could use a command similar to the following.
CLUSTER_UUID=$(gcloud dataproc clusters describe $CLUSTER_2 \ --region $REGION \ | grep 'goog-dataproc-cluster-uuid:' \ | sed 's/.* //') echo $CLUSTER_UUID 1c27efd2-f296-466e-b14e-c4263d0d7e19
Next, we add the jobs we want to run to the template. Each job is considered a step in the template, each step requires a unique step id. We will add three jobs to the template, two Java-based Spark jobs from the previous post, and a new Python-based PySpark job.
First, we add the two Java-based Spark jobs, using the
workflow-templates add-job spark command. This command’s flags are nearly identical to the
dataproc jobs submit spark command, used in the previous post.
export STEP_ID=ibrd-small-spark gcloud dataproc workflow-templates add-job spark \ --region $REGION \ --step-id $STEP_ID \ --workflow-template $TEMPLATE_ID \ --class org.example.dataproc.InternationalLoansAppDataprocSmall \ --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar export STEP_ID=ibrd-large-spark gcloud dataproc workflow-templates add-job spark \ --region $REGION \ --step-id $STEP_ID \ --workflow-template $TEMPLATE_ID \ --class org.example.dataproc.InternationalLoansAppDataprocLarge \ --jars $BUCKET_NAME/dataprocJavaDemo-1.0-SNAPSHOT.jar
Next, we add the Python-based PySpark job, international_loans_dataproc.py, as the second job in the template. This Python script requires three input arguments, on lines 15–17, which are the bucket where the data is located and the and results are placed, the name of the data file, and the directory in the bucket where the results will be placed (gist).
We pass the arguments to the Python script as part of the PySpark job, using the
workflow-templates add-job pyspark command.
export STEP_ID=ibrd-large-pyspark gcloud dataproc workflow-templates add-job pyspark \ $BUCKET_NAME/international_loans_dataproc.py \ --step-id $STEP_ID \ --workflow-template $TEMPLATE_ID \ --region $REGION \ -- $BUCKET_NAME \ ibrd-statement-of-loans-historical-data.csv \ ibrd-summary-large-python
That’s it, we have created our first Cloud Dataproc Workflow Template using the Dataproc WorkflowTemplate API. To view our template we can use the following two commands. First, use the
workflow-templates list command to display a list of available templates. The
list command output displays the version of the workflow template and how many jobs are in the template.
gcloud dataproc workflow-templates list --region $REGION ID JOBS UPDATE_TIME VERSION template-demo-1 3 2018-12-15T16:32:06.508Z 5
Then, we use the
workflow-templates describe command to show the details of a specific template.
gcloud dataproc workflow-templates describe \ $TEMPLATE_ID --region $REGION
workflow-templates describe command, we should see output similar to the following (gist).
In the template description, notice the template’s id, the managed cluster in the placement section, and the three jobs, all which we added using the above series of
workflow-templates commands. Also, notice the creation and update timestamps and version number, which were automatically generated by Dataproc. Lastly, notice the name, which refers to the GCP project and region where this copy of the template is located. Had we used an existing cluster with our workflow, as opposed to a managed cluster, the placement section would have looked as follows.
placement: clusterSelector: clusterLabels: goog-dataproc-cluster-uuid: your_clusters_uuid_label_value
To instantiate the workflow, we use the
workflow-templates instantiate command. This command will create the managed cluster, run all the steps (jobs), then delete the cluster. I have added the
time command to see how fast the workflow will take to complete.
time gcloud dataproc workflow-templates instantiate \ $TEMPLATE_ID --region $REGION #--async
We can observe the progress from the Google Cloud Dataproc Console, or from the command line by omitting the
--async flag. Below we see the three jobs completed successfully on the managed cluster.
Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/e720bb96-9c87-330e-b1cd-efa4612b3c57]. WorkflowTemplate [template-demo-1] RUNNING Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/e1fe53de-92f2-4f8c-8b3a-fda5e13829b6]. Created cluster: three-node-cluster-ugdo4ygpl52bo. Job ID ibrd-small-spark-ugdo4ygpl52bo RUNNING Job ID ibrd-large-spark-ugdo4ygpl52bo RUNNING Job ID ibrd-large-pyspark-ugdo4ygpl52bo RUNNING Job ID ibrd-small-spark-ugdo4ygpl52bo COMPLETED Job ID ibrd-large-spark-ugdo4ygpl52bo COMPLETED Job ID ibrd-large-pyspark-ugdo4ygpl52bo COMPLETED Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/f2a40c33-3cdf-47f5-92d6-345463fbd404]. WorkflowTemplate [template-demo-1] DONE Deleted cluster: three-node-cluster-ugdo4ygpl52bo. 1.02s user 0.35s system 0% cpu 5:03.55 total
In the output, you see the creation of the cluster, the three jobs running and completing successfully, and finally the cluster deletion. The entire workflow took approximately 5 minutes to complete. Below is the view of the workflow’s results from the Dataproc Clusters Console Jobs tab.
Below we see the output from the PySpark job, run as part of the workflow template, shown in the Dataproc Clusters Console Output tab. Notice the three input arguments we passed to the Python script from the workflow template, listed in the output.
We see the arguments passed to the job, from the Jobs Configuration tab.
Examining the Google Cloud Dataproc Jobs Console, we will observe that the WorkflowTemplate API automatically adds a unique alphanumeric extension to both the name of the managed clusters we create, as well as to the name of each job that is run. The extension on the cluster name matches the extension on the jobs ran on that cluster.
YAML-based Workflow Template
Although, the above WorkflowTemplates API-based workflow was certainly more convenient than using the individual Cloud Dataproc API commands. At a minimum, we don’t have to remember to delete our cluster when the jobs are complete, as I often do. To further optimize the workflow, we will introduce YAML-based Workflow Template. According to Google, you can define a workflow template in a YAML file, then instantiate the template to run the workflow. You can also import and export a workflow template YAML file to create and update a Cloud Dataproc workflow template resource.
We can export our first workflow template to create our YAML-based template file.
gcloud dataproc workflow-templates export template-demo-1 \ --destination template-demo-2.yaml \ --region $REGION
Note the template looks almost similar to the template we just created previously using the WorkflowTemplates API. The YAML-based template requires the placement and jobs fields. All the available fields are detailed, here.
To run the template we use the
workflow-templates instantiate-from-file command. Again, I will use the
time command to measure performance.
time gcloud dataproc workflow-templates instantiate-from-file \ --file template-demo-2.yaml \ --region $REGION
workflow-templates instantiate-from-file command will run a workflow, nearly identical to the workflow we ran in the previous example, with a similar timing. Below we see the three jobs completed successfully on the managed cluster, in approximately the same time as the previous workflow.
Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/7ba3c28e-ebfa-32e7-9dd6-d938a1cfe23b]. WorkflowTemplate RUNNING Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/8d05199f-ed36-4787-8a28-ae784c5bc8ae]. Created cluster: three-node-cluster-5k3bdmmvnna2y. Job ID ibrd-small-spark-5k3bdmmvnna2y RUNNING Job ID ibrd-large-spark-5k3bdmmvnna2y RUNNING Job ID ibrd-large-pyspark-5k3bdmmvnna2y RUNNING Job ID ibrd-small-spark-5k3bdmmvnna2y COMPLETED Job ID ibrd-large-spark-5k3bdmmvnna2y COMPLETED Job ID ibrd-large-pyspark-5k3bdmmvnna2y COMPLETED Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/a436ae82-f171-4b0a-9b36-5e16406c75d5]. WorkflowTemplate DONE Deleted cluster: three-node-cluster-5k3bdmmvnna2y. 1.16s user 0.44s system 0% cpu 4:48.84 total
Parameterization of Templates
To further optimize the workflow template process for re-use, we have the option of passing parameters to our template. Imagine you now receive new loan snapshot data files every night. Imagine you need to run the same data analysis on the financial transactions of thousands of your customers, nightly. Parameterizing templates makes it more flexible and reusable. By removing hard-codes values, such as Storage bucket paths and data file names, a single template may be re-used for multiple variations of the same job. Parameterization allows you to automate hundreds or thousands of Spark and Hadoop jobs in a workflow or workflows, each with different parameters, programmatically.
To demonstrate the parameterization of a workflow template, we create another YAML-based template with just the Python/PySpark job, template-demo-3.yaml. If you recall from our first example, the Python script, international_loans_dataproc.py, requires three input arguments: the bucket where the data is located and the and results are placed, the name of the data file, and the directory in the bucket, where the results will be placed.
We will replace four of the values in the template with parameters. We will inject those parameter’s values when we instantiate the workflow. Below is the new parameterized template. The template now has a parameters section from lines 26–46. They define parameters that will be used to replace the four values on lines 3–7 (gist).
Note the PySpark job’s three arguments and the location of the Python script have been parameterized. Parameters may include validation. As an example of validation, the template uses regex to validate the format of the Storage bucket path. The regex follows Google’s RE2 regular expression library syntax. If you need help with regex, the Regex Tester – Golang website is a convenient way to test your parameter’s regex validations.
First, we import the new parameterized YAML-based workflow template, using the
workflow-templates import command. Then, we instantiate the template using the
workflow-templates instantiate command. The
workflow-templates instantiate command will run the single PySpark job, analyzing the smaller IBRD data file, and placing the resulting Parquet-format file in a directory within the Storage bucket. We pass the Python script location, bucket link, smaller IBRD data file name, and output directory, as parameters to the template, and therefore indirectly, three of these, as input arguments to the Python script.
export TEMPLATE_ID=template-demo-3 gcloud dataproc workflow-templates import $TEMPLATE_ID \ --region $REGION --source template-demo-3.yaml gcloud dataproc workflow-templates instantiate \ $TEMPLATE_ID --region $REGION --async \ --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-latest-available-snapshot.csv",RESULTS_DIRECTORY="ibrd-summary-small-python"
Next, we will analyze the larger historic data file, using the same parameterized YAML-based workflow template, but changing two of the four parameters we are passing to the template with the
workflow-templates instantiate command. This will run a single PySpark job on the larger IBRD data file and place the resulting Parquet-format file in a different directory within the Storage bucket.
time gcloud dataproc workflow-templates instantiate \ $TEMPLATE_ID --region $REGION \ --parameters MAIN_PYTHON_FILE="$BUCKET_NAME/international_loans_dataproc.py",STORAGE_BUCKET=$BUCKET_NAME,IBRD_DATA_FILE="ibrd-statement-of-loans-historical-data.csv",RESULTS_DIRECTORY="ibrd-summary-large-python"
This is the power of parameterization—one workflow template and one job script, but two different datasets and two different results.
Below we see the single PySpark job ran on the managed cluster.
Waiting on operation [projects/dataproc-demo-224523/regions/us-east1/operations/b3c5063f-e3cf-3833-b613-83db12b82f32]. WorkflowTemplate [template-demo-3] RUNNING Creating cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105]. Created cluster: three-node-cluster-j6q2al2mkkqck. Job ID ibrd-pyspark-j6q2al2mkkqck RUNNING Job ID ibrd-pyspark-j6q2al2mkkqck COMPLETED Deleting cluster: Operation ID [projects/dataproc-demo-224523/regions/us-east1/operations/fe4a263e-7c6d-466e-a6e2-52292cbbdc9b]. WorkflowTemplate [template-demo-3] DONE Deleted cluster: three-node-cluster-j6q2al2mkkqck. 0.98s user 0.40s system 0% cpu 4:19.42 total
workflow-templates list command again, should display a list of two workflow templates.
gcloud dataproc workflow-templates list --region $REGION ID JOBS UPDATE_TIME VERSION template-demo-3 1 2018-12-15T17:04:39.064Z 2 template-demo-1 3 2018-12-15T16:32:06.508Z 5
Looking within the Google Cloud Storage bucket, we should now see four different folders, the results of the workflows.
Job Results and Testing
To check on the status of a job, we use the
dataproc jobs wait command. This returns the standard output (stdout) and standard error (stderr) for that specific job.
export SET_ID=ibrd-large-dataset-pyspark-cxzzhr2ro3i54 gcloud dataproc jobs wait $SET_ID \ --project $PROJECT_ID \ --region $REGION
dataproc jobs wait command is frequently used for automated testing of jobs, often within a CI/CD pipeline. Assume we have expected part of the job output that indicates success, such as a string, boolean, or numeric value. We could any number of test frameworks or other methods to confirm the existence of that expected value or values. Below is a simple example of using the
grep command to check for the existence of the expected line ‘
state: FINISHED’ in the standard output of the
dataproc jobs wait command.
command=$(gcloud dataproc jobs wait $SET_ID \ --project $PROJECT_ID \ --region $REGION) &>/dev/null if grep -Fqx " state: FINISHED" <<< $command &>/dev/null; then echo "Job Success!" else echo "Job Failure?" fi # single line alternative if grep -Fqx " state: FINISHED" <<< $command &>/dev/null;then echo "Job Success!";else echo "Job Failure?";fi Job Success!
Notice the three distinct series of operations within each workflow, shown with the
operations list command: WORKFLOW, CREATE, and DELETE. In the example below, I’ve separated the operations by workflow, for better clarity.
gcloud dataproc operations list --region $REGION NAME TIMESTAMP TYPE STATE ERROR WARNINGS fe4a263e-7c6d-466e-a6e2-52292cbbdc9b 2018-12-15T17:11:45.178Z DELETE DONE 896b7922-da8e-49a9-bd80-b1ac3fda5105 2018-12-15T17:08:38.322Z CREATE DONE b3c5063f-e3cf-3833-b613-83db12b82f32 2018-12-15T17:08:37.497Z WORKFLOW DONE --- be0e5293-275f-46ad-b1f4-696ba44c222e 2018-12-15T17:07:26.305Z DELETE DONE 6784078c-cbe3-4c1e-a56e-217149f555a4 2018-12-15T17:04:40.613Z CREATE DONE fcd8039e-a260-3ab3-ad31-01abc1a524b4 2018-12-15T17:04:40.007Z WORKFLOW DONE --- b4b23ca6-9442-4ffb-8aaf-460bac144dd8 2018-12-15T17:02:16.744Z DELETE DONE 89ef9c7c-f3c9-4d01-9091-61ed9e1f085d 2018-12-15T17:01:45.514Z CREATE DONE 243fa7c1-502d-3d7a-aaee-b372fe317570 2018-12-15T17:01:44.895Z WORKFLOW DONE
We use the results of the
operations list command to execute the
operations describe command to describe a specific operation.
gcloud dataproc operations describe \ projects/$PROJECT_ID/regions/$REGION/operations/896b7922-da8e-49a9-bd80-b1ac3fda5105
Each type of operation contains different details. Note the fine-grain of detail we get from Dataproc using the
operations describe command for a CREATE operation (gist).
In this brief, follow-up post to the previous post, Big Data Analytics with Java and Python, using Cloud Dataproc, Google’s Fully-Managed Spark and Hadoop Service, we have seen how easy the WorkflowTemplates API and YAML-based workflow templates make automating our analytics jobs. This post only scraped the surface of the complete functionality of the WorkflowTemplates API and parameterization of templates.
In a future post, we leverage the automation capabilities of the Google Cloud Platform, the WorkflowTemplates API, YAML-based workflow templates, and parameterization, to develop a fully-automated DevOps for Big Data workflow, capable of running hundreds of Spark and Hadoop jobs.
All opinions expressed in this post are my own and not necessarily the views of my current or past employers or their clients.