From 242f4b3ba77663e9f934f665085a0b58d24c9083 Mon Sep 17 00:00:00 2001 From: Christoph Kuhnke Date: Fri, 13 Oct 2023 09:19:19 +0200 Subject: [PATCH] #3 refactor secret storage (#9) * Refactored Secrets Store to only manage key/value pairs. * Added test case to verify that plain access to secret store fails. --- doc/changes/changes_0.0.1.md | 2 + exasol/secret_store.py | 71 +++++++----------------------------- tests/test_secret_store.py | 65 +++++++++++++++------------------ 3 files changed, 46 insertions(+), 92 deletions(-) diff --git a/doc/changes/changes_0.0.1.md b/doc/changes/changes_0.0.1.md index 445b992..b48a1ff 100644 --- a/doc/changes/changes_0.0.1.md +++ b/doc/changes/changes_0.0.1.md @@ -8,3 +8,5 @@ This release adds the initial implementation of the secret store * #1: Added secret store * #4: Added python toolbox +* #7: Refactored secret store to store simple key/value pairs +* #5: Added test case to verify that plain access to secret store fails diff --git a/exasol/secret_store.py b/exasol/secret_store.py index b43cbd0..0f7c292 100644 --- a/exasol/secret_store.py +++ b/exasol/secret_store.py @@ -10,22 +10,7 @@ _logger = logging.getLogger(__name__) - - -@dataclass(frozen=True) -class Table: - name: str - columns: List[str] - - -SECRETS_TABLE = Table("secrets", ["user", "password"]) -CONFIG_ITEMS_TABLE = Table("config_items", ["item"]) - - -@dataclass(frozen=True) -class Credentials: - user: str - password: str +TABLE_NAME = "secrets" class InvalidPassword(Exception): @@ -57,15 +42,9 @@ def _initialize(self, db_file_found: bool) -> None: if db_file_found: self._verify_access() return - - def create_table(table: Table) -> None: - _logger.info(f'Creating table "{table.name}".') - columns = " ,".join(table.columns) - with self._cursor() as cur: - cur.execute(f"CREATE TABLE {table.name} (key, {columns})") - - for table in (SECRETS_TABLE, CONFIG_ITEMS_TABLE): - create_table(table) + _logger.info(f'Creating table "{TABLE_NAME}".') + with self._cursor() as cur: + cur.execute(f"CREATE TABLE {TABLE_NAME} (key TEXT, value TEXT PRIMARY KEY)") def _use_master_password(self) -> None: """ @@ -107,29 +86,23 @@ def _cursor(self) -> sqlcipher.Cursor: finally: cur.close() - def _save_data(self, table: Table, key: str, data: List[str]) -> "Secrets": + def save(self, key: str, value: str) -> "Secrets": + """key represents a system, service, or application""" def entry_exists(cur) -> None: res = cur.execute( - f"SELECT * FROM {table.name} WHERE key=?", + f"SELECT * FROM {TABLE_NAME} WHERE key=?", [key]) return res and res.fetchone() def update(cur) -> None: - columns = ", ".join(f"{c}=?" for c in table.columns) cur.execute( - f"UPDATE {table.name} SET {columns} WHERE key=?", - data + [key]) + f"UPDATE {TABLE_NAME} SET value=? WHERE key=?", + [value, key]) def insert(cur) -> None: - columns = ",".join(table.columns) - value_slots = ", ".join("?" for c in table.columns) cur.execute( - ( - f"INSERT INTO {table.name}" - f" (key,{columns})" - f" VALUES (?, {value_slots})" - ), - [key] + data) + f"INSERT INTO {TABLE_NAME} (key,value) VALUES (?, ?)", + [key, value]) with self._cursor() as cur: if entry_exists(cur): @@ -138,26 +111,10 @@ def insert(cur) -> None: insert(cur) return self - def save(self, key: str, data: Union[str, Credentials]) -> "Secrets": - """key represents a system, service, or application""" - if isinstance(data, str): - return self._save_data(CONFIG_ITEMS_TABLE, key, [data]) - if isinstance(data, Credentials): - return self._save_data(SECRETS_TABLE, key, [data.user, data.password]) - raise Exception("Unsupported type of data: " + type(data).__name__) - - def _data(self, table: Table, key: str) -> Optional[List[str]]: - columns = ", ".join(table.columns) + def get(self, key: str) -> Optional[List[str]]: with self._cursor() as cur: res = cur.execute( - f"SELECT {columns} FROM {table.name} WHERE key=?", + f"SELECT value FROM {TABLE_NAME} WHERE key=?", [key]) - return res.fetchone() if res else None - - def credentials(self, key: str) -> Optional[Credentials]: - row = self._data(SECRETS_TABLE, key) - return Credentials(row[0], row[1]) if row else None - - def config(self, key: str) -> Optional[str]: - row = self._data(CONFIG_ITEMS_TABLE, key) + row = res.fetchone() if res else None return row[0] if row else None diff --git a/tests/test_secret_store.py b/tests/test_secret_store.py index 6bf112f..f8b7566 100644 --- a/tests/test_secret_store.py +++ b/tests/test_secret_store.py @@ -1,56 +1,51 @@ -import os import pytest -from exasol.secret_store import Credentials, InvalidPassword, Secrets +import sqlite3 +from exasol.secret_store import InvalidPassword, Secrets from sqlcipher3 import dbapi2 as sqlcipher - def test_no_database_file(secrets): - assert not os.path.exists(secrets.db_file) - - -def test_database_file_from_credentials(secrets): - assert secrets.credentials("a") is None - assert os.path.exists(secrets.db_file) + assert not secrets.db_file.exists() -def test_database_file_from_config_item(secrets): - assert secrets.config("a") is None - assert os.path.exists(secrets.db_file) +def test_database_file_created(secrets): + assert secrets.get("any_key") is None + assert secrets.db_file.exists() -def test_credentials(secrets): - credentials = Credentials("user", "password") - secrets.save("key", credentials).close() - assert secrets.credentials("key") == credentials +def test_value(secrets): + value = "my value" + secrets.save("key", value).close() + assert secrets.get("key") == value -def test_config_item(secrets): - config_item = "some configuration" - secrets.save("key", config_item).close() - assert secrets.config("key") == config_item - - -def test_update_credentials(secrets): - initial = Credentials("user", "password") - secrets.save("key", initial).close() - other = Credentials("other", "changed") - secrets.save("key", other) - secrets.close() - assert secrets.credentials("key") == other - - -def test_update_config_item(secrets): +def test_update(secrets): initial = "initial value" secrets.save("key", initial).close() other = "other value" secrets.save("key", other).close() - assert secrets.config("key") == other + assert secrets.get("key") == other def test_wrong_password(sample_file): secrets = Secrets(sample_file, "correct password") - secrets.save("key", Credentials("usr", "pass")).close() + secrets.save("key", "my value").close() invalid = Secrets(sample_file, "wrong password") with pytest.raises(InvalidPassword) as ex: - invalid.credentials("key") + invalid.get("key") assert "master password is incorrect" in str(ex.value) + + +def test_plain_access_fails(sample_file): + """ + This test sets up a secret store, secured by a master password and + verifies that plain access to the secret store using sqlite3 without + encryption raises a DatabaseError. + """ + secrets = Secrets(sample_file, "correct password") + secrets.save("key", "my value").close() + con = sqlite3.connect(sample_file) + cur = con.cursor() + with pytest.raises(sqlite3.DatabaseError) as ex: + res = cur.execute("SELECT * FROM sqlite_master") + cur.close() + assert str(ex.value) == "file is not a database"