-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
211 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,4 @@ | ||
tzdata | ||
tzdata | ||
cryptography | ||
python-dotenv | ||
PyYAML |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
TEST_VARIABLE=test_value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,5 @@ | ||
from .backoff import exponential_delay | ||
from .circuit_breaker import CircuitBreaker | ||
from .config import config_var, load_config, load_env | ||
from .file_operations import debug_file_path | ||
from .logging import setup_logging |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import logging | ||
import os | ||
|
||
from dotenv import load_dotenv | ||
|
||
from .crypto import decrypt_fernet_str | ||
from .exceptions import EncryptedValueError, MissingVaultKey | ||
from .yaml import load_and_decrypt_yaml | ||
|
||
# Sentinel value to represent an unset default | ||
UNSET_DEFAULT = object() | ||
|
||
|
||
def config_var(name: str, default: any = UNSET_DEFAULT) -> any: | ||
""" | ||
Get a config variable from the service object or environment, prefer environment | ||
""" | ||
value = default | ||
|
||
if name in os.environ: | ||
value = os.environ[name] | ||
# else: | ||
# config = getattr(SVCObj().svc, "config", None) | ||
# if config is not None: | ||
# if name in config: | ||
# value = config[name] | ||
# elif name.lower() in config: | ||
# value = config[name.lower()] | ||
|
||
if value is UNSET_DEFAULT: | ||
raise ValueError(f"Config variable {name} not set and no default provided") | ||
|
||
if isinstance(value, bytes): | ||
value = _attempt_decode(name, value) | ||
|
||
if isinstance(value, str): | ||
value = _normalize_config_string(name, value) | ||
|
||
return value | ||
|
||
|
||
def _normalize_config_string(name, value): | ||
if value.lower() == "true": | ||
value = True | ||
elif value.lower() == "false": | ||
value = False | ||
elif value.startswith("!secret"): | ||
value = extract_secret(name, value) | ||
return value | ||
|
||
|
||
def _attempt_decode(name, value): | ||
try: | ||
value = value.decode() | ||
except Exception as e: | ||
logging.error(f"Error decoding config var {name}: {e}") | ||
raise e | ||
return value | ||
|
||
|
||
def extract_secret(name, value): | ||
vault_key = os.getenv("VAULT_KEY") | ||
if vault_key is None: | ||
raise MissingVaultKey("Encrypted Config encountered with no VAULT_KEY environment variable set") | ||
else: | ||
try: | ||
value = _decrypt_cfg_var(os.getenv("VAULT_KEY"), value) | ||
except Exception as e: | ||
logging.error(f"Error decrypting config var {name}: {e}") | ||
raise EncryptedValueError(f"Error decrypting config var {name}: {e}") from e | ||
return value | ||
|
||
|
||
def _decrypt_cfg_var(key, value): | ||
value = value.removeprefix("!secret ") | ||
return decrypt_fernet_str(key, value) | ||
|
||
|
||
def load_config(decryption_key=None, config_file=None) -> dict: | ||
""" | ||
Loads a YAML configuration file, decrypting values tagged with "!secret" using the provided decryption key. | ||
The function registers a custom constructor to handle decryption of values tagged with "!secret" in the YAML file. | ||
If the decryption_key is not provided, the function uses the secret_constructor without the key. | ||
If the config_file is not provided, the function looks for the file path in the "CONFIG" environment variable. | ||
:param decryption_key: Optional; a string representing the Fernet decryption key to be used for decryption. | ||
If not provided, the constructor will attempt to use the environment variable "VAULT_KEY". | ||
:param config_file: Optional; a string representing the path to the YAML configuration file to be loaded. | ||
If not provided, the function will look for the path in the "CONFIG" environment variable. | ||
:return: A dictionary representing the contents of the loaded YAML file, with encrypted values decrypted | ||
:raises Exception: If neither the config_file parameter nor the "CONFIG" environment variable is set. | ||
""" | ||
if decryption_key is None: | ||
decryption_key = os.getenv("VAULT_KEY", None) | ||
if config_file is None: | ||
config_file = os.getenv("CONFIG_FILE") | ||
if config_file is None: | ||
raise Exception("CONFIG_FILE not set") | ||
|
||
return load_and_decrypt_yaml(decryption_key, config_file) | ||
|
||
|
||
def load_env(env_file=".env", required=False): | ||
if os.path.exists(env_file): | ||
load_dotenv(env_file) | ||
else: | ||
if required: | ||
raise FileNotFoundError(f"Environment file {env_file} not found") | ||
else: | ||
print(f"Warning: environment file {env_file} not found") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from cryptography.fernet import Fernet | ||
|
||
from .helpers import want_bytes | ||
|
||
|
||
def decrypt_fernet_str(key: str | bytes, value: str | bytes) -> str: | ||
f = Fernet(want_bytes(key)) | ||
return f.decrypt(want_bytes(value)).decode() | ||
|
||
|
||
def encrypt_fernet_str(key: str | bytes, value: str | bytes) -> str: | ||
f = Fernet(want_bytes(key)) | ||
return f.encrypt(want_bytes(value)).decode() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import os | ||
|
||
import yaml | ||
from cryptography.fernet import Fernet | ||
|
||
from zsuite.helpers import want_bytes | ||
|
||
from .exceptions import MissingVaultKey | ||
|
||
|
||
def secret_constructor(decryption_key: str | bytes = None): | ||
"""Creates and returns a custom YAML constructor function for decrypting values tagged with "!secret". | ||
This function returns a constructor that can be used with a YAML loader to decrypt scalar values tagged with | ||
"!secret" in a YAML file. If a decryption_key is provided, it will be used to decrypt the values. If not, | ||
the function will attempt to use the "VAULT_KEY" environment variable as the decryption key. | ||
:param decryption_key: Optional; the Fernet decryption key to be used for decryption. | ||
If not provided, the returned constructor will use the "VAULT_KEY" environment variable. | ||
:return: A constructor function that takes a loader and a node, decrypts the value if it's tagged with "!secret", | ||
and returns the decrypted or original value. | ||
:raises ValueError: If a value tagged with "!secret" is encountered but no decryption key is provided or found. | ||
""" | ||
decryption_key = want_bytes(decryption_key) | ||
|
||
def _constructor(loader, node): | ||
value = loader.construct_scalar(node) | ||
key = decryption_key if decryption_key is not None else os.getenv("VAULT_KEY") | ||
if key is None: | ||
raise ValueError("Encountered encrypted value, but no decryption key was provided.") | ||
f = Fernet(key) | ||
return f.decrypt(value.encode()).decode() | ||
|
||
return _constructor | ||
|
||
|
||
def load_and_decrypt_yaml(decryption_key=None, file_path=None): | ||
"""Loads and parses a YAML file, decrypting values tagged with "!secret" using the provided decryption key. | ||
:param decryption_key: Optional; String representing the Fernet decryption key to be used for decryption. | ||
:param file_path: String representing the path to the YAML file to be loaded. | ||
:return: A dictionary representing the contents of the loaded YAML file, with encrypted values decrypted. | ||
""" | ||
try: | ||
# First, read the file without applying any constructors to check for encrypted values | ||
with open(file_path) as f: | ||
file_content = f.read() | ||
if "!secret" in file_content: | ||
if decryption_key is None: | ||
if "VAULT_KEY" in os.environ: | ||
decryption_key = os.environ["VAULT_KEY"] | ||
else: | ||
raise MissingVaultKey("Encrypted value found but no decryption key provided.") | ||
yaml.constructor.SafeConstructor.add_constructor("!secret", secret_constructor(decryption_key)) | ||
with open(file_path) as f: | ||
return yaml.safe_load(f) | ||
|
||
except FileNotFoundError: | ||
raise FileNotFoundError(f"File {file_path} not found") from None | ||
except NotADirectoryError: | ||
raise FileNotFoundError(f"File {file_path} directory not found") from None | ||
except MissingVaultKey: | ||
raise | ||
except Exception as e: | ||
raise Exception(f"Error loading YAML file {file_path}: {e}") from e |