Skip to content

Commit

Permalink
db: Support dumping a time range
Browse files Browse the repository at this point in the history
  • Loading branch information
spbnick committed Nov 1, 2024
1 parent ff5ce9f commit 78f0d62
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 40 deletions.
69 changes: 63 additions & 6 deletions kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from kcidb.db import abstract, schematic, mux, \
bigquery, postgresql, sqlite, json, null, misc # noqa: F401

# It's OK for now, pylint: disable=too-many-lines

# Module's logger
LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -274,7 +276,8 @@ def upgrade(self, target_version=None):
"Target schema is older than the current schema"
self.driver.upgrade(target_version)

def dump_iter(self, objects_per_report=0, with_metadata=True,
              after=None, until=None):
    """
    Dump all data from the database in object number-limited chunks.

    Args:
        objects_per_report: An upper limit on the number of objects in
                            each report data, or zero for no limit.
        with_metadata:      True, if metadata fields should be dumped as
                            well. False, if not.
        after:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be excluded from the dump should've arrived.
                            The data after this time will be dumped.
                            Can be None to have no limit on older data.
        until:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be dumped should've arrived.
                            The data after this time will not be dumped.
                            Can be None to have no limit on newer data.

    Returns:
        An iterator returning report JSON data adhering to the current I/O
        schema version, each containing at most the specified number of
        objects.

    Raises:
        NoTimestamps - Either "after" or "until" are not None, and
                       the database doesn't have row timestamps.
    """
    assert isinstance(objects_per_report, int)
    assert objects_per_report >= 0
    assert isinstance(with_metadata, bool)
    # Time bounds must be timezone-aware ("aware") datetimes, so the
    # comparison against server-side timestamps is unambiguous
    assert after is None or \
        isinstance(after, datetime.datetime) and after.tzinfo
    assert until is None or \
        isinstance(until, datetime.datetime) and until.tzinfo
    assert self.is_initialized()
    # Delegate to the driver, which knows the concrete backend
    yield from self.driver.dump_iter(
        objects_per_report=objects_per_report,
        with_metadata=with_metadata,
        after=after, until=until
    )

def dump(self, with_metadata=True, after=None, until=None):
    """
    Dump all data from the database.

    Args:
        with_metadata:  True, if metadata fields should be dumped as
                        well. False, if not.
        after:          An "aware" datetime.datetime object specifying
                        the latest (database server) time the data to
                        be excluded from the dump should've arrived.
                        The data after this time will be dumped.
                        Can be None to have no limit on older data.
        until:          An "aware" datetime.datetime object specifying
                        the latest (database server) time the data to
                        be dumped should've arrived.
                        The data after this time will not be dumped.
                        Can be None to have no limit on newer data.

    Returns:
        The JSON data from the database adhering to the current I/O schema
        version.

    Raises:
        NoTimestamps - Either "after" or "until" are not None, and
                       the database doesn't have row timestamps.
    """
    assert isinstance(with_metadata, bool)
    assert after is None or \
        isinstance(after, datetime.datetime) and after.tzinfo
    assert until is None or \
        isinstance(until, datetime.datetime) and until.tzinfo
    assert self.is_initialized()
    try:
        # objects_per_report=0 means "no limit": everything in one chunk
        return next(self.dump_iter(objects_per_report=0,
                                   with_metadata=with_metadata,
                                   after=after, until=until))
    except StopIteration:
        # Nothing dumped: return a fresh (empty) dataset of the
        # current schema's I/O version
        return self.get_schema()[1].new()

Expand Down Expand Up @@ -776,13 +818,28 @@ def dump_main():
help='Do not dump metadata fields',
action='store_true'
)
parser.add_argument(
'--after',
metavar='AFTER',
type=kcidb.misc.iso_timestamp,
help="An ISO-8601 timestamp specifying the latest time the data to "
"be *excluded* from the dump should've arrived."
)
parser.add_argument(
'--until',
metavar='UNTIL',
type=kcidb.misc.iso_timestamp,
help="An ISO-8601 timestamp specifying the latest time the data to "
"be *included* into the dump should've arrived."
)
args = parser.parse_args()
client = Client(args.database)
if not client.is_initialized():
raise Exception(f"Database {args.database!r} is not initialized")
kcidb.misc.json_dump_stream(
client.dump_iter(objects_per_report=args.objects_per_report,
with_metadata=not args.without_metadata),
with_metadata=not args.without_metadata,
after=args.after, until=args.until),
sys.stdout, indent=args.indent, seq=args.seq_out
)

Expand Down
20 changes: 19 additions & 1 deletion kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def upgrade(self, target_version):
"Target schema is older than the current schema"

@abstractmethod
def dump_iter(self, objects_per_report, with_metadata, after, until):
    """
    Dump all data from the database in object number-limited chunks.
    The database must be initialized.

    Args:
        objects_per_report: An upper limit on the number of objects in
                            each report data, or zero for no limit.
        with_metadata:      True, if metadata fields should be dumped as
                            well. False, if not.
        after:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be excluded from the dump should've arrived.
                            The data after this time will be dumped.
                            Can be None to have no limit on older data.
        until:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be dumped should've arrived.
                            The data after this time will not be dumped.
                            Can be None to have no limit on newer data.

    Returns:
        An iterator returning report JSON data adhering to the current
        database schema's I/O schema version, each containing at most the
        specified number of objects.

    Raises:
        NoTimestamps - Either "after" or "until" are not None, and
                       the database doesn't have row timestamps.
    """
    # Shared precondition checks for all driver implementations;
    # subclasses call super().dump_iter(...) or repeat these asserts
    assert isinstance(objects_per_report, int)
    assert objects_per_report >= 0
    assert isinstance(with_metadata, bool)
    assert after is None or \
        isinstance(after, datetime.datetime) and after.tzinfo
    assert until is None or \
        isinstance(until, datetime.datetime) and until.tzinfo
    assert self.is_initialized()

# No, it's not, pylint: disable=too-many-return-statements
Expand Down
48 changes: 41 additions & 7 deletions kcidb/db/bigquery/v04_00.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from kcidb.db.schematic import \
Schema as AbstractSchema, \
Connection as AbstractConnection
from kcidb.db.misc import NotFound
from kcidb.db.misc import NotFound, NoTimestamps
from kcidb.db.bigquery.schema import validate_json_obj_list

# We'll manage for now, pylint: disable=too-many-lines
Expand Down Expand Up @@ -749,7 +749,7 @@ def _unpack_node(cls, node, drop_null=True):
node[key] = cls._unpack_node(value)
return node

def dump_iter(self, objects_per_report, with_metadata):
def dump_iter(self, objects_per_report, with_metadata, after, until):
"""
Dump all data from the database in object number-limited chunks.
Expand All @@ -758,11 +758,25 @@ def dump_iter(self, objects_per_report, with_metadata):
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.
Returns:
An iterator returning report JSON data adhering to the I/O version
of the database schema, each containing at most the specified
number of objects.
Raises:
NoTimestamps - Either "after" or "until" are not None, and
the database doesn't have row timestamps.
"""
assert isinstance(objects_per_report, int)
assert objects_per_report >= 0
Expand All @@ -771,14 +785,34 @@ def dump_iter(self, objects_per_report, with_metadata):
obj_num = 0
data = self.io.new()
for obj_list_name, table_schema in self.TABLE_MAP.items():
query_string = \
"SELECT " + \
ts_field = next(
(f for f in table_schema if f.name == "_timestamp"),
None
)
if (after or until) and not ts_field:
raise NoTimestamps(
f"Table {obj_list_name!r} has no {ts_field.name!r} column"
)

query_string = (
"SELECT " +
", ".join(
f"`{f.name}`" for f in table_schema
if with_metadata or f.name[0] != '_'
) + \
f" FROM `{obj_list_name}`"
query_job = self.conn.query_create(query_string)
) +
f" FROM `{obj_list_name}`" +
((
" WHERE " + " AND ".join(
f"{ts_field.name} {op} ?"
for op, v in ((">", after), ("<=", until)) if v
)
) if (after or until) else "")
)
query_parameters = [
bigquery.ScalarQueryParameter(None, ts_field.field_type, v)
for v in (after, until) if v
]
query_job = self.conn.query_create(query_string, query_parameters)
obj_list = None
for row in query_job:
if obj_list is None:
Expand Down
4 changes: 4 additions & 0 deletions kcidb/db/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class UnsupportedSchema(Error):
"""Database schema version is not supported"""


class NoTimestamps(Error):
    """
    Row timestamps required for the operation don't exist.

    Raised e.g. when a time-bounded dump ("after"/"until") is requested
    from a database whose schema has no per-row timestamp column.
    """


def format_spec_list(specs):
"""
Format a database specification list string out of a list of specification
Expand Down
18 changes: 16 additions & 2 deletions kcidb/db/mux.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def upgrade(self, target_version):
driver.upgrade(driver_version)
self.version = version

def dump_iter(self, objects_per_report, with_metadata, after, until):
    """
    Dump all data from the first database in object number-limited chunks.

    Args:
        objects_per_report: An upper limit on the number of objects in
                            each report data, or zero for no limit.
        with_metadata:      True, if metadata fields should be dumped as
                            well. False, if not.
        after:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be excluded from the dump should've arrived.
                            The data after this time will be dumped.
                            Can be None to have no limit on older data.
        until:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be dumped should've arrived.
                            The data after this time will not be dumped.
                            Can be None to have no limit on newer data.

    Returns:
        An iterator returning report JSON data adhering to the current I/O
        schema version, each containing at most the specified number of
        objects.

    Raises:
        NoTimestamps - Either "after" or "until" are not None, and
                       the database doesn't have row timestamps.
    """
    # Only the first (primary) multiplexed driver is dumped
    yield from self.drivers[0].dump_iter(objects_per_report,
                                         with_metadata, after, until)

# We can live with this for now, pylint: disable=too-many-arguments
# Or if you prefer, pylint: disable=too-many-positional-arguments
Expand Down
19 changes: 18 additions & 1 deletion kcidb/db/null.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def get_last_modified(self):
"""
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def dump_iter(self, objects_per_report, with_metadata, after, until):
    """
    Dump all data from the database in object number-limited chunks.

    Args:
        objects_per_report: An upper limit on the number of objects in
                            each report data, or zero for no limit.
        with_metadata:      True, if metadata fields should be dumped as
                            well. False, if not.
        after:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be excluded from the dump should've arrived.
                            The data after this time will be dumped.
                            Can be None to have no limit on older data.
        until:              An "aware" datetime.datetime object specifying
                            the latest (database server) time the data to
                            be dumped should've arrived.
                            The data after this time will not be dumped.
                            Can be None to have no limit on newer data.

    Returns:
        An iterator returning report JSON data adhering to the current I/O
        schema version, each containing at most the specified number of
        objects.

    Raises:
        NoTimestamps - Either "after" or "until" are not None, and
                       the database doesn't have row timestamps.
    """
    # The null database stores nothing, so every argument is irrelevant
    # and a single fresh (presumably empty) report is yielded
    del objects_per_report
    del with_metadata
    del after
    del until
    yield io.SCHEMA.new()

# We can live with this for now, pylint: disable=too-many-arguments
Expand Down
7 changes: 5 additions & 2 deletions kcidb/db/postgresql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def __init__(self, constraint=None,

class Table(_SQLTable):
"""A table schema"""
def __init__(self, columns, primary_key=None):
def __init__(self, columns, primary_key=None, timestamp=None):
"""
Initialize the table schema.
Expand All @@ -249,9 +249,12 @@ def __init__(self, columns, primary_key=None):
primary_key: A list of names of columns constituting the
primary key. None or an empty list to use the
column with the PRIMARY_KEY constraint instead.
timestamp The name of the column containing last row change
timestamp. Must exist in "columns".
"""
# TODO: Switch to hardcoding "_" key_sep in base class
super().__init__("%s", columns, primary_key, key_sep="_")
super().__init__("%s", columns, primary_key, key_sep="_",
timestamp=timestamp)


class Index(_SQLIndex):
Expand Down
Loading

0 comments on commit 78f0d62

Please sign in to comment.