-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat: Add singular
fetch_connector_secret()
for `GoogleGSMSecretMan…
…ager`, fix GSM edge cases, and add CI tests (#191)
- Loading branch information
1 parent
77497c5
commit 97119c9
Showing
3 changed files
with
105 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
"""Tests for the GSM secrets manager.""" | ||
from __future__ import annotations | ||
|
||
from airbyte.secrets.base import SecretHandle | ||
from airbyte.secrets.google_gsm import GoogleGSMSecretManager | ||
|
||
|
||
def test_get_gsm_secret(ci_secret_manager: GoogleGSMSecretManager) -> dict: | ||
assert ci_secret_manager.get_secret( | ||
"SECRET_DESTINATION_DUCKDB__MOTHERDUCK__CREDS", | ||
).parse_json() | ||
|
||
|
||
def test_get_gsm_secrets_with_filter(ci_secret_manager: GoogleGSMSecretManager) -> None: | ||
"""Test fetching connector secrets.""" | ||
secrets = ci_secret_manager.fetch_secrets( | ||
filter_string="labels.connector=source-bigquery", | ||
) | ||
assert secrets is not None | ||
secrets_list = list(secrets) | ||
assert len(secrets_list) > 0 | ||
assert secrets_list[0].get_value().is_json() | ||
|
||
|
||
def test_get_gsm_secrets_by_label(ci_secret_manager: GoogleGSMSecretManager) -> None: | ||
"""Test fetching connector secrets.""" | ||
secrets = ci_secret_manager.fetch_secrets_by_label( | ||
label_key="connector", | ||
label_value="source-salesforce", | ||
) | ||
assert secrets is not None | ||
secrets_list = list(secrets) | ||
assert len(secrets_list) > 0 | ||
assert secrets_list[0].get_value().is_json() | ||
|
||
|
||
def test_get_connector_secrets(ci_secret_manager: GoogleGSMSecretManager) -> None: | ||
"""Test fetching connector secrets.""" | ||
secrets = ci_secret_manager.fetch_connector_secrets( | ||
"source-salesforce" | ||
) | ||
assert secrets is not None | ||
secrets_list = list(secrets) | ||
assert len(secrets_list) > 0 | ||
assert secrets_list[0].get_value().is_json() | ||
|
||
|
||
def test_first_connector_secret(ci_secret_manager: GoogleGSMSecretManager) -> None: | ||
"""Test fetching connector secrets.""" | ||
secret = ci_secret_manager.fetch_connector_secret( | ||
"source-salesforce" | ||
) | ||
assert secret is not None | ||
assert isinstance(secret, SecretHandle) | ||
assert secret.get_value().is_json() |