Commit c2bcd20

Merge pull request GoogleCloudPlatform#314 from GoogleCloudPlatform/dataproc

Add Dataproc Sample

2 parents (4258361 + 03ba7a1), commit c2bcd20
File tree: 7 files changed, +419 −0 lines
dataproc/README.md

+63 lines (63 additions, 0 deletions)

@@ -0,0 +1,63 @@
# Cloud Dataproc API Example

Sample command-line programs for interacting with the Cloud Dataproc API.

Note that while this sample demonstrates interacting with Dataproc via the API, the functionality
demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.

`list_clusters.py` is a simple command-line program that demonstrates connecting to the
Dataproc API and listing the clusters in a region.

`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
`pyspark_sort.py` job, download the output from Google Cloud Storage, and print the result.

## Prerequisites to run locally:

* [pip](https://pypi.python.org/pypi/pip)

Go to the [Google Cloud Console](https://console.cloud.google.com).

Under API Manager, search for the Google Cloud Dataproc API and enable it.

## Set Up Your Local Dev Environment

To install, run the following commands. If you want to use [virtualenv](https://virtualenv.readthedocs.org/en/latest/)
(recommended), run the commands within a virtualenv.

    pip install -r requirements.txt
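If you do use virtualenv, the full setup might look like this (the environment name `env` is arbitrary):

    virtualenv env
    source env/bin/activate
    pip install -r requirements.txt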
Create local credentials by running the following command and following the oauth2 flow:

    gcloud beta auth application-default login

To run list_clusters.py:

    python list_clusters.py <YOUR-PROJECT-ID> us-central1-b

To run create_cluster_and_submit_job.py, first create a GCS bucket, from the Cloud Console or with
gsutil:

    gsutil mb gs://<your-input-bucket-name>

Then run:

    python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>
This will set up a cluster, upload the PySpark file, submit the job, print the result, and then
delete the cluster.

You can optionally specify a `--pyspark_file` argument to replace the default
`pyspark_sort.py` included with this sample with a script of your own.
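For reference, a job used with this sample only needs to print something to the driver output. A minimal sketch in the spirit of `pyspark_sort.py` follows; the word list and structure are illustrative assumptions, not necessarily the exact file shipped here:

    import pyspark

    # Distribute a small word list, pull it back to the driver, and print it
    # sorted so the text shows up in the job's driver output.
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
    words = sorted(rdd.collect())
    print(words)

The end-to-end test in this sample expects the sorted list `['Hello,', 'dog', 'elephant', 'panther', 'world!']` to appear in the driver output.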
## Running on GCE, GAE, or other environments

On Google App Engine, the credentials should be found automatically.

On Google Compute Engine, the credentials should also be found automatically, but this requires that
you create the instance with the correct scopes:

    gcloud compute instances create --scopes="https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/compute,https://www.googleapis.com/auth/compute.readonly" test-instance

If you did not create the instance with the right scopes, you can still upload a JSON service
account key and set GOOGLE_APPLICATION_CREDENTIALS as described below.
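For example, assuming you have downloaded a service account key to a path of your choosing (the path below is only a placeholder):

    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-service-account-key.json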

dataproc/create_cluster.py

Whitespace-only changes.

dataproc/create_cluster_and_submit_job.py

+235 lines (235 additions, 0 deletions)

@@ -0,0 +1,235 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample command-line program that creates a Cloud Dataproc cluster,
submits a PySpark job to it, prints the job output, and deletes the
cluster."""

import argparse
import os

from apiclient import discovery
from gcloud import storage
from oauth2client.client import GoogleCredentials

# Currently only the "global" region is supported
REGION = 'global'
DEFAULT_FILENAME = 'pyspark_sort.py'


def get_default_pyspark_file():
    """Gets the PySpark file from this directory."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    f = open(os.path.join(current_dir, DEFAULT_FILENAME), 'r')
    return f, DEFAULT_FILENAME


def get_pyspark_file(filename):
    f = open(filename, 'r')
    return f, os.path.basename(filename)


def upload_pyspark_file(project_id, bucket_name, filename, file):
    """Uploads the PySpark file in this directory to the configured
    input bucket."""
    print('Uploading pyspark file to GCS')
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(filename)
    blob.upload_from_file(file)


def download_output(project_id, cluster_id, output_bucket, job_id):
    """Downloads the output file from Cloud Storage and returns it as a
    string."""
    print('Downloading output file')
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(output_bucket)
    # The driver output lives in the cluster's staging bucket under a path
    # derived from the cluster and job IDs.
    output_blob = (
        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
        .format(cluster_id, job_id))
    return bucket.blob(output_blob).download_as_string()


# [START create_cluster]
def create_cluster(dataproc, project, cluster_name, zone):
    print('Creating cluster.')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'gceClusterConfig': {
                'zoneUri': zone_uri
            }
        }
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
        region=REGION,
        body=cluster_data).execute()
    return result
# [END create_cluster]


def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
    print('Waiting for cluster creation')

    while True:
        result = dataproc.projects().regions().clusters().list(
            projectId=project_id,
            region=REGION).execute()
        cluster_list = result['clusters']
        cluster = [c
                   for c in cluster_list
                   if c['clusterName'] == cluster_name][0]
        if cluster['status']['state'] == 'ERROR':
            raise Exception(result['status']['details'])
        if cluster['status']['state'] == 'RUNNING':
            print("Cluster created.")
            break


# [START list_clusters_with_detail]
def list_clusters_with_details(dataproc, project):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=REGION).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result
# [END list_clusters_with_detail]


def get_cluster_id_by_name(cluster_list, cluster_name):
    """Helper function to retrieve the ID and output bucket of a cluster by
    name."""
    cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
    return cluster['clusterUuid'], cluster['config']['configBucket']


# [START submit_pyspark_job]
def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
    already been uploaded to `bucket_name`."""
    job_details = {
        'projectId': project,
        'job': {
            'placement': {
                'clusterName': cluster_name
            },
            'pysparkJob': {
                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
            }
        }
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
        region=REGION,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id
# [END submit_pyspark_job]


# [START delete]
def delete_cluster(dataproc, project, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=REGION,
        clusterName=cluster).execute()
    return result
# [END delete]


# [START wait]
def wait_for_job(dataproc, project, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=REGION,
            jobId=job_id).execute()
        # Surface job errors; return the result once the job is done.
        if result['status']['state'] == 'ERROR':
            print(result)
            raise Exception(result['status']['details'])
        elif result['status']['state'] == 'DONE':
            print('Job finished')
            return result
# [END wait]


# [START get_client]
def get_client():
    """Builds a client to the Dataproc API using application default
    credentials."""
    credentials = GoogleCredentials.get_application_default()
    dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
    return dataproc
# [END get_client]


def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
    dataproc = get_client()
    try:
        if pyspark_file:
            spark_file, spark_filename = get_pyspark_file(pyspark_file)
        else:
            spark_file, spark_filename = get_default_pyspark_file()

        create_cluster(dataproc, project_id, cluster_name, zone)
        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
        upload_pyspark_file(project_id, bucket_name,
                            spark_filename, spark_file)
        cluster_list = list_clusters_with_details(
            dataproc, project_id)['clusters']

        (cluster_id, output_bucket) = (
            get_cluster_id_by_name(cluster_list, cluster_name))
        job_id = submit_pyspark_job(
            dataproc, project_id, cluster_name, bucket_name, spark_filename)
        wait_for_job(dataproc, project_id, job_id)

        output = download_output(project_id, cluster_id, output_bucket, job_id)
        print('Received job output {}'.format(output))
        return output
    finally:
        # Always clean up the cluster, even if an earlier step failed.
        delete_cluster(dataproc, project_id, cluster_name)
        spark_file.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--project_id', help='Project ID you want to access.', required=True)
    parser.add_argument(
        '--zone', help='Zone to create the cluster in', required=True)
    parser.add_argument(
        '--cluster_name', help='Name of the cluster to create', required=True)
    parser.add_argument(
        '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
    parser.add_argument(
        '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')

    args = parser.parse_args()
    main(
        args.project_id, args.zone,
        args.cluster_name, args.gcs_bucket, args.pyspark_file)

dataproc/dataproc_e2e_test.py

+30 lines (30 additions, 0 deletions)

@@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Integration tests for Dataproc samples.

Creates a Dataproc cluster, uploads a pyspark file to Google Cloud Storage,
submits a job to Dataproc that runs the pyspark file, then downloads
the output logs from Cloud Storage and verifies the expected output."""

import create_cluster_and_submit_job
from gcp.testing.flaky import flaky

CLUSTER_NAME = 'testcluster2'
ZONE = 'us-central1-b'


@flaky
def test_e2e(cloud_config):
    output = create_cluster_and_submit_job.main(
        cloud_config.project, ZONE, CLUSTER_NAME, cloud_config.storage_bucket)
    assert "['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
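Note that the `cloud_config` fixture and `gcp.testing.flaky` come from this repository's shared test harness rather than from this sample. Assuming that harness is installed and configured, the test would typically be run through pytest, for example:

    pytest dataproc/dataproc_e2e_test.py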

dataproc/list_clusters.py

+61 lines (61 additions, 0 deletions)

@@ -0,0 +1,61 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample command-line program for listing Google Dataproc clusters."""

import argparse

from apiclient import discovery
from oauth2client.client import GoogleCredentials

# Currently only the "global" region is supported
REGION = 'global'


# [START list_clusters]
def list_clusters(dataproc, project):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=REGION).execute()
    return result
# [END list_clusters]


# [START get_client]
def get_client():
    """Builds a client to the Dataproc API using application default
    credentials."""
    credentials = GoogleCredentials.get_application_default()
    dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
    return dataproc
# [END get_client]


def main(project_id, zone):
    dataproc = get_client()
    result = list_clusters(dataproc, project_id)
    print(result)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        'project_id', help='Project ID you want to access.')
    parser.add_argument(
        'zone', help='Zone to use (currently unused by this sample).')

    args = parser.parse_args()
    main(args.project_id, args.zone)
