diff --git a/google/auth/identity_pool.py b/google/auth/identity_pool.py new file mode 100644 index 000000000..4e78f5a30 --- /dev/null +++ b/google/auth/identity_pool.py @@ -0,0 +1,206 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Identity Pool Credentials. + +This module provides credentials that are initialized using external_account +arguments which are typically loaded from the external credentials file. +Unlike other Credentials that can be initialized with a list of explicit +arguments, secrets or credentials, external account clients use the +environment and hints/guidelines provided by the external_account JSON +file to retrieve credentials and exchange them for Google access tokens. + +Identity Pool Credentials are used with external credentials (eg. OIDC +ID tokens) retrieved from a file location, typical for K8s workloads +registered with Hub with Hub workload identity enabled. +""" + +import io +import json +import os + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import external_account + + +class Credentials(external_account.Credentials): + """File-sourced external account credentials. + This is typically used to exchange OIDC ID tokens in K8s (file-sourced + credentials) for Google access tokens. + """ + + def __init__( + self, + audience, + subject_token_type, + token_url, + credential_source, + service_account_impersonation_url=None, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + ): + """Instantiates a file-sourced external account credentials object. + + Args: + audience (str): The STS audience field. + subject_token_type (str): The subject token type. + token_url (str): The STS endpoint URL. + credential_source (Mapping): The credential source dictionary used to + provide instructions on how to retrieve external credential to be + exchanged for Google access tokens.. + service_account_impersonation_url (Optional[str]): The optional service account + impersonation getAccessToken URL. + client_id (Optional[str]): The optional client ID. + client_secret (Optional[str]): The optional client secret. + quota_project_id (Optional[str]): The optional quota project ID. + scopes (Optional[Sequence[str]]): Optional scopes to request during the + authorization grant. + + Raises: + google.auth.exceptions.RefreshError: If an error is encountered during + access token retrieval logic. + google.auth.exceptions.GoogleAuthError: For invalid parameters. + + .. note:: Typically one of the helper constructors + :meth:`from_file` or + :meth:`from_info` are used instead of calling the constructor directly. + """ + + super(Credentials, self).__init__( + audience=audience, + subject_token_type=subject_token_type, + token_url=token_url, + credential_source=credential_source, + service_account_impersonation_url=service_account_impersonation_url, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + ) + if isinstance(credential_source, dict): + self._credential_source_file = credential_source.get("file") + self._credential_source_headers = credential_source.get("headers") + credential_source_format = credential_source.get("format") or {} + # Get credential_source format type. When not provided, this + # defaults to text. + self._credential_source_format_type = ( + credential_source_format.get("type") or "text" + ) + if self._credential_source_format_type not in ["text", "json"]: + raise exceptions.GoogleAuthError( + "Invalid credential_source format '{}'".format( + self._credential_source_format_type + ) + ) + # For JSON types, get the required subject_token field name. + if self._credential_source_format_type == "json": + self._credential_source_field_name = credential_source_format.get( + "subject_token_field_name" + ) + if self._credential_source_field_name is None: + raise exceptions.GoogleAuthError( + "Missing subject_token_field_name for JSON credential_source format" + ) + else: + self._credential_source_field_name = None + else: + self._credential_source_file = None + if not self._credential_source_file: + raise exceptions.GoogleAuthError("Missing credential_source file") + + @_helpers.copy_docstring(external_account.Credentials) + def retrieve_subject_token(self, request): + return self._get_token_file( + self._credential_source_file, + self._credential_source_format_type, + self._credential_source_field_name, + ) + + def _get_token_file( + self, filename, format_type="text", subject_token_field_name=None + ): + if not os.path.exists(filename): + raise exceptions.RefreshError("File '{}' was not found.".format(filename)) + + with io.open(filename, "r", encoding="utf-8") as file_obj: + content = file_obj.read() + + if format_type == "text": + token = content + else: + try: + # Parse file content as JSON. + response_data = json.loads(content) + # Get the subject_token. + token = response_data[subject_token_field_name] + except (KeyError, ValueError): + raise exceptions.RefreshError( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + filename, subject_token_field_name + ) + ) + if not token: + raise exceptions.RefreshError( + "Missing subject_token in the credential_source file" + ) + return token + + @classmethod + def from_info(cls, info, **kwargs): + """Creates an Identity Pool Credentials instance from parsed external account info. + + Args: + info (Mapping[str, str]): The Identity Pool external account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.identity_pool.Credentials: The constructed + credentials. + + Raises: + google.auth.exceptions.GoogleAuthError: For invalid parameters. + """ + return cls( + audience=info.get("audience"), + subject_token_type=info.get("subject_token_type"), + token_url=info.get("token_url"), + service_account_impersonation_url=info.get( + "service_account_impersonation_url" + ), + client_id=info.get("client_id"), + client_secret=info.get("client_secret"), + credential_source=info.get("credential_source"), + quota_project_id=info.get("quota_project_id"), + **kwargs + ) + + @classmethod + def from_file(cls, filename, **kwargs): + """Creates an IdentityPool Credentials instance from an external account json file. + + Args: + filename (str): The path to the IdentityPool external account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.identity_pool.Credentials: The constructed + credentials. + """ + with io.open(filename, "r", encoding="utf-8") as json_file: + data = json.load(json_file) + return cls.from_info(data, **kwargs) diff --git a/tests/data/external_subject_token.json b/tests/data/external_subject_token.json new file mode 100644 index 000000000..a47ec3412 --- /dev/null +++ b/tests/data/external_subject_token.json @@ -0,0 +1,3 @@ +{ + "access_token": "HEADER.SIMULATED_JWT_PAYLOAD.SIGNATURE" +} \ No newline at end of file diff --git a/tests/data/external_subject_token.txt b/tests/data/external_subject_token.txt new file mode 100644 index 000000000..c668d8f71 --- /dev/null +++ b/tests/data/external_subject_token.txt @@ -0,0 +1 @@ +HEADER.SIMULATED_JWT_PAYLOAD.SIGNATURE \ No newline at end of file diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py new file mode 100644 index 000000000..07a6ee6f1 --- /dev/null +++ b/tests/test_identity_pool.py @@ -0,0 +1,546 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os + +import mock +import pytest +from six.moves import http_client +from six.moves import urllib + +from google.auth import _helpers +from google.auth import exceptions +from google.auth import identity_pool +from google.auth import transport + + +CLIENT_ID = "username" +CLIENT_SECRET = "password" +# Base64 encoding of "username:password". +BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" +SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com" +SERVICE_ACCOUNT_IMPERSONATION_URL = ( + "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" + + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) +) +QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" +SCOPES = ["scope1", "scope2"] +DATA_DIR = os.path.join(os.path.dirname(__file__), "data") +SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt") +SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json") +SUBJECT_TOKEN_FIELD_NAME = "access_token" + +with open(SUBJECT_TOKEN_TEXT_FILE) as fh: + TEXT_FILE_SUBJECT_TOKEN = fh.read() + +with open(SUBJECT_TOKEN_JSON_FILE) as fh: + content = json.load(fh) + JSON_FILE_SUBJECT_TOKEN = content.get(SUBJECT_TOKEN_FIELD_NAME) + +TOKEN_URL = "https://sts.googleapis.com/v1/token" +SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" +AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" + + +class TestCredentials(object): + CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE} + CREDENTIAL_SOURCE_JSON = { + "file": SUBJECT_TOKEN_JSON_FILE, + "format": {"type": "json", "subject_token_field_name": "access_token"}, + } + SUCCESS_RESPONSE = { + "access_token": "ACCESS_TOKEN", + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": " ".join(SCOPES), + } + + @classmethod + def make_mock_request( + cls, + data, + status=http_client.OK, + impersonation_data=None, + impersonation_status=None, + ): + # STS token exchange request. + token_response = mock.create_autospec(transport.Response, instance=True) + token_response.status = status + token_response.data = json.dumps(data).encode("utf-8") + responses = [token_response] + + # If service account impersonation is requested, mock the expected response. + if impersonation_status and impersonation_status: + impersonation_response = mock.create_autospec( + transport.Response, instance=True + ) + impersonation_response.status = impersonation_status + impersonation_response.data = json.dumps(impersonation_data).encode("utf-8") + responses.append(impersonation_response) + + request = mock.create_autospec(transport.Request) + request.side_effect = responses + + return request + + @classmethod + def assert_token_request_kwargs( + cls, request_kwargs, headers, request_data, token_url=TOKEN_URL + ): + assert request_kwargs["url"] == token_url + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) + assert len(body_tuples) == len(request_data.keys()) + for (k, v) in body_tuples: + assert v.decode("utf-8") == request_data[k.decode("utf-8")] + + @classmethod + def assert_impersonation_request_kwargs( + cls, + request_kwargs, + headers, + request_data, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + ): + assert request_kwargs["url"] == service_account_impersonation_url + assert request_kwargs["method"] == "POST" + assert request_kwargs["headers"] == headers + assert request_kwargs["body"] is not None + body_json = json.loads(request_kwargs["body"].decode("utf-8")) + assert body_json == request_data + + @classmethod + def assert_underlying_credentials_refresh( + cls, + credentials, + audience, + subject_token, + subject_token_type, + token_url, + service_account_impersonation_url=None, + basic_auth_encoding=None, + quota_project_id=None, + scopes=None, + ): + """Utility to assert that a credentials are initialized with the expected + attributes by calling refresh functionality and confirming response matches + expected one and that the underlying requests were populated with the + expected parameters. + """ + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) + ).isoformat("T") + "Z" + # STS token exchange request/response. + token_response = cls.SUCCESS_RESPONSE.copy() + + token_headers = {"Content-Type": "application/x-www-form-urlencoded"} + if basic_auth_encoding: + token_headers["Authorization"] = "Basic " + basic_auth_encoding + if service_account_impersonation_url: + token_scopes = "https://www.googleapis.com/auth/iam" + impersonation_status = http_client.OK + total_requests = 2 + else: + token_scopes = " ".join(scopes or []) + impersonation_status = None + total_requests = 1 + token_request_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "audience": audience, + "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", + "scope": token_scopes, + "subject_token": subject_token, + "subject_token_type": subject_token_type, + } + if token_scopes == "": + token_request_data.pop("scope", None) + # Service account impersonation request/response. + impersonation_response = { + "accessToken": "SA_ACCESS_TOKEN", + "expireTime": expire_time, + } + impersonation_headers = { + "Content-Type": "application/json", + "authorization": "Bearer {}".format(token_response["access_token"]), + } + if quota_project_id: + impersonation_headers["x-goog-user-project"] = quota_project_id + impersonation_request_data = { + "delegates": None, + "scope": scopes, + "lifetime": "3600s", + } + # Initialize mock request to handle token exchange and service account + # impersonation request. + request = cls.make_mock_request( + status=http_client.OK, + data=token_response, + impersonation_status=impersonation_status, + impersonation_data=impersonation_response, + ) + + credentials.refresh(request) + + assert len(request.call_args_list) == total_requests + # Verify token exchange request parameters. + cls.assert_token_request_kwargs( + request.call_args_list[0].kwargs, + token_headers, + token_request_data, + token_url, + ) + # Verify service account impersonation request parameters if the request + # is processed. + if impersonation_status: + cls.assert_impersonation_request_kwargs( + request.call_args_list[1].kwargs, + impersonation_headers, + impersonation_request_data, + service_account_impersonation_url, + ) + assert credentials.token == impersonation_response["accessToken"] + else: + assert credentials.token == token_response["access_token"] + assert credentials.quota_project_id == quota_project_id + assert credentials.scopes == scopes + + @classmethod + def make_credentials( + cls, + client_id=None, + client_secret=None, + quota_project_id=None, + scopes=None, + service_account_impersonation_url=None, + credential_source=None, + ): + return identity_pool.Credentials( + audience=AUDIENCE, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=service_account_impersonation_url, + credential_source=credential_source, + client_id=client_id, + client_secret=client_secret, + quota_project_id=quota_project_id, + scopes=scopes, + ) + + def test_from_info_full_options(self): + credentials = identity_pool.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + ) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=QUOTA_PROJECT_ID, + ) + + def test_from_info_required_options_only(self): + credentials = identity_pool.Credentials.from_info( + { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + ) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + ) + + def test_from_file_full_options(self, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "quota_project_id": QUOTA_PROJECT_ID, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = identity_pool.Credentials.from_file(str(config_file)) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=QUOTA_PROJECT_ID, + ) + + def test_from_file_required_options_only(self, tmpdir): + info = { + "audience": AUDIENCE, + "subject_token_type": SUBJECT_TOKEN_TYPE, + "token_url": TOKEN_URL, + "credential_source": self.CREDENTIAL_SOURCE_TEXT, + } + config_file = tmpdir.join("config.json") + config_file.write(json.dumps(info)) + credentials = identity_pool.Credentials.from_file(str(config_file)) + + # Confirm identity_pool.Credentials instantiated with expected attributes. + assert isinstance(credentials, identity_pool.Credentials) + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + ) + + def test_constructor_invalid_options(self): + credential_source = {"unsupported": "value"} + + with pytest.raises(exceptions.GoogleAuthError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Missing credential_source file") + + def test_constructor_invalid_credential_source(self): + with pytest.raises(exceptions.GoogleAuthError) as excinfo: + self.make_credentials(credential_source="non-dict") + + assert excinfo.match(r"Missing credential_source file") + + def test_constructor_invalid_credential_source_format_type(self): + credential_source = {"format": {"type": "xml"}} + + with pytest.raises(exceptions.GoogleAuthError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match(r"Invalid credential_source format 'xml'") + + def test_constructor_missing_subject_token_field_name(self): + credential_source = {"format": {"type": "json"}} + + with pytest.raises(exceptions.GoogleAuthError) as excinfo: + self.make_credentials(credential_source=credential_source) + + assert excinfo.match( + r"Missing subject_token_field_name for JSON credential_source format" + ) + + def test_retrieve_subject_token_missing_subject_token(self, tmpdir): + # Provide empty text file. + empty_file = tmpdir.join("empty.txt") + empty_file.write("") + credential_source = {"file": str(empty_file)} + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Missing subject_token in the credential_source file") + + def test_retrieve_subject_token_text_file(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_TEXT + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == TEXT_FILE_SUBJECT_TOKEN + + def test_retrieve_subject_token_json_file(self): + credentials = self.make_credentials( + credential_source=self.CREDENTIAL_SOURCE_JSON + ) + + subject_token = credentials.retrieve_subject_token(None) + + assert subject_token == JSON_FILE_SUBJECT_TOKEN + + def test_retrieve_subject_token_json_file_invalid_field_name(self): + credential_source = { + "file": SUBJECT_TOKEN_JSON_FILE, + "format": {"type": "json", "subject_token_field_name": "not_found"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + SUBJECT_TOKEN_JSON_FILE, "not_found" + ) + ) + + def test_retrieve_subject_token_invalid_json(self, tmpdir): + # Provide JSON file. This should result in JSON parsing error. + invalid_json_file = tmpdir.join("invalid.json") + invalid_json_file.write("{") + credential_source = { + "file": str(invalid_json_file), + "format": {"type": "json", "subject_token_field_name": "access_token"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + str(invalid_json_file), "access_token" + ) + ) + + def test_retrieve_subject_token_file_not_found(self): + credential_source = {"file": "./not_found.txt"} + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.retrieve_subject_token(None) + + assert excinfo.match(r"File './not_found.txt' was not found") + + def test_refresh_text_file_success_without_impersonation(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + scopes=SCOPES, + ) + + def test_refresh_text_file_success_with_impersonation(self): + # Initialize credentials with service account impersonation and basic auth. + credentials = self.make_credentials( + # Test with text format type. + credential_source=self.CREDENTIAL_SOURCE_TEXT, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=TEXT_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + scopes=SCOPES, + ) + + def test_refresh_json_file_success_without_impersonation(self): + credentials = self.make_credentials( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + # Test with JSON format type. + credential_source=self.CREDENTIAL_SOURCE_JSON, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=JSON_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=None, + basic_auth_encoding=BASIC_AUTH_ENCODING, + quota_project_id=None, + scopes=SCOPES, + ) + + def test_refresh_json_file_success_with_impersonation(self): + # Initialize credentials with service account impersonation and basic auth. + credentials = self.make_credentials( + # Test with JSON format type. + credential_source=self.CREDENTIAL_SOURCE_JSON, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + scopes=SCOPES, + ) + + self.assert_underlying_credentials_refresh( + credentials=credentials, + audience=AUDIENCE, + subject_token=JSON_FILE_SUBJECT_TOKEN, + subject_token_type=SUBJECT_TOKEN_TYPE, + token_url=TOKEN_URL, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + basic_auth_encoding=None, + quota_project_id=None, + scopes=SCOPES, + ) + + def test_refresh_with_retrieve_subject_token_error(self): + credential_source = { + "file": SUBJECT_TOKEN_JSON_FILE, + "format": {"type": "json", "subject_token_field_name": "not_found"}, + } + credentials = self.make_credentials(credential_source=credential_source) + + with pytest.raises(exceptions.RefreshError) as excinfo: + credentials.refresh(None) + + assert excinfo.match( + "Unable to parse subject_token from JSON file '{}' using key '{}'".format( + SUBJECT_TOKEN_JSON_FILE, "not_found" + ) + )