Commit cbb910e

Dataflow client library (GoogleCloudPlatform#2450)
* Updated requirements
* Update service naming convention
* Prefer client libraries over shell commands
* Update README format
1 parent 2b2cef9 commit cbb910e
5 files changed (+91, -75 lines)
‎dataflow/run_template/README.md

38 additions & 52 deletions
@@ -1,55 +1,52 @@
 # Run template

-[`main.py`](main.py) - Script to run an [Apache Beam] template on [Google Cloud Dataflow].
+[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor)

-The following examples show how to run the [`Word_Count` template], but you can run any other template.
+This sample demonstrate how to run an
+[Apache Beam](https://beam.apache.org/)
+template on [Google Cloud Dataflow](https://cloud.google.com/dataflow/docs/).
+For more information, see the
+[Running templates](https://cloud.google.com/dataflow/docs/guides/templates/running-templates)
+docs page.

-For the `Word_Count` template, we require to pass an `output` Cloud Storage path prefix, and optionally we can pass an `inputFile` Cloud Storage file pattern for the inputs.
+The following examples show how to run the
+[`Word_Count` template](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/WordCount.java),
+but you can run any other template.
+
+For the `Word_Count` template, we require to pass an `output` Cloud Storage path prefix,
+and optionally we can pass an `inputFile` Cloud Storage file pattern for the inputs.
 If `inputFile` is not passed, it will take `gs://apache-beam-samples/shakespeare/kinglear.txt` as default.

 ## Before you begin

-1. Install the [Cloud SDK].
-
-1. [Create a new project].
-
-1. [Enable billing].
-
-1. [Enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,datastore.googleapis.com,cloudfunctions.googleapis.com,cloudresourcemanager.googleapis.com): Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Datastore, Cloud Functions, and Cloud Resource Manager.
-
-1. Setup the Cloud SDK to your GCP project.
-
-```bash
-gcloud init
-```
+Follow the
+[Getting started with Google Cloud Dataflow](../README.md)
+page, and make sure you have a Google Cloud project with billing enabled
+and a *service account JSON key* set up in your `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
+Additionally, for this sample you need the following:

 1. Create a Cloud Storage bucket.

-```bash
-gsutil mb gs://your-gcs-bucket
+```sh
+export BUCKET=your-gcs-bucket
+gsutil mb gs://$BUCKET
 ```

-## Setup
-
-The following instructions will help you prepare your development environment.
-
-1. [Install Python and virtualenv].
-
 1. Clone the `python-docs-samples` repository.

-```bash
-git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
-```
+```sh
+git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
+```

 1. Navigate to the sample code directory.

-```bash
+```sh
 cd python-docs-samples/dataflow/run_template
 ```

 1. Create a virtual environment and activate it.

-```bash
+```sh
 virtualenv env
 source env/bin/activate
 ```
@@ -58,18 +55,18 @@ The following instructions will help you prepare your development environment.

 1. Install the sample requirements.

-```bash
+```sh
 pip install -U -r requirements.txt
 ```

 ## Running locally

-To run a Dataflow template from the command line.
+* [`main.py`](main.py)
+* [REST API dataflow/projects.templates.launch](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch)

-> NOTE: To run locally, you'll need to [create a service account key] as a JSON file.
-> Then export an environment variable called `GOOGLE_APPLICATION_CREDENTIALS` pointing it to your service account file.
+To run a Dataflow template from the command line.

-```bash
+```sh
 python main.py \
 --project <your-gcp-project> \
 --job wordcount-$(date +'%Y%m%d-%H%M%S') \
@@ -80,10 +77,10 @@ python main.py \

 ## Running in Python

-To run a Dataflow template from Python.
+* [`main.py`](main.py)
+* [REST API dataflow/projects.templates.launch](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch)

-> NOTE: To run locally, you'll need to [create a service account key] as a JSON file.
-> Then export an environment variable called `GOOGLE_APPLICATION_CREDENTIALS` pointing it to your service account file.
+To run a Dataflow template from Python.

 ```py
 import main as run_template
@@ -101,9 +98,12 @@ run_template.run(

 ## Running in Cloud Functions

+* [`main.py`](main.py)
+* [REST API dataflow/projects.templates.launch](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.templates/launch)
+
 To deploy this into a Cloud Function and run a Dataflow template via an HTTP request as a REST API.

-```bash
+```sh
 PROJECT=$(gcloud config get-value project) \
 REGION=$(gcloud config get-value functions/region)

@@ -121,17 +121,3 @@ curl -X POST "https://$REGION-$PROJECT.cloudfunctions.net/run_template" \
 -d inputFile=gs://apache-beam-samples/shakespeare/kinglear.txt \
 -d output=gs://<your-gcs-bucket>/wordcount/outputs
 ```
-
-[Apache Beam]: https://beam.apache.org/
-[Google Cloud Dataflow]: https://cloud.google.com/dataflow/docs/
-[`Word_Count` template]: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/WordCount.java
-
-[Cloud SDK]: https://cloud.google.com/sdk/docs/
-[Create a new project]: https://console.cloud.google.com/projectcreate
-[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project
-[Create a service account key]: https://console.cloud.google.com/apis/credentials/serviceaccountkey
-[Creating and managing service accounts]: https://cloud.google.com/iam/docs/creating-managing-service-accounts
-[GCP Console IAM page]: https://console.cloud.google.com/iam-admin/iam
-[Granting roles to service accounts]: https://cloud.google.com/iam/docs/granting-roles-to-service-accounts
-
-[Install Python and virtualenv]: https://cloud.google.com/python/setup
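
The `curl` request at the end of the README can also be sketched in Python. This is a minimal illustration and not part of the commit. The helper names `function_url` and `launch_payload` are hypothetical. The `project`, `job`, and `template` fields are assumed from the arguments used in `main_test.py`; only the URL pattern and the `inputFile` and `output` fields are visible in the README hunk. The sketch assumes Python 3.

```python
from datetime import datetime
from urllib.parse import urlencode


def function_url(region, project, name='run_template'):
    """Build the HTTP trigger URL, following the pattern shown in the README."""
    return 'https://{}-{}.cloudfunctions.net/{}'.format(region, project, name)


def launch_payload(project, bucket, now=None):
    """Form-encode the fields sent with `curl -d ...` (hypothetical helper)."""
    now = now or datetime.now()
    fields = {
        'project': project,
        # Same timestamp format as `date +'%Y%m%d-%H%M%S'` in the README.
        'job': now.strftime('wordcount-%Y%m%d-%H%M%S'),
        'template': 'gs://dataflow-templates/latest/Word_Count',
        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/wordcount/outputs'.format(bucket),
    }
    return urlencode(fields)
```

The resulting string can be sent as the body of a POST request to `function_url(region, project)` with any HTTP client.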

‎dataflow/run_template/main.py

2 additions & 2 deletions
@@ -43,8 +43,8 @@ def run(project, job, template, parameters=None):
     # 'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
     # }

-    service = build('dataflow', 'v1b3')
-    request = service.projects().templates().launch(
+    dataflow = build('dataflow', 'v1b3')
+    request = dataflow.projects().templates().launch(
         projectId=project,
         gcsPath=template,
         body={
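
The hunk above cuts off inside the request body. As a rough sketch of what a `projects.templates.launch` call looks like end to end, the following assumes the `jobName` and `parameters` body fields from the REST API reference linked in the README, not from the elided part of `main.py`. The function names `launch_request_kwargs` and `launch` are hypothetical, and `dataflow` is expected to be a client built elsewhere with `build('dataflow', 'v1b3')`.

```python
def launch_request_kwargs(project, job, template, parameters=None):
    """Keyword arguments for `projects().templates().launch(...)`."""
    return {
        'projectId': project,
        'gcsPath': template,
        'body': {
            # Field names assumed from the REST API reference.
            'jobName': job,
            'parameters': parameters or {},
        },
    }


def launch(dataflow, project, job, template, parameters=None):
    """Launch the template using an already-built Dataflow client."""
    kwargs = launch_request_kwargs(project, job, template, parameters)
    return dataflow.projects().templates().launch(**kwargs).execute()
```

Keeping the argument construction separate from the network call makes the request shape easy to check without credentials.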

‎dataflow/run_template/main_test.py

47 additions & 17 deletions
@@ -12,38 +12,62 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+# To run the tests:
+# nox -s "lint(sample='./dataflow/run_template')"
+# nox -s "py27(sample='./dataflow/run_template')"
+# nox -s "py36(sample='./dataflow/run_template')"
+
 import flask
 import json
 import os
 import pytest
-import subprocess as sp
 import time

 from datetime import datetime
+from googleapiclient.discovery import build
+from googleapiclient.errors import HttpError
 from werkzeug.urls import url_encode

 import main

 PROJECT = os.environ['GCLOUD_PROJECT']
 BUCKET = os.environ['CLOUD_STORAGE_BUCKET']

-# Wait time until a job can be cancelled, as a best effort.
-# If it fails to be cancelled, the job will run for ~8 minutes.
-WAIT_TIME = 5  # seconds
+dataflow = build('dataflow', 'v1b3')

 # Create a fake "app" for generating test request contexts.
 @pytest.fixture(scope="module")
 def app():
     return flask.Flask(__name__)


-def test_run_template_empty_args(app):
+def test_run_template_python_empty_args(app):
+    project = PROJECT
+    job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
+    template = 'gs://dataflow-templates/latest/Word_Count'
+    with pytest.raises(HttpError):
+        main.run(project, job, template)
+
+
+def test_run_template_python(app):
+    project = PROJECT
+    job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
+    template = 'gs://dataflow-templates/latest/Word_Count'
+    parameters = {
+        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
+        'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
+    }
+    res = main.run(project, job, template, parameters)
+    dataflow_jobs_cancel(res['job']['id'])
+
+
+def test_run_template_http_empty_args(app):
     with app.test_request_context():
         with pytest.raises(KeyError):
             main.run_template(flask.request)


-def test_run_template_url(app):
+def test_run_template_http_url(app):
     args = {
         'project': PROJECT,
         'job': datetime.now().strftime('test_run_template_url-%Y%m%d-%H%M%S'),
@@ -54,12 +78,10 @@ def test_run_template_url(app):
     with app.test_request_context('/?' + url_encode(args)):
         res = main.run_template(flask.request)
         data = json.loads(res)
-        job_id = data['job']['id']
-        time.sleep(WAIT_TIME)
-        assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
+        dataflow_jobs_cancel(data['job']['id'])


-def test_run_template_data(app):
+def test_run_template_http_data(app):
     args = {
         'project': PROJECT,
         'job': datetime.now().strftime('test_run_template_data-%Y%m%d-%H%M%S'),
@@ -70,12 +92,10 @@ def test_run_template_data(app):
     with app.test_request_context(data=args):
         res = main.run_template(flask.request)
         data = json.loads(res)
-        job_id = data['job']['id']
-        time.sleep(WAIT_TIME)
-        assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
+        dataflow_jobs_cancel(data['job']['id'])


-def test_run_template_json(app):
+def test_run_template_http_json(app):
     args = {
         'project': PROJECT,
         'job': datetime.now().strftime('test_run_template_json-%Y%m%d-%H%M%S'),
@@ -86,6 +106,16 @@ def test_run_template_json(app):
     with app.test_request_context(json=args):
         res = main.run_template(flask.request)
         data = json.loads(res)
-        job_id = data['job']['id']
-        time.sleep(WAIT_TIME)
-        assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
+        dataflow_jobs_cancel(data['job']['id'])
+
+
+def dataflow_jobs_cancel(job_id):
+    # Wait time until a job can be cancelled, as a best effort.
+    # If it fails to be cancelled, the job will run for ~8 minutes.
+    time.sleep(5)  # seconds
+    request = dataflow.projects().jobs().update(
+        projectId=PROJECT,
+        jobId=job_id,
+        body={'requestedState': 'JOB_STATE_CANCELLED'}
+    )
+    request.execute()
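
The `dataflow_jobs_cancel` helper added above sleeps once and then sends a single cancel request. One possible variation, shown here only as a sketch and not as what the commit does, retries the request a few times. The name `cancel_job` and the `attempts`, `wait_seconds`, and `sleep` parameters are hypothetical; the `update` call and the `JOB_STATE_CANCELLED` body are taken from the diff. `attempts` is assumed to be at least 1.

```python
import time


def cancel_job(dataflow, project, job_id, attempts=3, wait_seconds=5,
               sleep=time.sleep):
    """Request cancellation, retrying a few times if the call raises.

    `dataflow` is a client built with `build('dataflow', 'v1b3')`.
    Returns the API response, or re-raises the last error.
    """
    last_error = None
    for _ in range(attempts):
        # Give the newly launched job time to reach a cancellable state.
        sleep(wait_seconds)
        try:
            return dataflow.projects().jobs().update(
                projectId=project,
                jobId=job_id,
                body={'requestedState': 'JOB_STATE_CANCELLED'},
            ).execute()
        except Exception as error:  # real code would likely narrow this to HttpError
            last_error = error
    raise last_error
```

Passing `sleep` as a parameter lets a test substitute a no-op so the retry logic can be exercised without waiting.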

1 addition & 1 deletion
@@ -1 +1 @@
-google-api-python-client==1.7.9
+google-api-python-client==1.7.11

‎testing/requirements.txt

3 additions & 3 deletions
@@ -1,12 +1,12 @@
-beautifulsoup4==4.8.0
+beautifulsoup4==4.8.1
 coverage==4.5.4
 flaky==3.6.1
 funcsigs==1.0.2
 mock==3.0.5
 mysql-python==1.2.5; python_version < "3.0"
 PyCrypto==2.6.1
-pytest-cov==2.7.1
-pytest==4.6.5
+pytest-cov==2.8.1
+pytest==5.2.1
 pyyaml==5.1.2
 responses==0.10.6
 WebTest==2.0.33
