Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

fix: add impersonated SA via local ADC support for fetch_id_token #1740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 2 additions & 36 deletions 38 google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,42 +484,8 @@ def _get_impersonated_service_account_credentials(filename, info, scopes):
from google.auth import impersonated_credentials

try:
source_credentials_info = info.get("source_credentials")
source_credentials_type = source_credentials_info.get("type")
if source_credentials_type == _AUTHORIZED_USER_TYPE:
source_credentials, _ = _get_authorized_user_credentials(
filename, source_credentials_info
)
elif source_credentials_type == _SERVICE_ACCOUNT_TYPE:
source_credentials, _ = _get_service_account_credentials(
filename, source_credentials_info
)
elif source_credentials_type == _EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE:
source_credentials, _ = _get_external_account_authorized_user_credentials(
filename, source_credentials_info
)
else:
raise exceptions.InvalidType(
"source credential of type {} is not supported.".format(
source_credentials_type
)
)
impersonation_url = info.get("service_account_impersonation_url")
start_index = impersonation_url.rfind("/")
end_index = impersonation_url.find(":generateAccessToken")
if start_index == -1 or end_index == -1 or start_index > end_index:
raise exceptions.InvalidValue(
"Cannot extract target principal from {}".format(impersonation_url)
)
target_principal = impersonation_url[start_index + 1 : end_index]
delegates = info.get("delegates")
quota_project_id = info.get("quota_project_id")
credentials = impersonated_credentials.Credentials(
source_credentials,
target_principal,
scopes,
delegates,
quota_project_id=quota_project_id,
credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info(
info, scopes=scopes
)
except ValueError as caught_exc:
msg = "Failed to load impersonated service account credentials from {}".format(
Expand Down
75 changes: 75 additions & 0 deletions 75 google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@

_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"

_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user"
_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account"
_SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = (
"external_account_authorized_user"
)


def _make_iam_token_request(
request,
Expand Down Expand Up @@ -410,6 +416,75 @@ def with_scopes(self, scopes, default_scopes=None):
cred._target_scopes = scopes or default_scopes
return cred

@classmethod
def from_impersonated_service_account_info(cls, info, scopes=None):
"""Creates a Credentials instance from parsed impersonated service account credentials info.

Args:
info (Mapping[str, str]): The impersonated service account credentials info in Google
format.
scopes (Sequence[str]): Optional list of scopes to include in the
credentials.

Returns:
google.oauth2.credentials.Credentials: The constructed
credentials.

Raises:
InvalidType: If the info["source_credentials"] are not a supported impersonation type
InvalidValue: If the info["service_account_impersonation_url"] is not in the expected format.
ValueError: If the info is not in the expected format.
"""

source_credentials_info = info.get("source_credentials")
source_credentials_type = source_credentials_info.get("type")
if source_credentials_type == _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE:
from google.oauth2 import credentials

source_credentials = credentials.Credentials.from_authorized_user_info(
source_credentials_info
)
elif source_credentials_type == _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE:
from google.oauth2 import service_account

source_credentials = service_account.Credentials.from_service_account_info(
source_credentials_info
)
elif (
source_credentials_type
== _SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE
):
from google.auth import external_account_authorized_user

source_credentials = external_account_authorized_user.Credentials.from_info(
source_credentials_info
)
else:
raise exceptions.InvalidType(
"source credential of type {} is not supported.".format(
source_credentials_type
)
)

impersonation_url = info.get("service_account_impersonation_url")
start_index = impersonation_url.rfind("/")
end_index = impersonation_url.find(":generateAccessToken")
if start_index == -1 or end_index == -1 or start_index > end_index:
raise exceptions.InvalidValue(
"Cannot extract target principal from {}".format(impersonation_url)
)
target_principal = impersonation_url[start_index + 1 : end_index]
delegates = info.get("delegates")
quota_project_id = info.get("quota_project_id")

return cls(
source_credentials,
target_principal,
scopes,
delegates,
quota_project_id=quota_project_id,
)


class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
"""Open ID Connect ID Token-based service account credentials.
Expand Down
12 changes: 12 additions & 0 deletions 12 google/oauth2/id_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,18 @@ def fetch_id_token_credentials(audience, request=None):
return service_account.IDTokenCredentials.from_service_account_info(
info, target_audience=audience
)
elif info.get("type") == "impersonated_service_account":
from google.auth import impersonated_credentials

target_credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info(
info
)

return impersonated_credentials.IDTokenCredentials(
target_credentials=target_credentials,
target_audience=audience,
include_email=True,
sai-sunder-s marked this conversation as resolved.
Show resolved Hide resolved
)
except ValueError as caught_exc:
new_exc = exceptions.DefaultCredentialsError(
"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",
Expand Down
15 changes: 15 additions & 0 deletions 15 tests/oauth2/test_id_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

from google.auth import environment_vars
from google.auth import exceptions
from google.auth import impersonated_credentials
from google.auth import transport
from google.oauth2 import id_token
from google.oauth2 import service_account

SERVICE_ACCOUNT_FILE = os.path.join(
os.path.dirname(__file__), "../data/service_account.json"
)

IMPERSONATED_SERVICE_ACCOUNT_FILE = os.path.join(
os.path.dirname(__file__),
"../data/impersonated_service_account_authorized_user_source.json",
)

ID_TOKEN_AUDIENCE = "https://pubsub.googleapis.com"


Expand Down Expand Up @@ -262,6 +269,14 @@ def test_fetch_id_token_credentials_from_explicit_cred_json_file(monkeypatch):
assert cred._target_audience == ID_TOKEN_AUDIENCE


def test_fetch_id_token_credentials_from_impersonated_cred_json_file(monkeypatch):
monkeypatch.setenv(environment_vars.CREDENTIALS, IMPERSONATED_SERVICE_ACCOUNT_FILE)

cred = id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert isinstance(cred, impersonated_credentials.IDTokenCredentials)
assert cred._target_audience == ID_TOKEN_AUDIENCE


def test_fetch_id_token_credentials_no_cred_exists(monkeypatch):
monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

Expand Down
39 changes: 39 additions & 0 deletions 39 tests/test_impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import datetime
import http.client as http_client
import json
Expand All @@ -35,6 +36,9 @@
PRIVATE_KEY_BYTES = fh.read()

SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE = os.path.join(
DATA_DIR, "impersonated_service_account_authorized_user_source.json"
)

ID_TOKEN_DATA = (
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
Expand All @@ -49,6 +53,9 @@
with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)

with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE, "rb") as fh:
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO = json.load(fh)

SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
TOKEN_URI = "https://example.com/oauth2/token"

Expand Down Expand Up @@ -148,6 +155,38 @@ def make_credentials(
iam_endpoint_override=iam_endpoint_override,
)

def test_from_impersonated_service_account_info(self):
credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info(
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO
)
assert isinstance(credentials, impersonated_credentials.Credentials)

def test_from_impersonated_service_account_info_with_invalid_source_credentials_type(
self
):
info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)
assert "source_credentials" in info
# Set the source_credentials to an invalid type
info["source_credentials"]["type"] = "invalid_type"
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
impersonated_credentials.Credentials.from_impersonated_service_account_info(
info
)
assert excinfo.match(
"source credential of type {} is not supported".format("invalid_type")
)

def test_from_impersonated_service_account_info_with_invalid_impersonation_url(
self
):
info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)
info["service_account_impersonation_url"] = "invalid_url"
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
impersonated_credentials.Credentials.from_impersonated_service_account_info(
info
)
assert excinfo.match(r"Cannot extract target principal from")

def test_get_cred_info(self):
credentials = self.make_credentials()
assert not credentials.get_cred_info()
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.