Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f33ae9e

Browse filesBrowse files
JesseLovelacegcf-owl-bot[bot]msampathkumarm-strzelczyk
authored
feat(storagetransfer): Adds event driven transfer samples (GoogleCloudPlatform#10622)
* feat(storagetransfer): Adds event driven transfer samples * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Update storagetransfer/event_driven_aws_transfer.py Co-authored-by: Sampath Kumar <sampathm@google.com> * fix typo in event_driven_aws_transfer.py --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Sampath Kumar <sampathm@google.com> Co-authored-by: Maciej Strzelczyk <strzelczyk@google.com>
1 parent 2f12a30 commit f33ae9e
Copy full SHA for f33ae9e
Expand file treeCollapse file tree

7 files changed

+375
-2
lines changed

‎storagetransfer/conftest.py

Copy file name to clipboardExpand all lines: storagetransfer/conftest.py
+39-1Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from azure.storage.blob import BlobServiceClient, ContainerClient
2424
import boto3
25-
from google.cloud import secretmanager, storage, storage_transfer
25+
from google.cloud import pubsub_v1, secretmanager, storage, storage_transfer
2626
from google.cloud.storage_transfer import TransferJob
2727

2828
import pytest
@@ -367,3 +367,41 @@ def manifest_file(source_bucket: storage.Bucket):
367367

368368
# use arbitrary path and name
369369
yield f"gs://{source_bucket.name}/test-manifest.csv"
370+
371+
372+
@pytest.fixture
def pubsub_id(project_id: str):
    """Yield the full resource name of a temporary Pub/Sub subscription.

    A throwaway topic is created first, then a subscription attached to it.
    Both resources are deleted on teardown.
    """
    publisher_client = pubsub_v1.PublisherClient()
    topic_name = publisher_client.topic_path(
        project_id, f"pubsub-sts-topic-{uuid.uuid4()}"
    )
    publisher_client.create_topic(request={"name": topic_name})

    subscriber_client = pubsub_v1.SubscriberClient()
    sub_name = subscriber_client.subscription_path(
        project_id, f"pubsub-sts-subscription-{uuid.uuid4()}"
    )
    created = subscriber_client.create_subscription(
        request={"name": sub_name, "topic": topic_name}
    )

    yield str(created.name)

    # Teardown: remove the subscription first, then the topic it tracks.
    subscriber_client.delete_subscription(request={"subscription": sub_name})
    subscriber_client.close()
    publisher_client.delete_topic(request={"topic": topic_name})
394+
395+
396+
@pytest.fixture
def sqs_queue_arn(secret_cache):
    """Yield the ARN of a temporary AWS SQS queue; the queue is deleted on teardown."""
    sqs = boto3.resource("sqs", **aws_key_pair(secret_cache), region_name="us-west-1")
    queue = sqs.create_queue(QueueName=f"sqs-sts-queue-{uuid.uuid4()}")

    yield queue.attributes["QueueArn"]

    # Teardown: drop the throwaway queue.
    queue.delete()
+121Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2023 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""
18+
Command-line sample that creates an event driven transfer from an AWS S3 bucket to a GCS bucket that tracks an AWS SQS queue.
19+
"""
20+
21+
import argparse
22+
23+
# [START storagetransfer_create_event_driven_aws_transfer]
24+
25+
from google.cloud import storage_transfer
26+
27+
28+
def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer from an AWS S3 bucket to a GCS bucket
    that tracks an AWS SQS queue.

    Args:
        project_id: ID of the Google Cloud project that owns the job.
        description: Human-readable description for the transfer job.
        source_s3_bucket: Name of the AWS S3 source bucket.
        sink_gcs_bucket: Name of the Google Cloud Storage destination bucket.
        sqs_queue_arn: ARN of the SQS queue the job subscribes to.
        aws_access_key_id: AWS access key ID used to read the source bucket.
        aws_secret_access_key: AWS secret access key.
    """

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # sqs_queue_arn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                # The event stream makes the job event driven: notifications
                # arriving on the SQS queue trigger incremental transfers.
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")
89+
90+
91+
# [END storagetransfer_create_event_driven_aws_transfer]
92+
93+
if __name__ == "__main__":
    import os

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--project-id",
        help="The ID of the Google Cloud Platform Project that owns the job",
        required=True,
    )
    parser.add_argument(
        "--description",
        help="A useful description for your transfer job",
        default="My transfer job",
    )
    parser.add_argument(
        "--source-s3-bucket", help="AWS S3 source bucket name", required=True
    )
    parser.add_argument(
        "--sink-gcs-bucket",
        help="Google Cloud Storage destination bucket name",
        required=True,
    )
    parser.add_argument(
        "--sqs-queue-arn",
        help="The ARN of the AWS SQS queue to track",
        required=True,
    )
    # Bug fix: create_event_driven_aws_transfer(**vars(args)) requires
    # aws_access_key_id and aws_secret_access_key, but no flags supplied
    # them, so the script always failed with a TypeError. Default to the
    # standard AWS environment variables so secrets stay out of shell
    # history, while still allowing explicit flags.
    parser.add_argument(
        "--aws-access-key-id",
        help="AWS access key ID (defaults to $AWS_ACCESS_KEY_ID)",
        default=os.environ.get("AWS_ACCESS_KEY_ID"),
    )
    parser.add_argument(
        "--aws-secret-access-key",
        help="AWS secret access key (defaults to $AWS_SECRET_ACCESS_KEY)",
        default=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    )

    args = parser.parse_args()

    create_event_driven_aws_transfer(**vars(args))
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright 2023 Google LLC
2+
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import backoff
16+
from google.api_core.exceptions import RetryError
17+
from google.api_core.exceptions import ServiceUnavailable
18+
from google.cloud.storage import Bucket
19+
20+
import event_driven_aws_transfer
21+
22+
23+
@backoff.on_exception(
    backoff.expo,
    (RetryError, ServiceUnavailable),
    max_time=60,
)
def test_event_driven_aws_transfer(
    capsys,
    project_id: str,
    job_description_unique: str,
    aws_source_bucket: str,
    destination_bucket: Bucket,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Smoke test: the AWS event-driven sample reports a created transfer job."""
    event_driven_aws_transfer.create_event_driven_aws_transfer(
        project_id=project_id,
        description=job_description_unique,
        source_s3_bucket=aws_source_bucket,
        sink_gcs_bucket=destination_bucket.name,
        sqs_queue_arn=sqs_queue_arn,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    captured = capsys.readouterr()

    # The sample prints the new job's name on success.
    assert "Created transferJob:" in captured.out
+109Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2023 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""
18+
Command-line sample that creates an event driven transfer between two GCS buckets that tracks a PubSub subscription.
19+
"""
20+
21+
import argparse
22+
23+
# [START storagetransfer_create_event_driven_gcs_transfer]
24+
25+
from google.cloud import storage_transfer
26+
27+
28+
def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    # What to copy: one GCS bucket into another.
    transfer_spec = {
        "gcs_data_source": {
            "bucket_name": source_bucket,
        },
        "gcs_data_sink": {
            "bucket_name": sink_bucket,
        },
    }

    # The event stream makes the job event driven: object-change
    # notifications delivered to the subscription trigger transfers.
    event_stream = {
        "name": pubsub_id,
    }

    request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": transfer_spec,
                "event_stream": event_stream,
            },
        }
    )

    result = client.create_transfer_job(request)
    print(f"Created transferJob: {result.name}")
77+
78+
79+
# [END storagetransfer_create_event_driven_gcs_transfer]
80+
81+
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    # Flag names map one-to-one onto the sample function's keyword
    # arguments (argparse converts dashes to underscores).
    parser.add_argument(
        "--project-id",
        required=True,
        help="The ID of the Google Cloud Platform Project that owns the job",
    )
    parser.add_argument(
        "--description",
        default="My transfer job",
        help="A useful description for your transfer job",
    )
    parser.add_argument(
        "--source-bucket", required=True, help="Google Cloud Storage source bucket name"
    )
    parser.add_argument(
        "--sink-bucket",
        required=True,
        help="Google Cloud Storage destination bucket name",
    )
    parser.add_argument(
        "--pubsub-id",
        required=True,
        help="The subscription ID of the PubSub queue to track",
    )

    create_event_driven_gcs_transfer(**vars(parser.parse_args()))
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2023 Google LLC
2+
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import backoff
16+
from google.api_core.exceptions import RetryError
17+
from google.api_core.exceptions import ServiceUnavailable
18+
from google.cloud.storage import Bucket
19+
20+
import event_driven_gcs_transfer
21+
22+
23+
@backoff.on_exception(
    backoff.expo,
    (RetryError, ServiceUnavailable),
    max_time=60,
)
def test_event_driven_gcs_transfer(
    capsys,
    project_id: str,
    job_description_unique: str,
    source_bucket: Bucket,
    destination_bucket: Bucket,
    pubsub_id: str,
):
    """Smoke test: the GCS event-driven sample reports a created transfer job."""
    event_driven_gcs_transfer.create_event_driven_gcs_transfer(
        project_id=project_id,
        description=job_description_unique,
        source_bucket=source_bucket.name,
        sink_bucket=destination_bucket.name,
        pubsub_id=pubsub_id,
    )

    captured = capsys.readouterr()

    # The sample prints the new job's name on success.
    assert "Created transferJob:" in captured.out

‎storagetransfer/requirements-test.txt

Copy file name to clipboardExpand all lines: storagetransfer/requirements-test.txt
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ azure-storage-blob==12.16.0
22
backoff==2.2.1; python_version < "3.7"
33
backoff==2.2.1; python_version >= "3.7"
44
boto3==1.26.150
5+
google-cloud-pubsub==2.18.4
56
google-cloud-storage==2.9.0; python_version < '3.7'
67
google-cloud-storage==2.9.0; python_version > '3.6'
78
google-cloud-secret-manager==2.16.1

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.