Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: add access token credentials #476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions 60 google/auth/_cloud_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import os
import subprocess

import six

from google.auth import environment_vars
import google.oauth2.credentials
from google.auth import exceptions


# The ~/.config subdirectory containing gcloud credentials.
Expand All @@ -34,6 +36,8 @@
_CLOUD_SDK_WINDOWS_COMMAND = "gcloud.cmd"
# The command to get the Cloud SDK configuration
_CLOUD_SDK_CONFIG_COMMAND = ("config", "config-helper", "--format", "json")
# The command to get google user access token
_CLOUD_SDK_USER_ACCESS_TOKEN_COMMAND = ("auth", "print-access-token")
# Cloud SDK's application-default client ID
CLOUD_SDK_CLIENT_ID = (
"764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com"
Expand Down Expand Up @@ -80,21 +84,6 @@ def get_application_default_credentials_path():
return os.path.join(config_path, _CREDENTIALS_FILENAME)


def load_authorized_user_credentials(info):
"""Loads an authorized user credential.

Args:
info (Mapping[str, str]): The loaded file's data.

Returns:
google.oauth2.credentials.Credentials: The constructed credentials.

Raises:
ValueError: if the info is in the wrong format or missing data.
"""
return google.oauth2.credentials.Credentials.from_authorized_user_info(info)


def get_project_id():
"""Gets the project ID from the Cloud SDK.

Expand Down Expand Up @@ -122,3 +111,42 @@ def get_project_id():
return configuration["configuration"]["properties"]["core"]["project"]
except KeyError:
return None


def get_auth_access_token(account=None):
"""Load user access token with the ``gcloud auth print-access-token`` command.

Args:
account (Optional[str]): Account to get the access token for. If not
specified, the current active account will be used.

Returns:
str: The user access token.

Raises:
google.auth.exceptions.UserAccessTokenError: if failed to get access
token from gcloud.
"""
if os.name == "nt":
command = _CLOUD_SDK_WINDOWS_COMMAND
else:
command = _CLOUD_SDK_POSIX_COMMAND

try:
if account:
command = (
(command,)
+ _CLOUD_SDK_USER_ACCESS_TOKEN_COMMAND
+ ("--account=" + account,)
)
else:
command = (command,) + _CLOUD_SDK_USER_ACCESS_TOKEN_COMMAND

access_token = subprocess.check_output(command, stderr=subprocess.STDOUT)
# remove the trailing "\n"
return access_token.decode("utf-8").strip()
except (subprocess.CalledProcessError, OSError, IOError) as caught_exc:
new_exc = exceptions.UserAccessTokenError(
"Failed to obtain access token", caught_exc
)
six.raise_from(new_exc, caught_exc)
4 changes: 2 additions & 2 deletions 4 google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def _load_credentials_from_file(filename):
credential_type = info.get("type")

if credential_type == _AUTHORIZED_USER_TYPE:
from google.auth import _cloud_sdk
from google.oauth2 import credentials

try:
credentials = _cloud_sdk.load_authorized_user_credentials(info)
credentials = credentials.Credentials.from_authorized_user_info(info)
except ValueError as caught_exc:
msg = "Failed to load authorized user credentials from {}".format(filename)
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
Expand Down
4 changes: 4 additions & 0 deletions 4 google/auth/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,9 @@ class RefreshError(GoogleAuthError):
failed."""


class UserAccessTokenError(GoogleAuthError):
"""Used to indicate ``gcloud auth print-access-token`` command failed."""


class DefaultCredentialsError(GoogleAuthError):
"""Used to indicate that acquiring default credentials failed."""
48 changes: 48 additions & 0 deletions 48 google/oauth2/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import six

from google.auth import _cloud_sdk
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
Expand Down Expand Up @@ -292,3 +293,50 @@ def to_json(self, strip=None):
prep = {k: v for k, v in prep.items() if k not in strip}

return json.dumps(prep)


class UserAccessTokenCredentials(credentials.Credentials):
"""Access token credentials for user account.

Obtain the access token for a given user account or the current active
user account with the ``gcloud auth print-access-token`` command.

Args:
account (Optional[str]): Account to get the access token for. If not
specified, the current active account will be used.
"""

def __init__(self, account=None):
super(UserAccessTokenCredentials, self).__init__()
self._account = account

def with_account(self, account):
"""Create a new instance with the given account.

Args:
account (str): Account to get the access token for.

Returns:
google.oauth2.credentials.UserAccessTokenCredentials: The created
credentials with the given account.
"""
return self.__class__(account=account)

def refresh(self, request):
arithmetic1728 marked this conversation as resolved.
Show resolved Hide resolved
"""Refreshes the access token.

Args:
request (google.auth.transport.Request): This argument is required
by the base class interface but not used in this implementation,
so just set it to `None`.

Raises:
google.auth.exceptions.UserAccessTokenError: If the access token
refresh failed.
"""
self.token = _cloud_sdk.get_auth_access_token(self._account)

@_helpers.copy_docstring(credentials.Credentials)
def before_request(self, request, method, url, headers):
self.refresh(request)
self.apply(headers)
7 changes: 7 additions & 0 deletions 7 system_tests/test_mtls_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import json
from os import path
import time

import google.auth
import google.auth.credentials
Expand Down Expand Up @@ -42,6 +43,9 @@ def test_requests():
# supposed to be created.
assert authed_session.is_mtls == check_context_aware_metadata()

# Sleep 1 second to avoid 503 error.
time.sleep(1)

if authed_session.is_mtls:
response = authed_session.get(MTLS_ENDPOINT.format(project_id))
else:
Expand All @@ -63,6 +67,9 @@ def test_urllib3():
# supposed to be created.
assert is_mtls == check_context_aware_metadata()

# Sleep 1 second to avoid 503 error.
time.sleep(1)

if is_mtls:
response = authed_http.request("GET", MTLS_ENDPOINT.format(project_id))
else:
Expand Down
28 changes: 28 additions & 0 deletions 28 tests/oauth2/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,31 @@ def test_unpickle_old_credentials_pickle(self):
) as f:
credentials = pickle.load(f)
assert credentials.quota_project_id is None


class TestUserAccessTokenCredentials(object):
def test_instance(self):
cred = credentials.UserAccessTokenCredentials()
assert cred._account is None

cred = cred.with_account("account")
assert cred._account == "account"

@mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True)
def test_refresh(self, get_auth_access_token):
get_auth_access_token.return_value = "access_token"
cred = credentials.UserAccessTokenCredentials()
cred.refresh(None)
assert cred.token == "access_token"

@mock.patch(
"google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
)
@mock.patch(
"google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True
)
def test_before_request(self, refresh, apply):
cred = credentials.UserAccessTokenCredentials()
cred.before_request(mock.Mock(), "GET", "https://example.com", {})
refresh.assert_called()
apply.assert_called()
40 changes: 25 additions & 15 deletions 40 tests/test__cloud_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from google.auth import _cloud_sdk
from google.auth import environment_vars
import google.oauth2.credentials
from google.auth import exceptions


DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
Expand Down Expand Up @@ -137,23 +137,33 @@ def test_get_config_path_no_appdata(monkeypatch):
assert os.path.split(config_path) == ("G:/\\", _cloud_sdk._CONFIG_DIRECTORY)


def test_load_authorized_user_credentials():
credentials = _cloud_sdk.load_authorized_user_credentials(AUTHORIZED_USER_FILE_DATA)
@mock.patch("os.name", new="nt")
@mock.patch("subprocess.check_output", autospec=True)
def test_get_auth_access_token_windows(check_output):
check_output.return_value = b"access_token\n"

token = _cloud_sdk.get_auth_access_token()
assert token == "access_token"
check_output.assert_called_with(
("gcloud.cmd", "auth", "print-access-token"), stderr=subprocess.STDOUT
)


assert isinstance(credentials, google.oauth2.credentials.Credentials)
@mock.patch("subprocess.check_output", autospec=True)
def test_get_auth_access_token_with_account(check_output):
check_output.return_value = b"access_token\n"

assert credentials.token is None
assert credentials._refresh_token == AUTHORIZED_USER_FILE_DATA["refresh_token"]
assert credentials._client_id == AUTHORIZED_USER_FILE_DATA["client_id"]
assert credentials._client_secret == AUTHORIZED_USER_FILE_DATA["client_secret"]
assert (
credentials._token_uri
== google.oauth2.credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
token = _cloud_sdk.get_auth_access_token(account="account")
assert token == "access_token"
check_output.assert_called_with(
("gcloud", "auth", "print-access-token", "--account=account"),
stderr=subprocess.STDOUT,
)


def test_load_authorized_user_credentials_bad_format():
with pytest.raises(ValueError) as excinfo:
_cloud_sdk.load_authorized_user_credentials({})
@mock.patch("subprocess.check_output", autospec=True)
def test_get_auth_access_token_with_exception(check_output):
check_output.side_effect = OSError()

assert excinfo.match(r"missing fields")
with pytest.raises(exceptions.UserAccessTokenError):
_cloud_sdk.get_auth_access_token(account="account")
Morty Proxy This is a proxified and sanitized view of the page, visit original site.