Skip to content

Commit

Permalink
Archive each table independently
Browse files Browse the repository at this point in the history
At the moment all operational tables are archived using a single
timestamp for the lower boundary, which either loses us data, if they're
uneven, or forces us to load duplicates to make sure we don't lose
anything. This is because some databases don't support atomic
modifications of multiple tables at once.

Work that around, by assigning separate time ranges to separate tables,
when dumping.
  • Loading branch information
spbnick committed Nov 11, 2024
1 parent 2d4c9a0 commit f93e13e
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 321 deletions.
165 changes: 123 additions & 42 deletions kcidb/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,23 @@ def get_first_modified(self):
The database must be initialized.
Returns:
A timezone-aware datetime object representing the first
data arrival time, or None if the database is empty.
A dictionary of names of I/O object types (list names), which have
objects in the database, and timezone-aware datetime objects
representing the time the first one has arrived into the database.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine data arrival time.
"""
assert self.is_initialized()
io_schema = self.get_schema()[1]
first_modified = self.driver.get_first_modified()
assert first_modified is None or \
isinstance(first_modified, datetime.datetime) and \
first_modified.tzinfo
assert isinstance(first_modified, dict)
assert all(
obj_list_name in io_schema.id_fields and
isinstance(timestamp, datetime.datetime) and timestamp.tzinfo
for obj_list_name, timestamp in first_modified.items()
)
return first_modified

def get_last_modified(self):
Expand All @@ -206,18 +211,23 @@ def get_last_modified(self):
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
data arrival time, or None if the database is empty.
A dictionary of names of I/O object types (list names), which have
objects in the database, and timezone-aware datetime objects
representing the time the last one has arrived into the database.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
cannot determine data arrival time.
"""
assert self.is_initialized()
io_schema = self.get_schema()[1]
last_modified = self.driver.get_last_modified()
assert last_modified is None or \
isinstance(last_modified, datetime.datetime) and \
last_modified.tzinfo
assert isinstance(last_modified, dict)
assert all(
obj_list_name in io_schema.id_fields and
isinstance(timestamp, datetime.datetime) and timestamp.tzinfo
for obj_list_name, timestamp in last_modified.items()
)
return last_modified

def get_schemas(self):
Expand Down Expand Up @@ -310,34 +320,66 @@ def dump_iter(self, objects_per_report=0, with_metadata=True,
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.
after: A dictionary of names of I/O object types
(list names) and timezone-aware datetime
objects specifying the latest time the
corresponding objects should've arrived to be
*excluded* from the dump. Any objects which
arrived later will be *eligible* for dumping.
Object types missing from this dictionary will
not be limited. Can be a single datetime
object, one for all object types, or None to
not limit the dump by this parameter
(equivalent to empty dictionary).
until: A dictionary of names of I/O object types
(list names) and timezone-aware datetime
objects specifying the latest time the
corresponding objects should've arrived to be
*included* into the dump. Any objects which
arrived later will be *ineligible* for
dumping.
Object types missing from this dictionary will
not be limited. Can be a single datetime
object, one for all object types, or None to
not limit the dump by this parameter
(equivalent to empty dictionary).
Returns:
An iterator returning report JSON data adhering to the current I/O
schema version, each containing at most the specified number of
objects.
Raises:
NoTimestamps - Either "after" or "until" are not None, and
the database doesn't have row timestamps.
NoTimestamps - Either "after" or "until" are not None/empty,
and the database doesn't have row timestamps.
"""
assert self.is_initialized()
id_fields = self.get_schema()[1].id_fields
assert isinstance(objects_per_report, int)
assert objects_per_report >= 0
assert isinstance(with_metadata, bool)
assert after is None or \
isinstance(after, datetime.datetime) and after.tzinfo
isinstance(after, datetime.datetime) and after.tzinfo or \
isinstance(after, dict) and all(
obj_list_name in id_fields and
isinstance(ts, datetime.datetime) and ts.tzinfo
for obj_list_name, ts in after.items()
)
assert until is None or \
isinstance(until, datetime.datetime) and until.tzinfo
assert self.is_initialized()
isinstance(until, datetime.datetime) and until.tzinfo or \
isinstance(until, dict) and all(
obj_list_name in id_fields and
isinstance(ts, datetime.datetime) and ts.tzinfo
for obj_list_name, ts in until.items()
)
if after is None:
after = {}
elif not isinstance(after, dict):
after = {n: after for n in id_fields}
if until is None:
until = {}
elif not isinstance(until, dict):
until = {n: until for n in id_fields}
yield from self.driver.dump_iter(
objects_per_report=objects_per_report,
with_metadata=with_metadata,
Expand All @@ -351,31 +393,55 @@ def dump(self, with_metadata=True, after=None, until=None):
Args:
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.
after: A dictionary of names of I/O object types
(list names) and timezone-aware datetime
objects specifying the latest time the
corresponding objects should've arrived to be
*excluded* from the dump. Any objects which
arrived later will be *eligible* for dumping.
Object types missing from this dictionary will
not be limited. Can be a single datetime
object, one for all object types, or None to
not limit the dump by this parameter
(equivalent to empty dictionary).
until: A dictionary of names of I/O object types
(list names) and timezone-aware datetime
objects specifying the latest time the
corresponding objects should've arrived to be
*included* into the dump. Any objects which
arrived later will be *ineligible* for
dumping.
Object types missing from this dictionary will
not be limited. Can be a single datetime
object, one for all object types, or None to
not limit the dump by this parameter
(equivalent to empty dictionary).
Returns:
The JSON data from the database adhering to the current I/O schema
version.
Raises:
NoTimestamps - Either "after" or "until" are not None, and
the database doesn't have row timestamps.
NoTimestamps - Either "after" or "until" are not None/empty,
and the database doesn't have row timestamps.
"""
assert self.is_initialized()
io_schema = self.get_schema()[1]
assert isinstance(with_metadata, bool)
assert after is None or \
isinstance(after, datetime.datetime) and after.tzinfo
isinstance(after, datetime.datetime) and after.tzinfo or \
isinstance(after, dict) and all(
obj_list_name in io_schema.id_fields and
isinstance(ts, datetime.datetime) and ts.tzinfo
for obj_list_name, ts in after.items()
)
assert until is None or \
isinstance(until, datetime.datetime) and until.tzinfo
assert self.is_initialized()
isinstance(until, datetime.datetime) and until.tzinfo or \
isinstance(until, dict) and all(
obj_list_name in io_schema.id_fields and
isinstance(ts, datetime.datetime) and ts.tzinfo
for obj_list_name, ts in until.items()
)
try:
return next(self.dump_iter(objects_per_report=0,
with_metadata=with_metadata,
Expand Down Expand Up @@ -1077,6 +1143,11 @@ def time_main():
sys.excepthook = kcidb.misc.log_and_print_excepthook
description = 'kcidb-db-time - Fetch various timestamps from a KCIDB DB'
parser = ArgumentParser(description=description)
parser.add_argument(
'-v', '--verbose',
help='Output timestamp breakdown, when available',
action='store_true'
)
parser.add_argument(
'type',
choices=['first_modified', 'last_modified', 'current'],
Expand All @@ -1088,9 +1159,19 @@ def time_main():
raise Exception(f"Database {args.database!r} is not initialized")
ts = None
if args.type == 'first_modified':
ts = client.get_first_modified()
ts = None
for obj_list_name, obj_list_ts in client.get_first_modified().items():
if args.verbose:
print(obj_list_name + ": " +
obj_list_ts.isoformat(timespec='microseconds'))
ts = min(ts or obj_list_ts, obj_list_ts)
elif args.type == 'last_modified':
ts = client.get_last_modified()
ts = None
for obj_list_name, obj_list_ts in client.get_last_modified().items():
if args.verbose:
print(obj_list_name + ": " +
obj_list_ts.isoformat(timespec='microseconds'))
ts = min(ts or obj_list_ts, obj_list_ts)
elif args.type == 'current':
ts = client.get_current_time()
if ts is None:
Expand Down
53 changes: 34 additions & 19 deletions kcidb/db/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ def get_first_modified(self):
The database must be initialized.
Returns:
A timezone-aware datetime object representing the first
data arrival time, or None if the database is empty.
A dictionary of names of I/O object types (list names), which have
objects in the database, and timezone-aware datetime objects
representing the time the first one has arrived into the database.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
Expand All @@ -132,8 +133,9 @@ def get_last_modified(self):
The database must be initialized.
Returns:
A timezone-aware datetime object representing the last
data arrival time, or None if the database is empty.
A dictionary of names of I/O object types (list names), which have
objects in the database, and timezone-aware datetime objects
representing the time the last one has arrived into the database.
Raises:
NoTimestamps - The database doesn't have row timestamps, and
Expand Down Expand Up @@ -197,16 +199,22 @@ def dump_iter(self, objects_per_report, with_metadata, after, until):
report data, or zero for no limit.
with_metadata: True, if metadata fields should be dumped as
well. False, if not.
after: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be excluded from the dump should've arrived.
The data after this time will be dumped.
Can be None to have no limit on older data.
until: An "aware" datetime.datetime object specifying
the latest (database server) time the data to
be dumped should've arrived.
The data after this time will not be dumped.
Can be None to have no limit on newer data.
after: A dictionary of names of I/O object types
(list names) and timezone-aware datetime
objects specifying the latest time the
corresponding objects should've arrived to be
*excluded* from the dump. Any objects which
arrived later will be *eligible* for dumping.
Object types missing from this dictionary will
not be limited.
until: A dictionary of names of I/O object types
(list names) and timezone-aware datetime
objects specifying the latest time the
corresponding objects should've arrived to be
*included* into the dump. Any objects which
arrived later will be *ineligible* for
dumping. Object types missing from this
dictionary will not be limited.
Returns:
An iterator returning report JSON data adhering to the current
Expand All @@ -217,14 +225,21 @@ def dump_iter(self, objects_per_report, with_metadata, after, until):
NoTimestamps - Either "after" or "until" are not None, and
the database doesn't have row timestamps.
"""
assert self.is_initialized()
io_schema = self.get_schema()[1]
assert isinstance(objects_per_report, int)
assert objects_per_report >= 0
assert isinstance(with_metadata, bool)
assert after is None or \
isinstance(after, datetime.datetime) and after.tzinfo
assert until is None or \
isinstance(until, datetime.datetime) and until.tzinfo
assert self.is_initialized()
assert isinstance(after, dict) and all(
obj_list_name in io_schema.id_fields and
isinstance(ts, datetime.datetime) and ts.tzinfo
for obj_list_name, ts in after.items()
)
assert isinstance(until, dict) and all(
obj_list_name in io_schema.id_fields and
isinstance(ts, datetime.datetime) and ts.tzinfo
for obj_list_name, ts in until.items()
)

# No, it's not, pylint: disable=too-many-return-statements
def query_ids_are_valid(self, ids):
Expand Down
Loading

0 comments on commit f93e13e

Please sign in to comment.