diff --git a/CHANGELOG.md b/CHANGELOG.md index 2122c34d8..a974e5256 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ [1]: https://pypi.org/project/google-auth/#history +## [2.14.0](https://github.com/googleapis/google-auth-library-python/compare/v2.13.0...v2.14.0) (2022-10-31) + + +### Features + +* Add token_info_url to external account credentials ([#1168](https://github.com/googleapis/google-auth-library-python/issues/1168)) ([9adee75](https://github.com/googleapis/google-auth-library-python/commit/9adee75712202234aa0b124a9ca0424654022428)) +* Read Quota Project from Environment Variable ([#1163](https://github.com/googleapis/google-auth-library-python/issues/1163)) ([57b3e42](https://github.com/googleapis/google-auth-library-python/commit/57b3e424927a5d86fbab8b231109a5aae1233745)) + + +### Bug Fixes + +* Adding more properties to external_account_authorized_user ([#1169](https://github.com/googleapis/google-auth-library-python/issues/1169)) ([a12b96d](https://github.com/googleapis/google-auth-library-python/commit/a12b96dcfa7cb58d9171fd7f2a7ea8331a228419)) + ## [2.13.0](https://github.com/googleapis/google-auth-library-python/compare/v2.12.0...v2.13.0) (2022-10-14) diff --git a/google/auth/_default.py b/google/auth/_default.py index bef09659b..8fe168428 100644 --- a/google/auth/_default.py +++ b/google/auth/_default.py @@ -479,6 +479,8 @@ def _get_gdch_service_account_credentials(filename, info): def _apply_quota_project_id(credentials, quota_project_id): if quota_project_id: credentials = credentials.with_quota_project(quota_project_id) + else: + credentials = credentials.with_quota_project_from_environment() from google.oauth2 import credentials as authorized_user_credentials diff --git a/google/auth/credentials.py b/google/auth/credentials.py index 2735892d4..ca1032a14 100644 --- a/google/auth/credentials.py +++ b/google/auth/credentials.py @@ -16,10 +16,11 @@ """Interfaces for credentials.""" import abc +import os import six -from google.auth import _helpers +from google.auth import _helpers, environment_vars @six.add_metaclass(abc.ABCMeta) @@ -149,6 +150,12 @@ def with_quota_project(self, quota_project_id): """ raise NotImplementedError("This credential does not support quota project.") + def with_quota_project_from_environment(self): + quota_from_env = os.environ.get(environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT) + if quota_from_env: + return self.with_quota_project(quota_from_env) + return self + class CredentialsWithTokenUri(Credentials): """Abstract base for credentials supporting ``with_token_uri`` factory""" diff --git a/google/auth/environment_vars.py b/google/auth/environment_vars.py index c076dc59d..81f31571e 100644 --- a/google/auth/environment_vars.py +++ b/google/auth/environment_vars.py @@ -29,6 +29,10 @@ situations (such as Google App Engine). """ +GOOGLE_CLOUD_QUOTA_PROJECT = "GOOGLE_CLOUD_QUOTA_PROJECT" +"""Environment variable defining the project to be used for +quota and billing.""" + CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS" """Environment variable defining the location of Google application default credentials.""" diff --git a/google/auth/external_account.py b/google/auth/external_account.py index c1ba5efa0..7edb55f63 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -78,6 +78,7 @@ def __init__( service_account_impersonation_options=None, client_id=None, client_secret=None, + token_info_url=None, quota_project_id=None, scopes=None, default_scopes=None, @@ -94,6 +95,7 @@ def __init__( impersonation generateAccessToken URL. client_id (Optional[str]): The optional client ID. client_secret (Optional[str]): The optional client secret. + token_info_url (str): The optional STS endpoint URL for token introspection. quota_project_id (Optional[str]): The optional quota project ID. scopes (Optional[Sequence[str]]): Optional scopes to request during the authorization grant. @@ -112,6 +114,7 @@ def __init__( self._audience = audience self._subject_token_type = subject_token_type self._token_url = token_url + self._token_info_url = token_info_url self._credential_source = credential_source self._service_account_impersonation_url = service_account_impersonation_url self._service_account_impersonation_options = ( @@ -125,6 +128,8 @@ def __init__( self._workforce_pool_user_project = workforce_pool_user_project Credentials.validate_token_url(token_url) + if token_info_url: + Credentials.validate_token_url(token_info_url, url_type="token info") if service_account_impersonation_url: Credentials.validate_service_account_impersonation_url( service_account_impersonation_url @@ -161,13 +166,25 @@ def info(self): useful for serializing the current credentials so it can deserialized later. """ - config_info = { - "type": _EXTERNAL_ACCOUNT_JSON_TYPE, + config_info = self._constructor_args() + config_info.update( + type=_EXTERNAL_ACCOUNT_JSON_TYPE, + service_account_impersonation=config_info.pop( + "service_account_impersonation_options", None + ), + ) + config_info.pop("scopes", None) + config_info.pop("default_scopes", None) + return {key: value for key, value in config_info.items() if value is not None} + + def _constructor_args(self): + args = { "audience": self._audience, "subject_token_type": self._subject_token_type, "token_url": self._token_url, + "token_info_url": self._token_info_url, "service_account_impersonation_url": self._service_account_impersonation_url, - "service_account_impersonation": copy.deepcopy( + "service_account_impersonation_options": copy.deepcopy( self._service_account_impersonation_options ) or None, @@ -176,8 +193,12 @@ def info(self): "client_id": self._client_id, "client_secret": self._client_secret, "workforce_pool_user_project": self._workforce_pool_user_project, + "scopes": self._scopes, + "default_scopes": self._default_scopes, } - return {key: value for key, value in config_info.items() if value is not None} + if not self.is_workforce_pool: + args.pop("workforce_pool_user_project") + return args @property def service_account_email(self): @@ -255,25 +276,17 @@ def project_number(self): except ValueError: return None + @property + def token_info_url(self): + """Optional[str]: The STS token introspection endpoint.""" + + return self._token_info_url + @_helpers.copy_docstring(credentials.Scoped) def with_scopes(self, scopes, default_scopes=None): - d = dict( - audience=self._audience, - subject_token_type=self._subject_token_type, - token_url=self._token_url, - credential_source=self._credential_source, - service_account_impersonation_url=self._service_account_impersonation_url, - service_account_impersonation_options=self._service_account_impersonation_options, - client_id=self._client_id, - client_secret=self._client_secret, - quota_project_id=self._quota_project_id, - scopes=scopes, - default_scopes=default_scopes, - workforce_pool_user_project=self._workforce_pool_user_project, - ) - if not self.is_workforce_pool: - d.pop("workforce_pool_user_project") - return self.__class__(**d) + kwargs = self._constructor_args() + kwargs.update(scopes=scopes, default_scopes=default_scopes) + return self.__class__(**kwargs) @abc.abstractmethod def retrieve_subject_token(self, request): @@ -368,43 +381,15 @@ def refresh(self, request): @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) def with_quota_project(self, quota_project_id): # Return copy of instance with the provided quota project ID. - d = dict( - audience=self._audience, - subject_token_type=self._subject_token_type, - token_url=self._token_url, - credential_source=self._credential_source, - service_account_impersonation_url=self._service_account_impersonation_url, - service_account_impersonation_options=self._service_account_impersonation_options, - client_id=self._client_id, - client_secret=self._client_secret, - quota_project_id=quota_project_id, - scopes=self._scopes, - default_scopes=self._default_scopes, - workforce_pool_user_project=self._workforce_pool_user_project, - ) - if not self.is_workforce_pool: - d.pop("workforce_pool_user_project") - return self.__class__(**d) + kwargs = self._constructor_args() + kwargs.update(quota_project_id=quota_project_id) + return self.__class__(**kwargs) @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) def with_token_uri(self, token_uri): - d = dict( - audience=self._audience, - subject_token_type=self._subject_token_type, - token_url=token_uri, - credential_source=self._credential_source, - service_account_impersonation_url=self._service_account_impersonation_url, - service_account_impersonation_options=self._service_account_impersonation_options, - client_id=self._client_id, - client_secret=self._client_secret, - quota_project_id=self._quota_project_id, - scopes=self._scopes, - default_scopes=self._default_scopes, - workforce_pool_user_project=self._workforce_pool_user_project, - ) - if not self.is_workforce_pool: - d.pop("workforce_pool_user_project") - return self.__class__(**d) + kwargs = self._constructor_args() + kwargs.update(token_url=token_uri) + return self.__class__(**kwargs) def _initialize_impersonated_credentials(self): """Generates an impersonated credentials. @@ -422,23 +407,12 @@ def _initialize_impersonated_credentials(self): endpoint returned an error. """ # Return copy of instance with no service account impersonation. - d = dict( - audience=self._audience, - subject_token_type=self._subject_token_type, - token_url=self._token_url, - credential_source=self._credential_source, + kwargs = self._constructor_args() + kwargs.update( service_account_impersonation_url=None, service_account_impersonation_options={}, - client_id=self._client_id, - client_secret=self._client_secret, - quota_project_id=self._quota_project_id, - scopes=self._scopes, - default_scopes=self._default_scopes, - workforce_pool_user_project=self._workforce_pool_user_project, ) - if not self.is_workforce_pool: - d.pop("workforce_pool_user_project") - source_credentials = self.__class__(**d) + source_credentials = self.__class__(**kwargs) # Determine target_principal. target_principal = self.service_account_email @@ -461,7 +435,7 @@ def _initialize_impersonated_credentials(self): ) @staticmethod - def validate_token_url(token_url): + def validate_token_url(token_url, url_type="token"): _TOKEN_URL_PATTERNS = [ "^[^\\.\\s\\/\\\\]+\\.sts\\.googleapis\\.com$", "^sts\\.googleapis\\.com$", @@ -471,7 +445,7 @@ def validate_token_url(token_url): ] if not Credentials.is_valid_url(_TOKEN_URL_PATTERNS, token_url): - raise ValueError("The provided token URL is invalid.") + raise ValueError("The provided {} URL is invalid.".format(url_type)) @staticmethod def validate_service_account_impersonation_url(url): @@ -530,6 +504,7 @@ def from_info(cls, info, **kwargs): audience=info.get("audience"), subject_token_type=info.get("subject_token_type"), token_url=info.get("token_url"), + token_info_url=info.get("token_info_url"), service_account_impersonation_url=info.get( "service_account_impersonation_url" ), diff --git a/google/auth/external_account_authorized_user.py b/google/auth/external_account_authorized_user.py index c0ffc49f3..51e7f2058 100644 --- a/google/auth/external_account_authorized_user.py +++ b/google/auth/external_account_authorized_user.py @@ -73,6 +73,7 @@ def __init__( token_url=None, token_info_url=None, revoke_url=None, + scopes=None, quota_project_id=None, ): """Instantiates a external account authorized user credentials object. @@ -90,8 +91,8 @@ def __init__( None if the token can not be refreshed. client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be left as None if the token can not be refreshed. - token_url (str): The optional STS token exchange endpoint. Must be specified fro refresh, - can be leftas None if the token can not be refreshed. + token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for + refresh, can be left as None if the token can not be refreshed. token_info_url (str): The optional STS endpoint URL for token introspection. revoke_url (str): The optional STS endpoint URL for revoking tokens. quota_project_id (str): The optional project ID used for quota and billing. @@ -102,9 +103,6 @@ def __init__( google.auth.external_account_authorized_user.Credentials: The constructed credentials. """ - if not any((refresh_token, token)): - raise ValueError("Either `refresh_token` or `token` should be set.") - super(Credentials, self).__init__() self.token = token @@ -117,6 +115,14 @@ def __init__( self._client_secret = client_secret self._revoke_url = revoke_url self._quota_project_id = quota_project_id + self._scopes = scopes + + if not self.valid and not self.can_refresh: + raise ValueError( + "Token should be created with fields to make it valid (`token` and " + "`expiry`), or fields to allow it to refresh (`refresh_token`, " + "`token_url`, `client_id`, `client_secret`)." + ) self._client_auth = None if self._client_id: @@ -154,20 +160,68 @@ def constructor_args(self): "token": self.token, "expiry": self.expiry, "revoke_url": self._revoke_url, + "scopes": self._scopes, "quota_project_id": self._quota_project_id, } + @property + def scopes(self): + """Optional[str]: The OAuth 2.0 permission scopes.""" + return self._scopes + @property def requires_scopes(self): """ False: OAuth 2.0 credentials have their scopes set when the initial token is requested and can not be changed.""" return False + @property + def client_id(self): + """Optional[str]: The OAuth 2.0 client ID.""" + return self._client_id + + @property + def client_secret(self): + """Optional[str]: The OAuth 2.0 client secret.""" + return self._client_secret + + @property + def audience(self): + """Optional[str]: The STS audience which contains the resource name for the + workforce pool and the provider identifier in that pool.""" + return self._audience + + @property + def refresh_token(self): + """Optional[str]: The OAuth 2.0 refresh token.""" + return self._refresh_token + + @property + def token_url(self): + """Optional[str]: The STS token exchange endpoint for refresh.""" + return self._token_url + + @property + def token_info_url(self): + """Optional[str]: The STS endpoint for token info.""" + return self._token_info_url + + @property + def revoke_url(self): + """Optional[str]: The STS endpoint for token revocation.""" + return self._revoke_url + @property def is_user(self): """ True: This credential always represents a user.""" return True + @property + def can_refresh(self): + return all( + (self._refresh_token, self._token_url, self._client_id, self._client_secret) + ) + def get_project_id(self): """Retrieves the project ID corresponding to the workload identity or workforce pool. For workforce pool credentials, it returns the project ID corresponding to @@ -203,9 +257,7 @@ def refresh(self, request): google.auth.exceptions.RefreshError: If the credentials could not be refreshed. """ - if not all( - (self._refresh_token, self._token_url, self._client_id, self._client_secret) - ): + if not self.can_refresh: raise exceptions.RefreshError( "The credentials do not contain the necessary fields need to " "refresh the access token. You must specify refresh_token, " @@ -270,6 +322,7 @@ def from_info(cls, info, **kwargs): expiry=expiry, revoke_url=info.get("revoke_url"), quota_project_id=info.get("quota_project_id"), + scopes=info.get("scopes"), **kwargs ) diff --git a/google/auth/version.py b/google/auth/version.py index b5e62bac3..6f8afc72a 100644 --- a/google/auth/version.py +++ b/google/auth/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.13.0" +__version__ = "2.14.0" diff --git a/system_tests/secrets.tar.enc b/system_tests/secrets.tar.enc index 69e4225b8..abfaddd92 100644 Binary files a/system_tests/secrets.tar.enc and b/system_tests/secrets.tar.enc differ diff --git a/tests/test__default.py b/tests/test__default.py index 11d87f4cb..26b41b995 100644 --- a/tests/test__default.py +++ b/tests/test__default.py @@ -1214,3 +1214,23 @@ def test_default_gdch_service_account_credentials(get_adc_path): assert creds._token_uri == "https://service-identity./authenticate" assert creds._ca_cert_path == "/path/to/ca/cert" assert project == "project_foo" + + +@mock.patch.dict(os.environ) +@mock.patch( + "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True +) +def test_quota_project_from_environment(get_adc_path): + get_adc_path.return_value = AUTHORIZED_USER_CLOUD_SDK_WITH_QUOTA_PROJECT_ID_FILE + + credentials, _ = _default.default(quota_project_id=None) + assert credentials.quota_project_id == "quota_project_id" + + quota_from_env = "quota_from_env" + os.environ[environment_vars.GOOGLE_CLOUD_QUOTA_PROJECT] = quota_from_env + credentials, _ = _default.default(quota_project_id=None) + assert credentials.quota_project_id == quota_from_env + + explicit_quota = "explicit_quota" + credentials, _ = _default.default(quota_project_id=explicit_quota) + assert credentials.quota_project_id == explicit_quota diff --git a/tests/test_aws.py b/tests/test_aws.py index 0a451f3eb..85f5e8dd4 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -32,13 +32,19 @@ # Base64 encoding of "username:password". BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = ( + "https://us-east1-iamcredentials.googleapis.com" +) +SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + SERVICE_ACCOUNT_EMAIL +) SERVICE_ACCOUNT_IMPERSONATION_URL = ( - "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" - + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) + SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE ) QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" SCOPES = ["scope1", "scope2"] TOKEN_URL = "https://sts.googleapis.com/v1/token" +TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect" SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request" AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone" @@ -56,6 +62,89 @@ REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}' # Each tuple contains the following entries: # region, time, credentials, original_request, signed_request + +VALID_TOKEN_URLS = [ + "https://sts.googleapis.com", + "https://us-east-1.sts.googleapis.com", + "https://US-EAST-1.sts.googleapis.com", + "https://sts.us-east-1.googleapis.com", + "https://sts.US-WEST-1.googleapis.com", + "https://us-east-1-sts.googleapis.com", + "https://US-WEST-1-sts.googleapis.com", + "https://us-west-1-sts.googleapis.com/path?query", + "https://sts-us-east-1.p.googleapis.com", +] +INVALID_TOKEN_URLS = [ + "https://iamcredentials.googleapis.com", + "sts.googleapis.com", + "https://", + "http://sts.googleapis.com", + "https://st.s.googleapis.com", + "https://us-eas\t-1.sts.googleapis.com", + "https:/us-east-1.sts.googleapis.com", + "https://US-WE/ST-1-sts.googleapis.com", + "https://sts-us-east-1.googleapis.com", + "https://sts-US-WEST-1.googleapis.com", + "testhttps://us-east-1.sts.googleapis.com", + "https://us-east-1.sts.googleapis.comevil.com", + "https://us-east-1.us-east-1.sts.googleapis.com", + "https://us-ea.s.t.sts.googleapis.com", + "https://sts.googleapis.comevil.com", + "hhttps://us-east-1.sts.googleapis.com", + "https://us- -1.sts.googleapis.com", + "https://-sts.googleapis.com", + "https://us-east-1.sts.googleapis.com.evil.com", + "https://sts.pgoogleapis.com", + "https://p.googleapis.com", + "https://sts.p.com", + "http://sts.p.googleapis.com", + "https://xyz-sts.p.googleapis.com", + "https://sts-xyz.123.p.googleapis.com", + "https://sts-xyz.p1.googleapis.com", + "https://sts-xyz.p.foo.com", + "https://sts-xyz.p.foo.googleapis.com", +] +VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com", + "https://US-EAST-1.iamcredentials.googleapis.com", + "https://iamcredentials.us-east-1.googleapis.com", + "https://iamcredentials.US-WEST-1.googleapis.com", + "https://us-east-1-iamcredentials.googleapis.com", + "https://US-WEST-1-iamcredentials.googleapis.com", + "https://us-west-1-iamcredentials.googleapis.com/path?query", + "https://iamcredentials-us-east-1.p.googleapis.com", +] +INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://sts.googleapis.com", + "iamcredentials.googleapis.com", + "https://", + "http://iamcredentials.googleapis.com", + "https://iamcre.dentials.googleapis.com", + "https://us-eas\t-1.iamcredentials.googleapis.com", + "https:/us-east-1.iamcredentials.googleapis.com", + "https://US-WE/ST-1-iamcredentials.googleapis.com", + "https://iamcredentials-us-east-1.googleapis.com", + "https://iamcredentials-US-WEST-1.googleapis.com", + "testhttps://us-east-1.iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.comevil.com", + "https://us-east-1.us-east-1.iamcredentials.googleapis.com", + "https://us-ea.s.t.iamcredentials.googleapis.com", + "https://iamcredentials.googleapis.comevil.com", + "hhttps://us-east-1.iamcredentials.googleapis.com", + "https://us- -1.iamcredentials.googleapis.com", + "https://-iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com.evil.com", + "https://iamcredentials.pgoogleapis.com", + "https://p.googleapis.com", + "https://iamcredentials.p.com", + "http://iamcredentials.p.googleapis.com", + "https://xyz-iamcredentials.p.googleapis.com", + "https://iamcredentials-xyz.123.p.googleapis.com", + "https://iamcredentials-xyz.p1.googleapis.com", + "https://iamcredentials-xyz.p.foo.com", + "https://iamcredentials-xyz.p.foo.googleapis.com", +] TEST_FIXTURES = [ # GET request (AWS botocore tests). # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req @@ -727,6 +816,8 @@ def make_mock_request( def make_credentials( cls, credential_source, + token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, client_id=None, client_secret=None, quota_project_id=None, @@ -737,7 +828,8 @@ def make_credentials( return aws.Credentials( audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, - token_url=TOKEN_URL, + token_url=token_url, + token_info_url=token_info_url, service_account_impersonation_url=service_account_impersonation_url, credential_source=credential_source, client_id=client_id, @@ -796,6 +888,7 @@ def test_from_info_full_options(self, mock_init): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "client_id": CLIENT_ID, @@ -811,6 +904,7 @@ def test_from_info_full_options(self, mock_init): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, client_id=CLIENT_ID, @@ -837,6 +931,7 @@ def test_from_info_required_options_only(self, mock_init): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -852,6 +947,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "client_id": CLIENT_ID, @@ -869,6 +965,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, client_id=CLIENT_ID, @@ -896,6 +993,7 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -954,9 +1052,89 @@ def test_info(self): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "credential_source": self.CREDENTIAL_SOURCE, } + def test_token_info_url(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy() + ) + + assert credentials.token_info_url == TOKEN_INFO_URL + + def test_token_info_url_custom(self): + for url in VALID_TOKEN_URLS: + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_info_url=(url + "/introspect"), + ) + + assert credentials.token_info_url == (url + "/introspect") + + def test_token_info_url_bad(self): + for url in INVALID_TOKEN_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_info_url=(url + "/introspect"), + ) + + assert excinfo.match(r"The provided token info URL is invalid\.") + + def test_token_info_url_negative(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), token_info_url=None + ) + + assert not credentials.token_info_url + + def test_token_url_custom(self): + for url in VALID_TOKEN_URLS: + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_url=(url + "/token"), + ) + + assert credentials._token_url == (url + "/token") + + def test_token_url_bad(self): + for url in INVALID_TOKEN_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_url=(url + "/token"), + ) + + assert excinfo.match(r"The provided token URL is invalid\.") + + def test_service_account_impersonation_url_custom(self): + for url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), + service_account_impersonation_url=( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ), + ) + + assert credentials._service_account_impersonation_url == ( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ) + + def test_service_account_impersonation_url_bad(self): + for url in INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE.copy(), + service_account_impersonation_url=( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ), + ) + + assert excinfo.match( + r"The provided service account impersonation URL is invalid\." + ) + def test_retrieve_subject_token_missing_region_url(self): # When AWS_REGION envvar is not available, region_url is required for # determining the current AWS region. diff --git a/tests/test_external_account.py b/tests/test_external_account.py index 468152e05..18ac75511 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -65,37 +65,93 @@ "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id", ] +VALID_TOKEN_URLS = [ + "https://sts.googleapis.com", + "https://us-east-1.sts.googleapis.com", + "https://US-EAST-1.sts.googleapis.com", + "https://sts.us-east-1.googleapis.com", + "https://sts.US-WEST-1.googleapis.com", + "https://us-east-1-sts.googleapis.com", + "https://US-WEST-1-sts.googleapis.com", + "https://us-west-1-sts.googleapis.com/path?query", + "https://sts-us-east-1.p.googleapis.com", +] +INVALID_TOKEN_URLS = [ + "https://iamcredentials.googleapis.com", + "sts.googleapis.com", + "https://", + "http://sts.googleapis.com", + "https://st.s.googleapis.com", + "https://us-eas\t-1.sts.googleapis.com", + "https:/us-east-1.sts.googleapis.com", + "https://US-WE/ST-1-sts.googleapis.com", + "https://sts-us-east-1.googleapis.com", + "https://sts-US-WEST-1.googleapis.com", + "testhttps://us-east-1.sts.googleapis.com", + "https://us-east-1.sts.googleapis.comevil.com", + "https://us-east-1.us-east-1.sts.googleapis.com", + "https://us-ea.s.t.sts.googleapis.com", + "https://sts.googleapis.comevil.com", + "hhttps://us-east-1.sts.googleapis.com", + "https://us- -1.sts.googleapis.com", + "https://-sts.googleapis.com", + "https://us-east-1.sts.googleapis.com.evil.com", + "https://sts.pgoogleapis.com", + "https://p.googleapis.com", + "https://sts.p.com", + "http://sts.p.googleapis.com", + "https://xyz-sts.p.googleapis.com", + "https://sts-xyz.123.p.googleapis.com", + "https://sts-xyz.p1.googleapis.com", + "https://sts-xyz.p.foo.com", + "https://sts-xyz.p.foo.googleapis.com", +] +VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com", + "https://US-EAST-1.iamcredentials.googleapis.com", + "https://iamcredentials.us-east-1.googleapis.com", + "https://iamcredentials.US-WEST-1.googleapis.com", + "https://us-east-1-iamcredentials.googleapis.com", + "https://US-WEST-1-iamcredentials.googleapis.com", + "https://us-west-1-iamcredentials.googleapis.com/path?query", + "https://iamcredentials-us-east-1.p.googleapis.com", +] +INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://sts.googleapis.com", + "iamcredentials.googleapis.com", + "https://", + "http://iamcredentials.googleapis.com", + "https://iamcre.dentials.googleapis.com", + "https://us-eas\t-1.iamcredentials.googleapis.com", + "https:/us-east-1.iamcredentials.googleapis.com", + "https://US-WE/ST-1-iamcredentials.googleapis.com", + "https://iamcredentials-us-east-1.googleapis.com", + "https://iamcredentials-US-WEST-1.googleapis.com", + "testhttps://us-east-1.iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.comevil.com", + "https://us-east-1.us-east-1.iamcredentials.googleapis.com", + "https://us-ea.s.t.iamcredentials.googleapis.com", + "https://iamcredentials.googleapis.comevil.com", + "hhttps://us-east-1.iamcredentials.googleapis.com", + "https://us- -1.iamcredentials.googleapis.com", + "https://-iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com.evil.com", + "https://iamcredentials.pgoogleapis.com", + "https://p.googleapis.com", + "https://iamcredentials.p.com", + "http://iamcredentials.p.googleapis.com", + "https://xyz-iamcredentials.p.googleapis.com", + "https://iamcredentials-xyz.123.p.googleapis.com", + "https://iamcredentials-xyz.p1.googleapis.com", + "https://iamcredentials-xyz.p.foo.com", + "https://iamcredentials-xyz.p.foo.googleapis.com", +] + class CredentialsImpl(external_account.Credentials): - def __init__( - self, - audience, - subject_token_type, - token_url, - credential_source, - service_account_impersonation_url=None, - service_account_impersonation_options={}, - client_id=None, - client_secret=None, - quota_project_id=None, - scopes=None, - default_scopes=None, - workforce_pool_user_project=None, - ): - super(CredentialsImpl, self).__init__( - audience=audience, - subject_token_type=subject_token_type, - token_url=token_url, - credential_source=credential_source, - service_account_impersonation_url=service_account_impersonation_url, - service_account_impersonation_options=service_account_impersonation_options, - client_id=client_id, - client_secret=client_secret, - quota_project_id=quota_project_id, - scopes=scopes, - default_scopes=default_scopes, - workforce_pool_user_project=workforce_pool_user_project, - ) + def __init__(self, **kwargs): + super(CredentialsImpl, self).__init__(**kwargs) self._counter = 0 def retrieve_subject_token(self, request): @@ -106,6 +162,7 @@ def retrieve_subject_token(self, request): class TestCredentials(object): TOKEN_URL = "https://sts.googleapis.com/v1/token" + TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect" PROJECT_NUMBER = "123456" POOL_ID = "POOL_ID" PROVIDER_ID = "PROVIDER_ID" @@ -165,6 +222,7 @@ def make_credentials( client_id=None, client_secret=None, quota_project_id=None, + token_info_url=None, scopes=None, default_scopes=None, service_account_impersonation_url=None, @@ -174,6 +232,7 @@ def make_credentials( audience=cls.AUDIENCE, subject_token_type=cls.SUBJECT_TOKEN_TYPE, token_url=cls.TOKEN_URL, + token_info_url=token_info_url, service_account_impersonation_url=service_account_impersonation_url, service_account_impersonation_options=service_account_impersonation_options, credential_source=cls.CREDENTIAL_SOURCE, @@ -280,53 +339,14 @@ def assert_resource_manager_request_kwargs( assert "body" not in request_kwargs def test_valid_token_url_shall_pass_validation(self): - valid_urls = [ - "https://sts.googleapis.com", - "https://us-east-1.sts.googleapis.com", - "https://US-EAST-1.sts.googleapis.com", - "https://sts.us-east-1.googleapis.com", - "https://sts.US-WEST-1.googleapis.com", - "https://us-east-1-sts.googleapis.com", - "https://US-WEST-1-sts.googleapis.com", - "https://us-west-1-sts.googleapis.com/path?query", - "https://sts-us-east-1.p.googleapis.com", - ] + valid_urls = VALID_TOKEN_URLS for url in valid_urls: # A valid url shouldn't throw exception and a None value should be returned external_account.Credentials.validate_token_url(url) def test_invalid_token_url_shall_throw_exceptions(self): - invalid_urls = [ - "https://iamcredentials.googleapis.com", - "sts.googleapis.com", - "https://", - "http://sts.googleapis.com", - "https://st.s.googleapis.com", - "https://us-eas\t-1.sts.googleapis.com", - "https:/us-east-1.sts.googleapis.com", - "https://US-WE/ST-1-sts.googleapis.com", - "https://sts-us-east-1.googleapis.com", - "https://sts-US-WEST-1.googleapis.com", - "testhttps://us-east-1.sts.googleapis.com", - "https://us-east-1.sts.googleapis.comevil.com", - "https://us-east-1.us-east-1.sts.googleapis.com", - "https://us-ea.s.t.sts.googleapis.com", - "https://sts.googleapis.comevil.com", - "hhttps://us-east-1.sts.googleapis.com", - "https://us- -1.sts.googleapis.com", - "https://-sts.googleapis.com", - "https://us-east-1.sts.googleapis.com.evil.com", - "https://sts.pgoogleapis.com", - "https://p.googleapis.com", - "https://sts.p.com", - "http://sts.p.googleapis.com", - "https://xyz-sts.p.googleapis.com", - "https://sts-xyz.123.p.googleapis.com", - "https://sts-xyz.p1.googleapis.com", - "https://sts-xyz.p.foo.com", - "https://sts-xyz.p.foo.googleapis.com", - ] + invalid_urls = INVALID_TOKEN_URLS for url in invalid_urls: # An invalid url should throw a ValueError exception @@ -336,53 +356,14 @@ def test_invalid_token_url_shall_throw_exceptions(self): assert excinfo.match("The provided token URL is invalid.") def test_valid_service_account_impersonation_url_shall_pass_validation(self): - valid_urls = [ - "https://iamcredentials.googleapis.com", - "https://us-east-1.iamcredentials.googleapis.com", - "https://US-EAST-1.iamcredentials.googleapis.com", - "https://iamcredentials.us-east-1.googleapis.com", - "https://iamcredentials.US-WEST-1.googleapis.com", - "https://us-east-1-iamcredentials.googleapis.com", - "https://US-WEST-1-iamcredentials.googleapis.com", - "https://us-west-1-iamcredentials.googleapis.com/path?query", - "https://iamcredentials-us-east-1.p.googleapis.com", - ] + valid_urls = VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS for url in valid_urls: # A valid url shouldn't throw exception and a None value should be returned external_account.Credentials.validate_service_account_impersonation_url(url) def test_invalid_service_account_impersonate_url_shall_throw_exceptions(self): - invalid_urls = [ - "https://sts.googleapis.com", - "iamcredentials.googleapis.com", - "https://", - "http://iamcredentials.googleapis.com", - "https://iamcre.dentials.googleapis.com", - "https://us-eas\t-1.iamcredentials.googleapis.com", - "https:/us-east-1.iamcredentials.googleapis.com", - "https://US-WE/ST-1-iamcredentials.googleapis.com", - "https://iamcredentials-us-east-1.googleapis.com", - "https://iamcredentials-US-WEST-1.googleapis.com", - "testhttps://us-east-1.iamcredentials.googleapis.com", - "https://us-east-1.iamcredentials.googleapis.comevil.com", - "https://us-east-1.us-east-1.iamcredentials.googleapis.com", - "https://us-ea.s.t.iamcredentials.googleapis.com", - "https://iamcredentials.googleapis.comevil.com", - "hhttps://us-east-1.iamcredentials.googleapis.com", - "https://us- -1.iamcredentials.googleapis.com", - "https://-iamcredentials.googleapis.com", - "https://us-east-1.iamcredentials.googleapis.com.evil.com", - "https://iamcredentials.pgoogleapis.com", - "https://p.googleapis.com", - "https://iamcredentials.p.com", - "http://iamcredentials.p.googleapis.com", - "https://xyz-iamcredentials.p.googleapis.com", - "https://iamcredentials-xyz.123.p.googleapis.com", - "https://iamcredentials-xyz.p1.googleapis.com", - "https://iamcredentials-xyz.p.foo.com", - "https://iamcredentials-xyz.p.foo.googleapis.com", - ] + invalid_urls = INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS for url in invalid_urls: # An invalid url should throw a ValueError exception @@ -413,6 +394,8 @@ def test_default_state(self): assert not credentials.scopes assert credentials.requires_scopes assert not credentials.quota_project_id + # Token info url not set yet + assert not credentials.token_info_url def test_invalid_token_url(self): with pytest.raises(ValueError) as excinfo: @@ -515,6 +498,7 @@ def test_with_scopes_full_options_propagated(self): client_secret=CLIENT_SECRET, quota_project_id=self.QUOTA_PROJECT_ID, scopes=self.SCOPES, + token_info_url=self.TOKEN_INFO_URL, default_scopes=["default1"], service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, @@ -531,6 +515,7 @@ def test_with_scopes_full_options_propagated(self): audience=self.AUDIENCE, subject_token_type=self.SUBJECT_TOKEN_TYPE, token_url=self.TOKEN_URL, + token_info_url=self.TOKEN_INFO_URL, credential_source=self.CREDENTIAL_SOURCE, service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, @@ -539,7 +524,6 @@ def test_with_scopes_full_options_propagated(self): quota_project_id=self.QUOTA_PROJECT_ID, scopes=["email"], default_scopes=["default2"], - workforce_pool_user_project=None, ) def test_with_token_uri(self): @@ -599,6 +583,7 @@ def test_with_quota_project_full_options_propagated(self): credentials = self.make_credentials( client_id=CLIENT_ID, client_secret=CLIENT_SECRET, + token_info_url=self.TOKEN_INFO_URL, quota_project_id=self.QUOTA_PROJECT_ID, scopes=self.SCOPES, default_scopes=["default1"], @@ -617,6 +602,7 @@ def test_with_quota_project_full_options_propagated(self): audience=self.AUDIENCE, subject_token_type=self.SUBJECT_TOKEN_TYPE, token_url=self.TOKEN_URL, + token_info_url=self.TOKEN_INFO_URL, credential_source=self.CREDENTIAL_SOURCE, service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, @@ -625,7 +611,6 @@ def test_with_quota_project_full_options_propagated(self): quota_project_id="project-foo", scopes=self.SCOPES, default_scopes=["default1"], - workforce_pool_user_project=None, ) def test_with_invalid_impersonation_target_principal(self): @@ -668,6 +653,7 @@ def test_info_with_full_options(self): client_id=CLIENT_ID, client_secret=CLIENT_SECRET, quota_project_id=self.QUOTA_PROJECT_ID, + token_info_url=self.TOKEN_INFO_URL, service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, ) @@ -677,6 +663,7 @@ def test_info_with_full_options(self): "audience": self.AUDIENCE, "subject_token_type": self.SUBJECT_TOKEN_TYPE, "token_url": self.TOKEN_URL, + "token_info_url": self.TOKEN_INFO_URL, "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "credential_source": self.CREDENTIAL_SOURCE.copy(), diff --git a/tests/test_external_account_authorized_user.py b/tests/test_external_account_authorized_user.py index 49c34a9a4..c97d087b3 100644 --- a/tests/test_external_account_authorized_user.py +++ b/tests/test_external_account_authorized_user.py @@ -42,6 +42,8 @@ CLIENT_SECRET = "password" # Base64 encoding of "username:password". BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" +SCOPES = ["email", "profile"] +NOW = datetime.datetime(1990, 8, 27, 6, 54, 30) class TestCredentials(object): @@ -87,11 +89,22 @@ def test_default_state(self): assert not creds.token assert not creds.valid assert not creds.requires_scopes + assert not creds.scopes + assert not creds.revoke_url + assert creds.token_info_url + assert creds.client_id + assert creds.client_secret assert creds.is_user + assert creds.refresh_token == REFRESH_TOKEN + assert creds.audience == AUDIENCE + assert creds.token_url == TOKEN_URL def test_basic_create(self): creds = external_account_authorized_user.Credentials( - token=ACCESS_TOKEN, expiry=datetime.datetime.max + token=ACCESS_TOKEN, + expiry=datetime.datetime.max, + scopes=SCOPES, + revoke_url=REVOKE_URL, ) assert creds.expiry == datetime.datetime.max @@ -99,15 +112,51 @@ def test_basic_create(self): assert creds.token == ACCESS_TOKEN assert creds.valid assert not creds.requires_scopes + assert creds.scopes == SCOPES assert creds.is_user + assert creds.revoke_url == REVOKE_URL - def test_stunted_create(self): + def test_stunted_create_no_refresh_token(self): with pytest.raises(ValueError) as excinfo: self.make_credentials(token=None, refresh_token=None) - assert excinfo.match(r"Either `refresh_token` or `token` should be set") + assert excinfo.match( + r"Token should be created with fields to make it valid \(`token` and " + r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, " + r"`token_url`, `client_id`, `client_secret`\)\." + ) + + def test_stunted_create_no_token_url(self): + with pytest.raises(ValueError) as excinfo: + self.make_credentials(token=None, token_url=None) + + assert excinfo.match( + r"Token should be created with fields to make it valid \(`token` and " + r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, " + r"`token_url`, `client_id`, `client_secret`\)\." + ) + + def test_stunted_create_no_client_id(self): + with pytest.raises(ValueError) as excinfo: + self.make_credentials(token=None, client_id=None) + + assert excinfo.match( + r"Token should be created with fields to make it valid \(`token` and " + r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, " + r"`token_url`, `client_id`, `client_secret`\)\." + ) - @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + def test_stunted_create_no_client_secret(self): + with pytest.raises(ValueError) as excinfo: + self.make_credentials(token=None, client_secret=None) + + assert excinfo.match( + r"Token should be created with fields to make it valid \(`token` and " + r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, " + r"`token_url`, `client_id`, `client_secret`\)\." + ) + + @mock.patch("google.auth._helpers.utcnow", return_value=NOW) def test_refresh_auth_success(self, utcnow): request = self.make_mock_request( status=http_client.OK, @@ -137,7 +186,7 @@ def test_refresh_auth_success(self, utcnow): ), ) - @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) + @mock.patch("google.auth._helpers.utcnow", return_value=NOW) def test_refresh_auth_success_new_refresh_token(self, utcnow): request = self.make_mock_request( status=http_client.OK, @@ -228,7 +277,7 @@ def test_refresh_without_refresh_token(self): def test_refresh_without_token_url(self): request = self.make_mock_request() - creds = self.make_credentials(token_url=None) + creds = self.make_credentials(token_url=None, token=ACCESS_TOKEN) with pytest.raises(exceptions.RefreshError) as excinfo: creds.refresh(request) @@ -239,8 +288,6 @@ def test_refresh_without_token_url(self): assert not creds.expiry assert not creds.expired - assert not creds.token - assert not creds.valid assert not creds.requires_scopes assert creds.is_user @@ -248,7 +295,7 @@ def test_refresh_without_token_url(self): def test_refresh_without_client_id(self): request = self.make_mock_request() - creds = self.make_credentials(client_id=None) + creds = self.make_credentials(client_id=None, token=ACCESS_TOKEN) with pytest.raises(exceptions.RefreshError) as excinfo: creds.refresh(request) @@ -259,8 +306,6 @@ def test_refresh_without_client_id(self): assert not creds.expiry assert not creds.expired - assert not creds.token - assert not creds.valid assert not creds.requires_scopes assert creds.is_user @@ -268,7 +313,7 @@ def test_refresh_without_client_id(self): def test_refresh_without_client_secret(self): request = self.make_mock_request() - creds = self.make_credentials(client_secret=None) + creds = self.make_credentials(client_secret=None, token=ACCESS_TOKEN) with pytest.raises(exceptions.RefreshError) as excinfo: creds.refresh(request) @@ -279,8 +324,6 @@ def test_refresh_without_client_secret(self): assert not creds.expiry assert not creds.expired - assert not creds.token - assert not creds.valid assert not creds.requires_scopes assert creds.is_user @@ -304,7 +347,7 @@ def test_info(self): def test_info_full(self): creds = self.make_credentials( token=ACCESS_TOKEN, - expiry=datetime.datetime.min, + expiry=NOW, revoke_url=REVOKE_URL, quota_project_id=QUOTA_PROJECT_ID, ) @@ -317,7 +360,7 @@ def test_info_full(self): assert info["client_id"] == CLIENT_ID assert info["client_secret"] == CLIENT_SECRET assert info["token"] == ACCESS_TOKEN - assert info["expiry"] == datetime.datetime.min.isoformat() + "Z" + assert info["expiry"] == NOW.isoformat() + "Z" assert info["revoke_url"] == REVOKE_URL assert info["quota_project_id"] == QUOTA_PROJECT_ID @@ -340,7 +383,7 @@ def test_to_json(self): def test_to_json_full(self): creds = self.make_credentials( token=ACCESS_TOKEN, - expiry=datetime.datetime.min, + expiry=NOW, revoke_url=REVOKE_URL, quota_project_id=QUOTA_PROJECT_ID, ) @@ -354,14 +397,14 @@ def test_to_json_full(self): assert info["client_id"] == CLIENT_ID assert info["client_secret"] == CLIENT_SECRET assert info["token"] == ACCESS_TOKEN - assert info["expiry"] == datetime.datetime.min.isoformat() + "Z" + assert info["expiry"] == NOW.isoformat() + "Z" assert info["revoke_url"] == REVOKE_URL assert info["quota_project_id"] == QUOTA_PROJECT_ID def test_to_json_full_with_strip(self): creds = self.make_credentials( token=ACCESS_TOKEN, - expiry=datetime.datetime.min, + expiry=NOW, revoke_url=REVOKE_URL, quota_project_id=QUOTA_PROJECT_ID, ) @@ -386,7 +429,7 @@ def test_get_project_id(self): def test_with_quota_project(self): creds = self.make_credentials( token=ACCESS_TOKEN, - expiry=datetime.datetime.min, + expiry=NOW, revoke_url=REVOKE_URL, quota_project_id=QUOTA_PROJECT_ID, ) @@ -405,7 +448,7 @@ def test_with_quota_project(self): def test_with_token_uri(self): creds = self.make_credentials( token=ACCESS_TOKEN, - expiry=datetime.datetime.min, + expiry=NOW, revoke_url=REVOKE_URL, quota_project_id=QUOTA_PROJECT_ID, ) @@ -428,36 +471,39 @@ def test_from_file_required_options_only(self, tmpdir): creds = external_account_authorized_user.Credentials.from_file(str(config_file)) assert isinstance(creds, external_account_authorized_user.Credentials) - assert creds._audience == AUDIENCE - assert creds._refresh_token == REFRESH_TOKEN - assert creds._token_url == TOKEN_URL - assert creds._token_info_url == TOKEN_INFO_URL - assert creds._client_id == CLIENT_ID - assert creds._client_secret == CLIENT_SECRET + assert creds.audience == AUDIENCE + assert creds.refresh_token == REFRESH_TOKEN + assert creds.token_url == TOKEN_URL + assert creds.token_info_url == TOKEN_INFO_URL + assert creds.client_id == CLIENT_ID + assert creds.client_secret == CLIENT_SECRET assert creds.token is None assert creds.expiry is None + assert creds.scopes is None assert creds._revoke_url is None assert creds._quota_project_id is None def test_from_file_full_options(self, tmpdir): from_creds = self.make_credentials( token=ACCESS_TOKEN, - expiry=datetime.datetime.min, + expiry=NOW, revoke_url=REVOKE_URL, quota_project_id=QUOTA_PROJECT_ID, + scopes=SCOPES, ) config_file = tmpdir.join("config.json") config_file.write(from_creds.to_json()) creds = external_account_authorized_user.Credentials.from_file(str(config_file)) assert isinstance(creds, external_account_authorized_user.Credentials) - assert creds._audience == AUDIENCE - assert creds._refresh_token == REFRESH_TOKEN - assert creds._token_url == TOKEN_URL - assert creds._token_info_url == TOKEN_INFO_URL - assert creds._client_id == CLIENT_ID - assert creds._client_secret == CLIENT_SECRET + assert creds.audience == AUDIENCE + assert creds.refresh_token == REFRESH_TOKEN + assert creds.token_url == TOKEN_URL + assert creds.token_info_url == TOKEN_INFO_URL + assert creds.client_id == CLIENT_ID + assert creds.client_secret == CLIENT_SECRET assert creds.token == ACCESS_TOKEN - assert creds.expiry == datetime.datetime.min + assert creds.expiry == NOW + assert creds.scopes == SCOPES assert creds._revoke_url == REVOKE_URL assert creds._quota_project_id == QUOTA_PROJECT_ID diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index 3f48675e2..0b0156eb0 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -32,10 +32,16 @@ # Base64 encoding of "username:password". BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = ( + "https://us-east1-iamcredentials.googleapis.com" +) +SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + SERVICE_ACCOUNT_EMAIL +) SERVICE_ACCOUNT_IMPERSONATION_URL = ( - "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" - + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) + SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE ) + QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" SCOPES = ["scope1", "scope2"] DATA_DIR = os.path.join(os.path.dirname(__file__), "data") @@ -51,6 +57,7 @@ JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME) TOKEN_URL = "https://sts.googleapis.com/v1/token" +TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" WORKFORCE_AUDIENCE = ( @@ -60,6 +67,90 @@ WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER" +VALID_TOKEN_URLS = [ + "https://sts.googleapis.com", + "https://us-east-1.sts.googleapis.com", + "https://US-EAST-1.sts.googleapis.com", + "https://sts.us-east-1.googleapis.com", + "https://sts.US-WEST-1.googleapis.com", + "https://us-east-1-sts.googleapis.com", + "https://US-WEST-1-sts.googleapis.com", + "https://us-west-1-sts.googleapis.com/path?query", + "https://sts-us-east-1.p.googleapis.com", +] +INVALID_TOKEN_URLS = [ + "https://iamcredentials.googleapis.com", + "sts.googleapis.com", + "https://", + "http://sts.googleapis.com", + "https://st.s.googleapis.com", + "https://us-eas\t-1.sts.googleapis.com", + "https:/us-east-1.sts.googleapis.com", + "https://US-WE/ST-1-sts.googleapis.com", + "https://sts-us-east-1.googleapis.com", + "https://sts-US-WEST-1.googleapis.com", + "testhttps://us-east-1.sts.googleapis.com", + "https://us-east-1.sts.googleapis.comevil.com", + "https://us-east-1.us-east-1.sts.googleapis.com", + "https://us-ea.s.t.sts.googleapis.com", + "https://sts.googleapis.comevil.com", + "hhttps://us-east-1.sts.googleapis.com", + "https://us- -1.sts.googleapis.com", + "https://-sts.googleapis.com", + "https://us-east-1.sts.googleapis.com.evil.com", + "https://sts.pgoogleapis.com", + "https://p.googleapis.com", + "https://sts.p.com", + "http://sts.p.googleapis.com", + "https://xyz-sts.p.googleapis.com", + "https://sts-xyz.123.p.googleapis.com", + "https://sts-xyz.p1.googleapis.com", + "https://sts-xyz.p.foo.com", + "https://sts-xyz.p.foo.googleapis.com", +] +VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com", + "https://US-EAST-1.iamcredentials.googleapis.com", + "https://iamcredentials.us-east-1.googleapis.com", + "https://iamcredentials.US-WEST-1.googleapis.com", + "https://us-east-1-iamcredentials.googleapis.com", + "https://US-WEST-1-iamcredentials.googleapis.com", + "https://us-west-1-iamcredentials.googleapis.com/path?query", + "https://iamcredentials-us-east-1.p.googleapis.com", +] +INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://sts.googleapis.com", + "iamcredentials.googleapis.com", + "https://", + "http://iamcredentials.googleapis.com", + "https://iamcre.dentials.googleapis.com", + "https://us-eas\t-1.iamcredentials.googleapis.com", + "https:/us-east-1.iamcredentials.googleapis.com", + "https://US-WE/ST-1-iamcredentials.googleapis.com", + "https://iamcredentials-us-east-1.googleapis.com", + "https://iamcredentials-US-WEST-1.googleapis.com", + "testhttps://us-east-1.iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.comevil.com", + "https://us-east-1.us-east-1.iamcredentials.googleapis.com", + "https://us-ea.s.t.iamcredentials.googleapis.com", + "https://iamcredentials.googleapis.comevil.com", + "hhttps://us-east-1.iamcredentials.googleapis.com", + "https://us- -1.iamcredentials.googleapis.com", + "https://-iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com.evil.com", + "https://iamcredentials.pgoogleapis.com", + "https://p.googleapis.com", + "https://iamcredentials.p.com", + "http://iamcredentials.p.googleapis.com", + "https://xyz-iamcredentials.p.googleapis.com", + "https://iamcredentials-xyz.123.p.googleapis.com", + "https://iamcredentials-xyz.p1.googleapis.com", + "https://iamcredentials-xyz.p.foo.com", + "https://iamcredentials-xyz.p.foo.googleapis.com", +] + + class TestCredentials(object): CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE} CREDENTIAL_SOURCE_JSON = { @@ -262,6 +353,8 @@ def make_credentials( cls, audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, client_id=None, client_secret=None, quota_project_id=None, @@ -274,7 +367,8 @@ def make_credentials( return identity_pool.Credentials( audience=audience, subject_token_type=subject_token_type, - token_url=TOKEN_URL, + token_url=token_url, + token_info_url=token_info_url, service_account_impersonation_url=service_account_impersonation_url, credential_source=credential_source, client_id=client_id, @@ -292,6 +386,7 @@ def test_from_info_full_options(self, mock_init): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "client_id": CLIENT_ID, @@ -307,6 +402,7 @@ def test_from_info_full_options(self, mock_init): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, client_id=CLIENT_ID, @@ -333,6 +429,7 @@ def test_from_info_required_options_only(self, mock_init): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -360,6 +457,7 @@ def test_from_info_workforce_pool(self, mock_init): audience=WORKFORCE_AUDIENCE, subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -375,6 +473,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "client_id": CLIENT_ID, @@ -392,6 +491,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, client_id=CLIENT_ID, @@ -419,6 +519,7 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -447,6 +548,7 @@ def test_from_file_workforce_pool(self, mock_init, tmpdir): audience=WORKFORCE_AUDIENCE, subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -534,6 +636,7 @@ def test_info_with_workforce_pool_user_project(self): "audience": WORKFORCE_AUDIENCE, "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT, } @@ -548,6 +651,7 @@ def test_info_with_file_credential_source(self): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL, } @@ -561,6 +665,7 @@ def test_info_with_url_credential_source(self): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "credential_source": self.CREDENTIAL_SOURCE_JSON_URL, } @@ -638,6 +743,85 @@ def test_retrieve_subject_token_file_not_found(self): assert excinfo.match(r"File './not_found.txt' was not found") + def test_token_info_url(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON + ) + + assert credentials.token_info_url == TOKEN_INFO_URL + + def test_token_info_url_custom(self): + for url in VALID_TOKEN_URLS: + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), + token_info_url=(url + "/introspect"), + ) + + assert credentials.token_info_url == url + "/introspect" + + def test_token_info_url_bad(self): + for url in INVALID_TOKEN_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), + token_info_url=(url + "/introspect"), + ) + + assert excinfo.match(r"The provided token info URL is invalid.") + + def test_token_info_url_negative(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), token_info_url=None + ) + + assert not credentials.token_info_url + + def test_token_url_custom(self): + for url in VALID_TOKEN_URLS: + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), + token_url=(url + "/token"), + ) + + assert credentials._token_url == (url + "/token") + + def test_token_url_bad(self): + for url in INVALID_TOKEN_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), + token_url=(url + "/token"), + ) + + assert excinfo.match(r"The provided token URL is invalid\.") + + def test_service_account_impersonation_url_custom(self): + for url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), + service_account_impersonation_url=( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ), + ) + + assert credentials._service_account_impersonation_url == ( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ) + + def test_service_account_impersonation_url_bad(self): + for url in INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON.copy(), + service_account_impersonation_url=( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ), + ) + + assert excinfo.match( + r"The provided service account impersonation URL is invalid\." + ) + def test_refresh_text_file_success_without_impersonation_ignore_default_scopes( self, ): diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py index 293d5c6ed..0c0ebeb06 100644 --- a/tests/test_pluggable.py +++ b/tests/test_pluggable.py @@ -36,18 +36,107 @@ # Base64 encoding of "username:password". BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL_BASE = ( + "https://us-east1-iamcredentials.googleapis.com" +) +SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE = "/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + SERVICE_ACCOUNT_EMAIL +) SERVICE_ACCOUNT_IMPERSONATION_URL = ( - "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" - + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) + SERVICE_ACCOUNT_IMPERSONATION_URL_BASE + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE ) QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" SCOPES = ["scope1", "scope2"] SUBJECT_TOKEN_FIELD_NAME = "access_token" TOKEN_URL = "https://sts.googleapis.com/v1/token" +TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" +VALID_TOKEN_URLS = [ + "https://sts.googleapis.com", + "https://us-east-1.sts.googleapis.com", + "https://US-EAST-1.sts.googleapis.com", + "https://sts.us-east-1.googleapis.com", + "https://sts.US-WEST-1.googleapis.com", + "https://us-east-1-sts.googleapis.com", + "https://US-WEST-1-sts.googleapis.com", + "https://us-west-1-sts.googleapis.com/path?query", + "https://sts-us-east-1.p.googleapis.com", +] +INVALID_TOKEN_URLS = [ + "https://iamcredentials.googleapis.com", + "sts.googleapis.com", + "https://", + "http://sts.googleapis.com", + "https://st.s.googleapis.com", + "https://us-eas\t-1.sts.googleapis.com", + "https:/us-east-1.sts.googleapis.com", + "https://US-WE/ST-1-sts.googleapis.com", + "https://sts-us-east-1.googleapis.com", + "https://sts-US-WEST-1.googleapis.com", + "testhttps://us-east-1.sts.googleapis.com", + "https://us-east-1.sts.googleapis.comevil.com", + "https://us-east-1.us-east-1.sts.googleapis.com", + "https://us-ea.s.t.sts.googleapis.com", + "https://sts.googleapis.comevil.com", + "hhttps://us-east-1.sts.googleapis.com", + "https://us- -1.sts.googleapis.com", + "https://-sts.googleapis.com", + "https://us-east-1.sts.googleapis.com.evil.com", + "https://sts.pgoogleapis.com", + "https://p.googleapis.com", + "https://sts.p.com", + "http://sts.p.googleapis.com", + "https://xyz-sts.p.googleapis.com", + "https://sts-xyz.123.p.googleapis.com", + "https://sts-xyz.p1.googleapis.com", + "https://sts-xyz.p.foo.com", + "https://sts-xyz.p.foo.googleapis.com", +] +VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com", + "https://US-EAST-1.iamcredentials.googleapis.com", + "https://iamcredentials.us-east-1.googleapis.com", + "https://iamcredentials.US-WEST-1.googleapis.com", + "https://us-east-1-iamcredentials.googleapis.com", + "https://US-WEST-1-iamcredentials.googleapis.com", + "https://us-west-1-iamcredentials.googleapis.com/path?query", + "https://iamcredentials-us-east-1.p.googleapis.com", +] +INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS = [ + "https://sts.googleapis.com", + "iamcredentials.googleapis.com", + "https://", + "http://iamcredentials.googleapis.com", + "https://iamcre.dentials.googleapis.com", + "https://us-eas\t-1.iamcredentials.googleapis.com", + "https:/us-east-1.iamcredentials.googleapis.com", + "https://US-WE/ST-1-iamcredentials.googleapis.com", + "https://iamcredentials-us-east-1.googleapis.com", + "https://iamcredentials-US-WEST-1.googleapis.com", + "testhttps://us-east-1.iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.comevil.com", + "https://us-east-1.us-east-1.iamcredentials.googleapis.com", + "https://us-ea.s.t.iamcredentials.googleapis.com", + "https://iamcredentials.googleapis.comevil.com", + "hhttps://us-east-1.iamcredentials.googleapis.com", + "https://us- -1.iamcredentials.googleapis.com", + "https://-iamcredentials.googleapis.com", + "https://us-east-1.iamcredentials.googleapis.com.evil.com", + "https://iamcredentials.pgoogleapis.com", + "https://p.googleapis.com", + "https://iamcredentials.p.com", + "http://iamcredentials.p.googleapis.com", + "https://xyz-iamcredentials.p.googleapis.com", + "https://iamcredentials-xyz.123.p.googleapis.com", + "https://iamcredentials-xyz.p1.googleapis.com", + "https://iamcredentials-xyz.p.foo.com", + "https://iamcredentials-xyz.p.foo.googleapis.com", +] + class TestCredentials(object): CREDENTIAL_SOURCE_EXECUTABLE_COMMAND = ( @@ -115,6 +204,8 @@ def make_pluggable( cls, audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, client_id=None, client_secret=None, quota_project_id=None, @@ -128,7 +219,8 @@ def make_pluggable( return pluggable.Credentials( audience=audience, subject_token_type=subject_token_type, - token_url=TOKEN_URL, + token_url=token_url, + token_info_url=token_info_url, service_account_impersonation_url=service_account_impersonation_url, credential_source=credential_source, client_id=client_id, @@ -147,6 +239,7 @@ def test_from_info_full_options(self, mock_init): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "client_id": CLIENT_ID, @@ -162,6 +255,7 @@ def test_from_info_full_options(self, mock_init): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, client_id=CLIENT_ID, @@ -188,6 +282,7 @@ def test_from_info_required_options_only(self, mock_init): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -203,6 +298,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, "service_account_impersonation": {"token_lifetime_seconds": 2800}, "client_id": CLIENT_ID, @@ -220,6 +316,7 @@ def test_from_file_full_options(self, mock_init, tmpdir): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=TOKEN_INFO_URL, service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, service_account_impersonation_options={"token_lifetime_seconds": 2800}, client_id=CLIENT_ID, @@ -247,6 +344,7 @@ def test_from_file_required_options_only(self, mock_init, tmpdir): audience=AUDIENCE, subject_token_type=SUBJECT_TOKEN_TYPE, token_url=TOKEN_URL, + token_info_url=None, service_account_impersonation_url=None, service_account_impersonation_options={}, client_id=None, @@ -280,9 +378,89 @@ def test_info_with_credential_source(self): "audience": AUDIENCE, "subject_token_type": SUBJECT_TOKEN_TYPE, "token_url": TOKEN_URL, + "token_info_url": TOKEN_INFO_URL, "credential_source": self.CREDENTIAL_SOURCE, } + def test_token_info_url(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy() + ) + + assert credentials.token_info_url == TOKEN_INFO_URL + + def test_token_info_url_custom(self): + for url in VALID_TOKEN_URLS: + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_info_url=(url + "/introspect"), + ) + + assert credentials.token_info_url == url + "/introspect" + + def test_token_info_url_bad(self): + for url in INVALID_TOKEN_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_info_url=(url + "/introspect"), + ) + + assert excinfo.match(r"The provided token info URL is invalid.") + + def test_token_info_url_negative(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), token_info_url=None + ) + + assert not credentials.token_info_url + + def test_token_url_custom(self): + for url in VALID_TOKEN_URLS: + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_url=(url + "/token"), + ) + + assert credentials._token_url == (url + "/token") + + def test_token_url_bad(self): + for url in INVALID_TOKEN_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), + token_url=(url + "/token"), + ) + + assert excinfo.match(r"The provided token URL is invalid\.") + + def test_service_account_impersonation_url_custom(self): + for url in VALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), + service_account_impersonation_url=( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ), + ) + + assert credentials._service_account_impersonation_url == ( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ) + + def test_service_account_impersonation_url_bad(self): + for url in INVALID_SERVICE_ACCOUNT_IMPERSONATION_URLS: + with pytest.raises(ValueError) as excinfo: + self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE.copy(), + service_account_impersonation_url=( + url + SERVICE_ACCOUNT_IMPERSONATION_URL_ROUTE + ), + ) + + assert excinfo.match( + r"The provided service account impersonation URL is invalid\." + ) + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_successfully(self, tmpdir): ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = tmpdir.join(