Skip to content

Commit

Permalink
#3 refactor secret storage (#9)
Browse files Browse the repository at this point in the history
* Refactored Secrets Store to only manage key/value pairs.
* Added test case to verify that plain access to secret store fails.
  • Loading branch information
ckunki authored Oct 13, 2023
1 parent 828c8b7 commit 242f4b3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 92 deletions.
2 changes: 2 additions & 0 deletions doc/changes/changes_0.0.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ This release adds the initial implementation of the secret store

* #1: Added secret store
* #4: Added python toolbox
* #7: Refactored secret store to store simple key/value pairs
* #5: Added test case to verify that plain access to secret store fails
71 changes: 14 additions & 57 deletions exasol/secret_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,7 @@


_logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class Table:
name: str
columns: List[str]


SECRETS_TABLE = Table("secrets", ["user", "password"])
CONFIG_ITEMS_TABLE = Table("config_items", ["item"])


@dataclass(frozen=True)
class Credentials:
user: str
password: str
TABLE_NAME = "secrets"


class InvalidPassword(Exception):
Expand Down Expand Up @@ -57,15 +42,9 @@ def _initialize(self, db_file_found: bool) -> None:
if db_file_found:
self._verify_access()
return

def create_table(table: Table) -> None:
_logger.info(f'Creating table "{table.name}".')
columns = " ,".join(table.columns)
with self._cursor() as cur:
cur.execute(f"CREATE TABLE {table.name} (key, {columns})")

for table in (SECRETS_TABLE, CONFIG_ITEMS_TABLE):
create_table(table)
_logger.info(f'Creating table "{TABLE_NAME}".')
with self._cursor() as cur:
cur.execute(f"CREATE TABLE {TABLE_NAME} (key TEXT, value TEXT PRIMARY KEY)")

def _use_master_password(self) -> None:
"""
Expand Down Expand Up @@ -107,29 +86,23 @@ def _cursor(self) -> sqlcipher.Cursor:
finally:
cur.close()

def _save_data(self, table: Table, key: str, data: List[str]) -> "Secrets":
def save(self, key: str, value: str) -> "Secrets":
"""key represents a system, service, or application"""
def entry_exists(cur) -> None:
res = cur.execute(
f"SELECT * FROM {table.name} WHERE key=?",
f"SELECT * FROM {TABLE_NAME} WHERE key=?",
[key])
return res and res.fetchone()

def update(cur) -> None:
columns = ", ".join(f"{c}=?" for c in table.columns)
cur.execute(
f"UPDATE {table.name} SET {columns} WHERE key=?",
data + [key])
f"UPDATE {TABLE_NAME} SET value=? WHERE key=?",
[value, key])

def insert(cur) -> None:
columns = ",".join(table.columns)
value_slots = ", ".join("?" for c in table.columns)
cur.execute(
(
f"INSERT INTO {table.name}"
f" (key,{columns})"
f" VALUES (?, {value_slots})"
),
[key] + data)
f"INSERT INTO {TABLE_NAME} (key,value) VALUES (?, ?)",
[key, value])

with self._cursor() as cur:
if entry_exists(cur):
Expand All @@ -138,26 +111,10 @@ def insert(cur) -> None:
insert(cur)
return self

def save(self, key: str, data: Union[str, Credentials]) -> "Secrets":
"""key represents a system, service, or application"""
if isinstance(data, str):
return self._save_data(CONFIG_ITEMS_TABLE, key, [data])
if isinstance(data, Credentials):
return self._save_data(SECRETS_TABLE, key, [data.user, data.password])
raise Exception("Unsupported type of data: " + type(data).__name__)

def _data(self, table: Table, key: str) -> Optional[List[str]]:
columns = ", ".join(table.columns)
def get(self, key: str) -> Optional[List[str]]:
with self._cursor() as cur:
res = cur.execute(
f"SELECT {columns} FROM {table.name} WHERE key=?",
f"SELECT value FROM {TABLE_NAME} WHERE key=?",
[key])
return res.fetchone() if res else None

def credentials(self, key: str) -> Optional[Credentials]:
row = self._data(SECRETS_TABLE, key)
return Credentials(row[0], row[1]) if row else None

def config(self, key: str) -> Optional[str]:
row = self._data(CONFIG_ITEMS_TABLE, key)
row = res.fetchone() if res else None
return row[0] if row else None
65 changes: 30 additions & 35 deletions tests/test_secret_store.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,51 @@
import os
import pytest
from exasol.secret_store import Credentials, InvalidPassword, Secrets
import sqlite3
from exasol.secret_store import InvalidPassword, Secrets
from sqlcipher3 import dbapi2 as sqlcipher


def test_no_database_file(secrets):
assert not os.path.exists(secrets.db_file)


def test_database_file_from_credentials(secrets):
assert secrets.credentials("a") is None
assert os.path.exists(secrets.db_file)
assert not secrets.db_file.exists()


def test_database_file_from_config_item(secrets):
assert secrets.config("a") is None
assert os.path.exists(secrets.db_file)
def test_database_file_created(secrets):
assert secrets.get("any_key") is None
assert secrets.db_file.exists()


def test_credentials(secrets):
credentials = Credentials("user", "password")
secrets.save("key", credentials).close()
assert secrets.credentials("key") == credentials
def test_value(secrets):
value = "my value"
secrets.save("key", value).close()
assert secrets.get("key") == value


def test_config_item(secrets):
config_item = "some configuration"
secrets.save("key", config_item).close()
assert secrets.config("key") == config_item


def test_update_credentials(secrets):
initial = Credentials("user", "password")
secrets.save("key", initial).close()
other = Credentials("other", "changed")
secrets.save("key", other)
secrets.close()
assert secrets.credentials("key") == other


def test_update_config_item(secrets):
def test_update(secrets):
initial = "initial value"
secrets.save("key", initial).close()
other = "other value"
secrets.save("key", other).close()
assert secrets.config("key") == other
assert secrets.get("key") == other


def test_wrong_password(sample_file):
secrets = Secrets(sample_file, "correct password")
secrets.save("key", Credentials("usr", "pass")).close()
secrets.save("key", "my value").close()
invalid = Secrets(sample_file, "wrong password")
with pytest.raises(InvalidPassword) as ex:
invalid.credentials("key")
invalid.get("key")
assert "master password is incorrect" in str(ex.value)


def test_plain_access_fails(sample_file):
"""
This test sets up a secret store, secured by a master password and
verifies that plain access to the secret store using sqlite3 without
encryption raises a DatabaseError.
"""
secrets = Secrets(sample_file, "correct password")
secrets.save("key", "my value").close()
con = sqlite3.connect(sample_file)
cur = con.cursor()
with pytest.raises(sqlite3.DatabaseError) as ex:
res = cur.execute("SELECT * FROM sqlite_master")
cur.close()
assert str(ex.value) == "file is not a database"

0 comments on commit 242f4b3

Please sign in to comment.