Skip to content

Commit

Permalink
feat(auth): Update get_client_ssl_credentials to support X.509 worklo…
Browse files Browse the repository at this point in the history
…ad certs (#1558)

* feat(auth): Update get_client_ssl_credentials to support X.509 workload certs

* feat(auth): Update has_default_client_cert_source

* feat(auth): Fix formatting

* feat(auth): Fix test__mtls_helper.py

* feat(auth): Fix function name in tests

* chore: Refresh system test creds.

* feat(auth): Fix style

* feat(auth): Fix casing

* feat(auth): Fix linter issue

* feat(auth): Fix coverage issue

---------

Co-authored-by: Carl Lundin <[email protected]>
Co-authored-by: Carl Lundin <[email protected]>
  • Loading branch information
3 people authored Aug 7, 2024
1 parent f858a15 commit 18c2ec1
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 70 deletions.
40 changes: 29 additions & 11 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from google.auth import exceptions

CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
Expand All @@ -48,21 +48,21 @@
)


def _check_dca_metadata_path(metadata_path):
"""Checks for context aware metadata. If it exists, returns the absolute path;
def _check_config_path(config_path):
"""Checks for config file path. If it exists, returns the absolute path with user expansion;
otherwise returns None.
Args:
metadata_path (str): context aware metadata path.
config_path (str): The config file path for either context_aware_metadata.json or certificate_config.json for example
Returns:
str: absolute path if exists and None otherwise.
"""
metadata_path = path.expanduser(metadata_path)
if not path.exists(metadata_path):
_LOGGER.debug("%s is not found, skip client SSL authentication.", metadata_path)
config_path = path.expanduser(config_path)
if not path.exists(config_path):
_LOGGER.debug("%s is not found.", config_path)
return None
return metadata_path
return config_path


def _load_json_file(path):
Expand Down Expand Up @@ -136,7 +136,7 @@ def _get_cert_config_path(certificate_config_path=None):
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
Expand Down Expand Up @@ -279,14 +279,22 @@ def _run_cert_provider_command(command, expect_encrypted_key=False):
def get_client_ssl_credentials(
generate_encrypted_key=False,
context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
certificate_config_path=CERTIFICATE_CONFIGURATION_DEFAULT_PATH,
):
"""Returns the client side certificate, private key and passphrase.
We look for certificates and keys with the following order of priority:
1. Certificate and key specified by certificate_config.json.
Currently, only X.509 workload certificates are supported.
2. Certificate and key specified by context aware metadata (i.e. SecureConnect).
Args:
generate_encrypted_key (bool): If set to True, encrypted private key
and passphrase will be generated; otherwise, unencrypted private key
will be generated and passphrase will be None.
will be generated and passphrase will be None. This option only
affects keys obtained via context_aware_metadata.json.
context_aware_metadata_path (str): The context_aware_metadata.json file path.
certificate_config_path (str): The certificate_config.json file path.
Returns:
Tuple[bool, bytes, bytes, bytes]:
Expand All @@ -297,7 +305,17 @@ def get_client_ssl_credentials(
google.auth.exceptions.ClientCertError: if problems occurs when getting
the cert, key and passphrase.
"""
metadata_path = _check_dca_metadata_path(context_aware_metadata_path)

# 1. Check for certificate config json.
cert_config_path = _check_config_path(certificate_config_path)
if cert_config_path:
# Attempt to retrieve X.509 Workload cert and key.
cert, key = _get_workload_cert_and_key(cert_config_path)
if cert and key:
return True, cert, key, None

# 2. Check for context aware metadata json
metadata_path = _check_config_path(context_aware_metadata_path)

if metadata_path:
metadata_json = _load_json_file(metadata_path)
Expand Down
2 changes: 1 addition & 1 deletion google/auth/transport/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def __init__(self):
self._is_mtls = False
else:
# Load client SSL credentials.
metadata_path = _mtls_helper._check_dca_metadata_path(
metadata_path = _mtls_helper._check_config_path(
_mtls_helper.CONTEXT_AWARE_METADATA_PATH
)
self._is_mtls = metadata_path is not None
Expand Down
17 changes: 13 additions & 4 deletions google/auth/transport/mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,19 @@ def has_default_client_cert_source():
Returns:
bool: indicating if the default client cert source exists.
"""
metadata_path = _mtls_helper._check_dca_metadata_path(
_mtls_helper.CONTEXT_AWARE_METADATA_PATH
)
return metadata_path is not None
if (
_mtls_helper._check_config_path(_mtls_helper.CONTEXT_AWARE_METADATA_PATH)
is not None
):
return True
if (
_mtls_helper._check_config_path(
_mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH
)
is not None
):
return True
return False


def default_client_cert_source():
Expand Down
97 changes: 69 additions & 28 deletions tests/transport/test__mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ def test_key(self):
)


class TestCheckaMetadataPath(object):
class TestCheckConfigPath(object):
def test_success(self):
metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
returned_path = _mtls_helper._check_config_path(metadata_path)
assert returned_path is not None

def test_failure(self):
metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
returned_path = _mtls_helper._check_config_path(metadata_path)
assert returned_path is None


Expand Down Expand Up @@ -275,54 +275,92 @@ def test_popen_raise_exception(self, mock_popen):

class TestGetClientSslCredentials(object):
@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
"google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
def test_success(
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_with_context_aware_metadata(
self,
mock_check_dca_metadata_path,
mock_check_config_path,
mock_load_json_file,
mock_run_cert_provider_command,
mock_get_workload_cert_and_key,
):
mock_check_dca_metadata_path.return_value = True
mock_check_config_path.return_value = "/path/to/config"
mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
mock_get_workload_cert_and_key.return_value = (None, None)
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert has_cert
assert cert == b"cert"
assert key == b"key"
assert passphrase is None

@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True
)
def test_success_without_metadata(self, mock_check_dca_metadata_path):
mock_check_dca_metadata_path.return_value = False
@mock.patch(
"google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_with_certificate_config(
self,
mock_check_config_path,
mock_load_json_file,
mock_get_cert_config_path,
mock_read_cert_and_key_files,
):
cert_config_path = "/path/to/config"
mock_check_config_path.return_value = cert_config_path
mock_load_json_file.return_value = {
"cert_configs": {
"workload": {"cert_path": "cert/path", "key_path": "key/path"}
}
}
mock_get_cert_config_path.return_value = cert_config_path
mock_read_cert_and_key_files.return_value = (
pytest.public_cert_bytes,
pytest.private_key_bytes,
)

has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert has_cert
assert cert == pytest.public_cert_bytes
assert key == pytest.private_key_bytes
assert passphrase is None

@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_without_metadata(self, mock_check_config_path):
mock_check_config_path.return_value = False
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert not has_cert
assert cert is None
assert key is None
assert passphrase is None

@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
"google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_with_encrypted_key(
self,
mock_check_dca_metadata_path,
mock_check_config_path,
mock_load_json_file,
mock_run_cert_provider_command,
mock_get_workload_cert_and_key,
):
mock_check_dca_metadata_path.return_value = True
mock_check_config_path.return_value = "/path/to/config"
mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
mock_get_workload_cert_and_key.return_value = (None, None)
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
generate_encrypted_key=True
)
Expand All @@ -334,33 +372,36 @@ def test_success_with_encrypted_key(
["command", "--with_passphrase"], expect_encrypted_key=True
)

@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_missing_cert_command(
self, mock_check_dca_metadata_path, mock_load_json_file
self,
mock_check_config_path,
mock_load_json_file,
mock_get_workload_cert_and_key,
):
mock_check_dca_metadata_path.return_value = True
mock_check_config_path.return_value = "/path/to/config"
mock_load_json_file.return_value = {}
mock_get_workload_cert_and_key.return_value = (None, None)
with pytest.raises(exceptions.ClientCertError):
_mtls_helper.get_client_ssl_credentials()

@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_customize_context_aware_metadata_path(
self,
mock_check_dca_metadata_path,
mock_check_config_path,
mock_load_json_file,
mock_run_cert_provider_command,
):
context_aware_metadata_path = "/path/to/metata/data"
mock_check_dca_metadata_path.return_value = context_aware_metadata_path
mock_check_config_path.return_value = context_aware_metadata_path
mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", None)

Expand All @@ -372,7 +413,7 @@ def test_customize_context_aware_metadata_path(
assert cert == b"cert"
assert key == b"key"
assert passphrase is None
mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
mock_check_config_path.assert_called_with(context_aware_metadata_path)
mock_load_json_file.assert_called_with(context_aware_metadata_path)


Expand Down Expand Up @@ -520,7 +561,7 @@ def test_default(self, mock_path_exists):
mock_path_exists.return_value = True
returned_path = _mtls_helper._get_cert_config_path()
expected_path = os.path.expanduser(
_mtls_helper._CERTIFICATE_CONFIGURATION_DEFAULT_PATH
_mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH
)
assert returned_path == expected_path

Expand Down
Loading

0 comments on commit 18c2ec1

Please sign in to comment.