Commit 71522b6

gioiab authored and waprin committed

Fix region handling and allow to use an existing cluster. (GoogleCloudPlatform#1053)

1 parent 86da27e, commit 71522b6

3 files changed: 61 additions and 35 deletions

‎dataproc/README.md

8 additions & 3 deletions

@@ -35,14 +35,19 @@ To run list_clusters.py:

 python list_clusters.py <YOUR-PROJECT-ID> --region=us-central1


-To run create_cluster_and_submit_job, first create a GCS bucket, from the Cloud Console or with
+To run submit_job_to_cluster.py, first create a GCS bucket, from the Cloud Console or with
 gsutil:

 gsutil mb gs://<your-input-bucket-name>

-Then run:
+Then, if you want to rely on an existing cluster, run:

-python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>
+python submit_job_to_cluster.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>
+
+Otherwise, if you want the script to create a new cluster for you:
+
+python submit_job_to_cluster.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name> --create_new_cluster
+

 This will setup a cluster, upload the PySpark file, submit the job, print the result, then
 delete the cluster.

‎dataproc/dataproc_e2e_test.py

2 additions & 2 deletions

@@ -20,7 +20,7 @@

 from gcp_devrel.testing.flaky import flaky

-import create_cluster_and_submit_job
+import submit_job_to_cluster

 PROJECT = os.environ['GCLOUD_PROJECT']
 BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
@@ -30,6 +30,6 @@

 @flaky
 def test_e2e():
-    output = create_cluster_and_submit_job.main(
+    output = submit_job_to_cluster.main(
         PROJECT, ZONE, CLUSTER_NAME, BUCKET)
     assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output

‎dataproc/create_cluster_and_submit_job.py renamed to ‎dataproc/submit_job_to_cluster.py

51 additions & 30 deletions

@@ -19,8 +19,6 @@
 from google.cloud import storage
 import googleapiclient.discovery

-# Currently only the "global" region is supported
-REGION = 'global'
 DEFAULT_FILENAME = 'pyspark_sort.py'


@@ -36,6 +34,14 @@ def get_pyspark_file(filename):
     return f, os.path.basename(filename)


+def get_region_from_zone(zone):
+    try:
+        region_as_list = zone.split('-')[:-1]
+        return '-'.join(region_as_list)
+    except (AttributeError, IndexError, ValueError):
+        raise ValueError('Invalid zone provided, please check your input.')
+
+
 def upload_pyspark_file(project_id, bucket_name, filename, file):
     """Uploads the PySpark file in this directory to the configured
     input bucket."""
@@ -59,8 +65,8 @@ def download_output(project_id, cluster_id, output_bucket, job_id):


 # [START create_cluster]
-def create_cluster(dataproc, project, cluster_name, zone):
-    print('Creating cluster.')
+def create_cluster(dataproc, project, zone, region, cluster_name):
+    print('Creating cluster...')
     zone_uri = \
         'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
             project, zone)
@@ -75,19 +81,19 @@ def create_cluster(dataproc, project, cluster_name, zone):
     }
     result = dataproc.projects().regions().clusters().create(
         projectId=project,
-        region=REGION,
+        region=region,
         body=cluster_data).execute()
     return result
 # [END create_cluster]


-def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
-    print('Waiting for cluster creation')
+def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
+    print('Waiting for cluster creation...')

     while True:
         result = dataproc.projects().regions().clusters().list(
             projectId=project_id,
-            region=REGION).execute()
+            region=region).execute()
         cluster_list = result['clusters']
         cluster = [c
                    for c in cluster_list
@@ -100,10 +106,10 @@ def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):


 # [START list_clusters_with_detail]
-def list_clusters_with_details(dataproc, project):
+def list_clusters_with_details(dataproc, project, region):
     result = dataproc.projects().regions().clusters().list(
         projectId=project,
-        region=REGION).execute()
+        region=region).execute()
     cluster_list = result['clusters']
     for cluster in cluster_list:
         print("{} - {}"
@@ -120,7 +126,8 @@ def get_cluster_id_by_name(cluster_list, cluster_name):


 # [START submit_pyspark_job]
-def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
+def submit_pyspark_job(dataproc, project, region,
+                       cluster_name, bucket_name, filename):
     """Submits the Pyspark job to the cluster, assuming `filename` has
     already been uploaded to `bucket_name`"""
     job_details = {
@@ -136,7 +143,7 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
     }
     result = dataproc.projects().regions().jobs().submit(
         projectId=project,
-        region=REGION,
+        region=region,
         body=job_details).execute()
     job_id = result['reference']['jobId']
     print('Submitted job ID {}'.format(job_id))
@@ -145,29 +152,29 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):


 # [START delete]
-def delete_cluster(dataproc, project, cluster):
+def delete_cluster(dataproc, project, region, cluster):
     print('Tearing down cluster')
     result = dataproc.projects().regions().clusters().delete(
         projectId=project,
-        region=REGION,
+        region=region,
         clusterName=cluster).execute()
     return result
 # [END delete]


 # [START wait]
-def wait_for_job(dataproc, project, job_id):
+def wait_for_job(dataproc, project, region, job_id):
     print('Waiting for job to finish...')
     while True:
         result = dataproc.projects().regions().jobs().get(
             projectId=project,
-            region=REGION,
+            region=region,
             jobId=job_id).execute()
         # Handle exceptions
         if result['status']['state'] == 'ERROR':
             raise Exception(result['status']['details'])
         elif result['status']['state'] == 'DONE':
-            print('Job finished')
+            print('Job finished.')
             return result
 # [END wait]

@@ -181,34 +188,44 @@ def get_client():
 # [END get_client]


-def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
+def main(project_id, zone, cluster_name, bucket_name,
+         pyspark_file=None, create_new_cluster=True):
     dataproc = get_client()
+    region = get_region_from_zone(zone)
     try:
         if pyspark_file:
             spark_file, spark_filename = get_pyspark_file(pyspark_file)
         else:
             spark_file, spark_filename = get_default_pyspark_file()

-        create_cluster(dataproc, project_id, cluster_name, zone)
-        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
-        upload_pyspark_file(project_id, bucket_name,
-                            spark_filename, spark_file)
+        if create_new_cluster:
+            create_cluster(
+                dataproc, project_id, zone, region, cluster_name)
+            wait_for_cluster_creation(
+                dataproc, project_id, region, cluster_name)
+
+        upload_pyspark_file(
+            project_id, bucket_name, spark_filename, spark_file)
+
         cluster_list = list_clusters_with_details(
-            dataproc, project_id)['clusters']
+            dataproc, project_id, region)['clusters']

         (cluster_id, output_bucket) = (
             get_cluster_id_by_name(cluster_list, cluster_name))
+
         # [START call_submit_pyspark_job]
         job_id = submit_pyspark_job(
-            dataproc, project_id, cluster_name, bucket_name, spark_filename)
+            dataproc, project_id, region,
+            cluster_name, bucket_name, spark_filename)
         # [END call_submit_pyspark_job]
-        wait_for_job(dataproc, project_id, job_id)
+        wait_for_job(dataproc, project_id, region, job_id)

         output = download_output(project_id, cluster_id, output_bucket, job_id)
         print('Received job output {}'.format(output))
         return output
     finally:
-        delete_cluster(dataproc, project_id, cluster_name)
+        if create_new_cluster:
+            delete_cluster(dataproc, project_id, region, cluster_name)
         spark_file.close()


@@ -220,15 +237,19 @@ def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
     parser.add_argument(
         '--project_id', help='Project ID you want to access.', required=True),
     parser.add_argument(
-        '--zone', help='Region to create clusters in', required=True)
+        '--zone', help='Zone to create clusters in/connect to', required=True)
     parser.add_argument(
-        '--cluster_name', help='Name of the cluster to create', required=True)
+        '--cluster_name',
+        help='Name of the cluster to create/connect to', required=True)
     parser.add_argument(
         '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
     parser.add_argument(
         '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')
+    parser.add_argument(
+        '--create_new_cluster',
+        action='store_true', help='States if the cluster should be created')

     args = parser.parse_args()
     main(
-        args.project_id, args.zone,
-        args.cluster_name, args.gcs_bucket, args.pyspark_file)
+        args.project_id, args.zone, args.cluster_name,
+        args.gcs_bucket, args.pyspark_file, args.create_new_cluster)
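
The heart of the region-handling fix is the new get_region_from_zone() helper, which replaces the hardcoded REGION = 'global' constant: the Dataproc region is derived from the Compute Engine zone by dropping the zone's final hyphen-separated segment, and main() then threads that value through every regions() API call. A minimal sketch of the derivation follows; only us-central1-b appears in this commit's README examples, the other zone is illustrative:

def get_region_from_zone(zone):
    # Mirrors the helper added in this commit: 'us-central1-b' -> 'us-central1'.
    try:
        region_as_list = zone.split('-')[:-1]
        return '-'.join(region_as_list)
    except (AttributeError, IndexError, ValueError):
        raise ValueError('Invalid zone provided, please check your input.')


print(get_region_from_zone('us-central1-b'))   # us-central1
print(get_region_from_zone('europe-west1-d'))  # europe-west1 (illustrative zone)

Note that a non-string zone (for example, an integer) raises AttributeError inside the try block, which is why the except clause lists AttributeError alongside IndexError and ValueError before re-raising as a ValueError.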
