Commit 1bddafd

aman-ebay authored and tswast committed

Update Dataproc samples. (GoogleCloudPlatform#2158)

* Update requirements.txt
* Update python-api-walkthrough.md
* Update submit_job_to_cluster.py
* Update list_clusters.py

1 parent ff8f235 · commit 1bddafd
6 files changed: +426 −194 lines changed

‎dataproc/README.md

+16 −22 (16 additions, 22 deletions)

@@ -1,4 +1,4 @@
-# Cloud Dataproc API Example
+# Cloud Dataproc API Examples
 
 [![Open in Cloud Shell][shell_img]][shell_link]
 
@@ -7,21 +7,20 @@
 
 Sample command-line programs for interacting with the Cloud Dataproc API.
 
-
-Please see [the tutorial on the using the Dataproc API with the Python client
+See [the tutorial on the using the Dataproc API with the Python client
 library](https://cloud.google.com/dataproc/docs/tutorials/python-library-example)
-for more information.
+for information on a walkthrough you can run to try out the Cloud Dataproc API sample code.
 
-Note that while this sample demonstrates interacting with Dataproc via the API, the functionality
-demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
+Note that while this sample demonstrates interacting with Dataproc via the API, the functionality demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
 
-`list_clusters.py` is a simple command-line program to demonstrate connecting to the
-Dataproc API and listing the clusters in a region
+`list_clusters.py` is a simple command-line program to demonstrate connecting to the Cloud Dataproc API and listing the clusters in a region.
 
-`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
+`submit_job_to_cluster.py` demonstrates how to create a cluster, submit the
 `pyspark_sort.py` job, download the output from Google Cloud Storage, and output the result.
 
-`pyspark_sort.py_gcs` is the asme as `pyspark_sort.py` but demonstrates
+`single_job_workflow.py` uses the Cloud Dataproc InstantiateInlineWorkflowTemplate API to create an ephemeral cluster, run a job, then delete the cluster with one API request.
+
+`pyspark_sort.py_gcs` is the same as `pyspark_sort.py` but demonstrates
 reading from a GCS bucket.
 
 ## Prerequisites to run locally:
@@ -59,32 +58,27 @@ To run list_clusters.py:
 
     python list_clusters.py $GOOGLE_CLOUD_PROJECT --region=$REGION
 
-`submit_job_to_cluster.py` can create the Dataproc cluster, or use an existing one.
-If you'd like to create a cluster ahead of time, either use the
-[Cloud Console](console.cloud.google.com) or run:
+`submit_job_to_cluster.py` can create the Dataproc cluster or use an existing cluster. To create a cluster before running the code, you can use the [Cloud Console](console.cloud.google.com) or run:
 
     gcloud dataproc clusters create your-cluster-name
 
-To run submit_job_to_cluster.py, first create a GCS bucket for Dataproc to stage files, from the Cloud Console or with
-gsutil:
+To run submit_job_to_cluster.py, first create a GCS bucket (used by Cloud Dataproc to stage files) from the Cloud Console or with gsutil:
 
     gsutil mb gs://<your-staging-bucket-name>
 
-Set the environment variable's name:
+Next, set the following environment variables:
 
     BUCKET=your-staging-bucket
    CLUSTER=your-cluster-name
 
-Then, if you want to rely on an existing cluster, run:
+Then, if you want to use an existing cluster, run:
 
    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET
 
-Otherwise, if you want the script to create a new cluster for you:
+Alternatively, to create a new cluster, which will be deleted at the end of the job, run:
 
    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET --create_new_cluster
 
-This will setup a cluster, upload the PySpark file, submit the job, print the result, then
-delete the cluster.
+The script will setup a cluster, upload the PySpark file, submit the job, print the result, then, if it created the cluster, delete the cluster.
 
-You can optionally specify a `--pyspark_file` argument to change from the default
-`pyspark_sort.py` included in this script to a new script.
+Optionally, you can add the `--pyspark_file` argument to change from the default `pyspark_sort.py` included in this script to a new script.
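
For context, `pyspark_sort.py` is the job that `submit_job_to_cluster.py` uploads and runs; the file itself is not touched by this commit. Purely as an illustrative sketch (not necessarily the repository's actual script), a minimal PySpark sort job of that kind could look like:

```python
# Illustrative sketch only -- not the repository's actual pyspark_sort.py.
# A tiny PySpark job that sorts a small in-memory dataset and prints it,
# which is the kind of job submit_job_to_cluster.py submits to the cluster.
import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)
```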

‎dataproc/list_clusters.py

+29 −24 (29 additions, 24 deletions)

@@ -10,49 +10,54 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Sample command-line program to list Cloud Dataproc clusters in a region.
 
-""" Sample command-line program for listing Google Dataproc Clusters
-"""
+Example usage:
+python list_clusters.py --project_id=my-project-id --region=global
 
+"""
 import argparse
 
-import googleapiclient.discovery
+from google.cloud import dataproc_v1
+from google.cloud.dataproc_v1.gapic.transports import (
+    cluster_controller_grpc_transport)
 
 
 # [START dataproc_list_clusters]
 def list_clusters(dataproc, project, region):
-    result = dataproc.projects().regions().clusters().list(
-        projectId=project,
-        region=region).execute()
-    return result
+    """List the details of clusters in the region."""
+    for cluster in dataproc.list_clusters(project, region):
+        print(('{} - {}'.format(cluster.cluster_name,
+                                cluster.status.State.Name(
+                                    cluster.status.state))))
 # [END dataproc_list_clusters]
 
 
-# [START dataproc_get_client]
-def get_client():
-    """Builds a client to the dataproc API."""
-    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
-    return dataproc
-# [END dataproc_get_client]
+def main(project_id, region):
 
+    if region == 'global':
+        # Use the default gRPC global endpoints.
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
+    else:
+        # Use a regional gRPC endpoint. See:
+        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
+        client_transport = (
+            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+                address='{}-dataproc.googleapis.com:443'.format(region)))
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
+            client_transport)
 
-def main(project_id, region):
-    dataproc = get_client()
-    result = list_clusters(dataproc, project_id, region)
-    print(result)
+    list_clusters(dataproc_cluster_client, project_id, region)
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(
-        description=__doc__,
-        formatter_class=argparse.RawDescriptionHelpFormatter
-    )
+        description=__doc__, formatter_class=(
+            argparse.RawDescriptionHelpFormatter))
     parser.add_argument(
-        'project_id', help='Project ID you want to access.'),
-    # Sets the region to "global" if it's not provided
-    # Note: sub-regions (e.g.: us-central1-a/b) are currently not supported
+        '--project_id', help='Project ID to access.', required=True)
     parser.add_argument(
-        '--region', default='global', help='Region to list clusters')
+        '--region', help='Region of clusters to list.', required=True)
 
     args = parser.parse_args()
     main(args.project_id, args.region)
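
The new `list_clusters.py` builds its client either against the default global endpoint or a regional gRPC endpoint. A minimal standalone sketch of the same pattern, assuming the `google-cloud-dataproc==0.3.1` API pinned in this commit (the project ID and region values below are placeholders):

```python
from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.gapic.transports import (
    cluster_controller_grpc_transport)

project_id = 'my-project-id'  # placeholder
region = 'us-central1'        # placeholder; use 'global' for the default endpoint

if region == 'global':
    # Default global gRPC endpoint.
    client = dataproc_v1.ClusterControllerClient()
else:
    # Regional gRPC endpoint, mirroring the pattern added above.
    transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
        address='{}-dataproc.googleapis.com:443'.format(region))
    client = dataproc_v1.ClusterControllerClient(transport)

# Print each cluster's name and current state.
for cluster in client.list_clusters(project_id, region):
    print(cluster.cluster_name, cluster.status.State.Name(cluster.status.state))
```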

‎dataproc/python-api-walkthrough.md

+3 −3 (3 additions, 3 deletions)

@@ -121,7 +121,7 @@ Job output in Cloud Shell shows cluster creation, job submission,
 ...
 Creating cluster...
 Cluster created.
-Uploading pyspark file to GCS
+Uploading pyspark file to Cloud Storage
 new-cluster-name - RUNNING
 Submitted job ID ...
 Waiting for job to finish...
@@ -140,12 +140,12 @@ Job output in Cloud Shell shows cluster creation, job submission,
 ### Next Steps:
 
 * **View job details from the Console.** View job details by selecting the
-  PySpark job from the Cloud Dataproc 
+  PySpark job from the Cloud Dataproc
   [Jobs page](https://console.cloud.google.com/dataproc/jobs)
   in the Google Cloud Platform Console.
 
 * **Delete resources used in the walkthrough.**
-  The `submit_job.py` job deletes the cluster that it created for this
+  The `submit_job_to_cluster.py` job deletes the cluster that it created for this
   walkthrough.
 
   If you created a bucket to use for this walkthrough,

‎dataproc/requirements.txt

+2 −1 (2 additions, 1 deletion)

@@ -1,5 +1,6 @@
-google-api-python-client==1.7.8
+grpcio>=1.2.0
 google-auth==1.6.2
 google-auth-httplib2==0.0.3
 google-cloud==0.34.0
 google-cloud-storage==1.13.2
+google-cloud-dataproc==0.3.1
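
A quick way to confirm the updated dependency set resolves together after `pip install -r requirements.txt` (a hedged sanity-check sketch: it only imports the packages and prints the gRPC version, and does not call any Google Cloud APIs):

```python
# Sanity-check the packages pinned in requirements.txt.
import grpc
from google.cloud import dataproc_v1
from google.cloud import storage

print('grpcio version:', grpc.__version__)
print('Dataproc client available:', hasattr(dataproc_v1, 'ClusterControllerClient'))
print('Storage client available:', hasattr(storage, 'Client'))
```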

‎dataproc/single_job_workflow.py

+208 −0 (208 additions, 0 deletions)

@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+r"""Sample Cloud Dataproc inline workflow to run a pyspark job on an ephermeral
+cluster.
+Example Usage to run the inline workflow on a managed cluster:
+python single_job_workflow.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
+  --cluster_name=$CLUSTER --zone=$ZONE
+Example Usage to run the inline workflow on a global region managed cluster:
+python submit_job_to_cluster.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
+  --cluster_name=$CLUSTER --zone=$ZONE --global_region
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import argparse
+import os
+
+from google.cloud import dataproc_v1
+from google.cloud.dataproc_v1.gapic.transports import (
+    workflow_template_service_grpc_transport)
+from google.cloud import storage
+
+DEFAULT_FILENAME = "pyspark_sort.py"
+waiting_callback = False
+
+
+def get_pyspark_file(pyspark_file=None):
+    if pyspark_file:
+        f = open(pyspark_file, "rb")
+        return f, os.path.basename(pyspark_file)
+    else:
+        """Gets the PySpark file from current directory."""
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
+        return f, DEFAULT_FILENAME
+
+
+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split("-")[:-1]
+        return "-".join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError("Invalid zone provided, please check your input.")
+
+
+def upload_pyspark_file(project, bucket_name, filename, spark_file):
+    """Uploads the PySpark file in this directory to the configured input
+    bucket."""
+    print("Uploading pyspark file to Cloud Storage.")
+    client = storage.Client(project=project)
+    bucket = client.get_bucket(bucket_name)
+    blob = bucket.blob(filename)
+    blob.upload_from_file(spark_file)
+
+
+def run_workflow(dataproc, project, region, zone, bucket_name, filename,
+                 cluster_name):
+
+    parent = "projects/{}/regions/{}".format(project, region)
+    zone_uri = ("https://www.googleapis.com/compute/v1/projects/{}/zones/{}"
+                .format(project, zone))
+
+    workflow_data = {
+        "placement": {
+            "managed_cluster": {
+                "cluster_name": cluster_name,
+                "config": {
+                    "gce_cluster_config": {"zone_uri": zone_uri},
+                    "master_config": {
+                        "num_instances": 1,
+                        "machine_type_uri": "n1-standard-1",
+                    },
+                    "worker_config": {
+                        "num_instances": 2,
+                        "machine_type_uri": "n1-standard-1",
+                    },
+                },
+            }
+        },
+        "jobs": [
+            {
+                "pyspark_job": {
+                    "main_python_file_uri": "gs://{}/{}".format(
+                        bucket_name, filename)
+                },
+                "step_id": "pyspark-job",
+            }
+        ],
+    }
+
+    workflow = dataproc.instantiate_inline_workflow_template(parent,
+                                                             workflow_data)
+
+    workflow.add_done_callback(callback)
+    global waiting_callback
+    waiting_callback = True
+
+
+def callback(operation_future):
+    # Reset global when callback returns.
+    global waiting_callback
+    waiting_callback = False
+
+
+def wait_for_workflow_end():
+    """Wait for cluster creation."""
+    print("Waiting for workflow completion ...")
+    print("Workflow and job progress, and job driver output available from: "
+          "https://console.cloud.google.com/dataproc/workflows/")
+
+    while True:
+        if not waiting_callback:
+            print("Workflow completed.")
+            break
+
+
+def main(
+    project_id,
+    zone,
+    cluster_name,
+    bucket_name,
+    pyspark_file=None,
+    create_new_cluster=True,
+    global_region=True,
+):
+
+    # [START dataproc_get_workflow_template_client]
+    if global_region:
+        region = "global"
+        # Use the default gRPC global endpoints.
+        dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient()
+    else:
+        region = get_region_from_zone(zone)
+        # Use a regional gRPC endpoint. See:
+        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
+        client_transport = (workflow_template_service_grpc_transport
+                            .WorkflowTemplateServiceGrpcTransport(
+                                address="{}-dataproc.googleapis.com:443"
+                                .format(region)))
+        dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient(
+            client_transport
+        )
+    # [END dataproc_get_workflow_template_client]
+
+    try:
+        spark_file, spark_filename = get_pyspark_file(pyspark_file)
+        upload_pyspark_file(project_id, bucket_name, spark_filename,
+                            spark_file)
+
+        run_workflow(
+            dataproc_workflow_client,
+            project_id,
+            region,
+            zone,
+            bucket_name,
+            spark_filename,
+            cluster_name
+        )
+        wait_for_workflow_end()
+
+    finally:
+        spark_file.close()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__, formatter_class=(argparse
+                                              .RawDescriptionHelpFormatter))
+    parser.add_argument(
+        "--project_id", help="Project ID you want to access.", required=True
+    )
+    parser.add_argument(
+        "--zone", help="Zone to create clusters in/connect to", required=True
+    )
+    parser.add_argument(
+        "--cluster_name", help="Name of the cluster to create/connect to",
+        required=True
+    )
+    parser.add_argument(
+        "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
+    )
+    parser.add_argument(
+        "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
+    )
+    parser.add_argument("--global_region",
+                        action="store_true",
+                        help="If cluster is in the global region")
+
+    args = parser.parse_args()
+    main(
+        args.project_id,
+        args.zone,
+        args.cluster_name,
+        args.gcs_bucket,
+        args.pyspark_file,
+    )
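
The new `single_job_workflow.py` waits for the workflow by registering a done-callback and spinning on a module-level flag. As an alternative sketch (assuming the long-running operation returned by `instantiate_inline_workflow_template` follows the `google.api_core.operation.Operation` interface, as it does for this client library), the caller can simply block on the operation:

```python
from google.cloud import dataproc_v1


def run_workflow_blocking(project, region, workflow_data):
    """Instantiate an inline workflow template and block until it finishes.

    Sketch only: workflow_data is the same dict built in run_workflow() above.
    """
    client = dataproc_v1.WorkflowTemplateServiceClient()
    parent = "projects/{}/regions/{}".format(project, region)

    operation = client.instantiate_inline_workflow_template(parent, workflow_data)
    # result() blocks until the workflow's long-running operation completes
    # and raises if the workflow fails, so no callback or polling loop is needed.
    operation.result()
    print("Workflow completed.")
```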
