Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Feb 23, 2024. It is now read-only.

Commit cb7e900

Browse filesBrowse files
Music Leebusunkim96
authored andcommitted
test(translation): add VPC-SC system tests (#9272)
1 parent 42224d0 commit cb7e900
Copy full SHA for cb7e900

File tree

Expand file treeCollapse file tree

1 file changed

+166
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

1 file changed

+166
-0
lines changed
Open diff view settings
Collapse file

‎tests/system/test_vpcsc.py‎

Copy file name to clipboard
+166Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# https://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
"""Unit tests for VPC-SC."""
17+
18+
import os
19+
import pytest
20+
21+
from google.api_core import exceptions
22+
from google.cloud import translate_v3beta1
23+
24+
25+
IS_INSIDE_VPCSC = "GOOGLE_CLOUD_TESTS_IN_VPCSC" in os.environ
26+
# If IS_INSIDE_VPCSC is set, these environment variables should also be set
27+
if IS_INSIDE_VPCSC:
28+
PROJECT_INSIDE = os.environ["PROJECT_ID"]
29+
PROJECT_OUTSIDE = os.environ["GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_PERIMETER_PROJECT"]
30+
31+
32+
class TestVPCServiceControl(object):
33+
@classmethod
34+
def setup(self):
35+
self._client = translate_v3beta1.TranslationServiceClient()
36+
self._parent_inside = self._client.location_path(PROJECT_INSIDE, "us-central1")
37+
self._parent_outside = self._client.location_path(
38+
PROJECT_OUTSIDE, "us-central1"
39+
)
40+
41+
def make_glossary_name(project_id):
42+
return "projects/{0}/locations/us-central1/glossaries/fake_glossary".format(
43+
project_id
44+
)
45+
46+
self._glossary_name_inside = make_glossary_name(PROJECT_INSIDE)
47+
self._glossary_name_outside = make_glossary_name(PROJECT_OUTSIDE)
48+
49+
@staticmethod
50+
def _is_rejected(call):
51+
try:
52+
responses = call()
53+
print("responses: ", responses)
54+
except exceptions.PermissionDenied as e:
55+
print("PermissionDenied Exception: ", e)
56+
return e.message == "Request is prohibited by organization's policy"
57+
except Exception as e:
58+
print("Other Exception: ", e)
59+
pass
60+
return False
61+
62+
@staticmethod
63+
def _do_test(delayed_inside, delayed_outside):
64+
assert TestVPCServiceControl._is_rejected(delayed_outside)
65+
assert not (TestVPCServiceControl._is_rejected(delayed_inside))
66+
67+
@pytest.mark.skipif(
68+
not IS_INSIDE_VPCSC,
69+
reason="This test must be run in VPCSC. To enable this test, set the environment variable GOOGLE_CLOUD_TESTS_IN_VPCSC to True",
70+
)
71+
def test_create_glossary(self):
72+
def make_glossary(project_id):
73+
return {
74+
"name": "projects/{0}/locations/us-central1/glossaries/fake_glossary".format(
75+
project_id
76+
),
77+
"language_codes_set": {"language_codes": ["en", "ja"]},
78+
"input_config": {
79+
"gcs_source": {"input_uri": "gs://fake-bucket/fake_glossary.csv"}
80+
},
81+
}
82+
83+
glossary_inside = make_glossary(PROJECT_INSIDE)
84+
85+
def delayed_inside():
86+
return self._client.create_glossary(self._parent_inside, glossary_inside)
87+
88+
glossary_outside = make_glossary(PROJECT_OUTSIDE)
89+
90+
def delayed_outside():
91+
return self._client.create_glossary(self._parent_outside, glossary_outside)
92+
93+
TestVPCServiceControl._do_test(delayed_inside, delayed_outside)
94+
95+
@pytest.mark.skipif(
96+
not IS_INSIDE_VPCSC,
97+
reason="This test must be run in VPCSC. To enable this test, set the environment variable GOOGLE_CLOUD_TESTS_IN_VPCSC to True",
98+
)
99+
def test_list_glossaries(self):
100+
# list_glossaries() returns an GRPCIterator instance, and we need to actually iterate through it
101+
# by calling _next_page() to get real response.
102+
def delayed_inside():
103+
return self._client.list_glossaries(self._parent_inside)._next_page()
104+
105+
def delayed_outside():
106+
return self._client.list_glossaries(self._parent_outside)._next_page()
107+
108+
TestVPCServiceControl._do_test(delayed_inside, delayed_outside)
109+
110+
@pytest.mark.skipif(
111+
not IS_INSIDE_VPCSC,
112+
reason="This test must be run in VPCSC. To enable this test, set the environment variable GOOGLE_CLOUD_TESTS_IN_VPCSC to True",
113+
)
114+
def test_get_glossary(self):
115+
def delayed_inside():
116+
return self._client.get_glossary(self._glossary_name_inside)
117+
118+
def delayed_outside():
119+
return self._client.get_glossary(self._glossary_name_outside)
120+
121+
TestVPCServiceControl._do_test(delayed_inside, delayed_outside)
122+
123+
@pytest.mark.skipif(
124+
not IS_INSIDE_VPCSC,
125+
reason="This test must be run in VPCSC. To enable this test, set the environment variable GOOGLE_CLOUD_TESTS_IN_VPCSC to True",
126+
)
127+
def test_delete_glossary(self):
128+
def delayed_inside():
129+
return self._client.delete_glossary(self._glossary_name_inside)
130+
131+
def delayed_outside():
132+
return self._client.delete_glossary(self._glossary_name_outside)
133+
134+
TestVPCServiceControl._do_test(delayed_inside, delayed_outside)
135+
136+
@pytest.mark.skipif(
137+
not IS_INSIDE_VPCSC,
138+
reason="This test must be run in VPCSC. To enable this test, set the environment variable GOOGLE_CLOUD_TESTS_IN_VPCSC to True",
139+
)
140+
def test_batch_translate_text(self):
141+
source_language_code = "en"
142+
target_language_codes = ["es"]
143+
input_configs = [{"gcs_source": {"input_uri": "gs://fake-bucket/*"}}]
144+
output_config = {
145+
"gcs_destination": {"output_uri_prefix": "gs://fake-bucket/output/"}
146+
}
147+
148+
def delayed_inside():
149+
return self._client.batch_translate_text(
150+
self._parent_inside,
151+
source_language_code,
152+
target_language_codes,
153+
input_configs,
154+
output_config,
155+
)
156+
157+
def delayed_outside():
158+
return self._client.batch_translate_text(
159+
self._parent_outside,
160+
source_language_code,
161+
target_language_codes,
162+
input_configs,
163+
output_config,
164+
)
165+
166+
TestVPCServiceControl._do_test(delayed_inside, delayed_outside)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.