From 1d6c60745076f63bde79c0e45702f253b0059ebe Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Sun, 10 Nov 2024 20:39:37 +0200 Subject: [PATCH 1/2] Archive each table independently At the moment all operational tables are archived using a single timestamp for the lower boundary, which either loses us data, if they're uneven, or forces us to load duplicates to make sure we don't lose anything. This is because some databases don't support atomic modifications of multiple tables at once. Work that around, by assigning separate time ranges to separate tables, when dumping. --- kcidb/db/__init__.py | 165 +++++++++++++++++++++++++--------- kcidb/db/abstract.py | 53 +++++++---- kcidb/db/bigquery/v04_00.py | 105 ++++++++++++++-------- kcidb/db/mux.py | 76 +++++++++------- kcidb/db/null.py | 48 +++++----- kcidb/db/postgresql/v04_00.py | 96 +++++++++++--------- kcidb/db/schematic.py | 110 +++++++++++++---------- kcidb/db/sql/schema.py | 20 +++-- kcidb/db/sqlite/v04_00.py | 102 +++++++++++---------- kcidb/test_db.py | 36 +++++--- main.py | 72 +++++++++++---- 11 files changed, 562 insertions(+), 321 deletions(-) diff --git a/kcidb/db/__init__.py b/kcidb/db/__init__.py index 5cd62791..87de6ca5 100644 --- a/kcidb/db/__init__.py +++ b/kcidb/db/__init__.py @@ -186,18 +186,23 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" assert self.is_initialized() + io_schema = self.get_schema()[1] first_modified = self.driver.get_first_modified() - assert first_modified is None or \ - isinstance(first_modified, datetime.datetime) and \ - first_modified.tzinfo + assert isinstance(first_modified, dict) + assert all( + obj_list_name in io_schema.id_fields and + isinstance(timestamp, datetime.datetime) and timestamp.tzinfo + for obj_list_name, timestamp in first_modified.items() + ) return first_modified def get_last_modified(self): @@ -206,18 +211,23 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. """ assert self.is_initialized() + io_schema = self.get_schema()[1] last_modified = self.driver.get_last_modified() - assert last_modified is None or \ - isinstance(last_modified, datetime.datetime) and \ - last_modified.tzinfo + assert isinstance(last_modified, dict) + assert all( + obj_list_name in io_schema.id_fields and + isinstance(timestamp, datetime.datetime) and timestamp.tzinfo + for obj_list_name, timestamp in last_modified.items() + ) return last_modified def get_schemas(self): @@ -310,16 +320,29 @@ def dump_iter(self, objects_per_report=0, with_metadata=True, report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. 
- until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). Returns: An iterator returning report JSON data adhering to the current I/O @@ -327,17 +350,36 @@ def dump_iter(self, objects_per_report=0, with_metadata=True, objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and - the database doesn't have row timestamps. + NoTimestamps - Either "after" or "until" are not None/empty, + and the database doesn't have row timestamps. 
""" + assert self.is_initialized() + id_fields = self.get_schema()[1].id_fields assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo + isinstance(after, datetime.datetime) and after.tzinfo or \ + isinstance(after, dict) and all( + obj_list_name in id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + isinstance(until, datetime.datetime) and until.tzinfo or \ + isinstance(until, dict) and all( + obj_list_name in id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) + if after is None: + after = {} + elif not isinstance(after, dict): + after = {n: after for n in id_fields} + if until is None: + until = {} + elif not isinstance(until, dict): + until = {n: until for n in id_fields} yield from self.driver.dump_iter( objects_per_report=objects_per_report, with_metadata=with_metadata, @@ -351,31 +393,55 @@ def dump(self, with_metadata=True, after=None, until=None): Args: with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. 
+ after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. + Object types missing from this dictionary will + not be limited. Can be a single datetime + object, one for all object types, or None to + not limit the dump by this parameter + (equivalent to empty dictionary). Returns: The JSON data from the database adhering to the current I/O schema version. Raises: - NoTimestamps - Either "after" or "until" are not None, and - the database doesn't have row timestamps. + NoTimestamps - Either "after" or "until" are not None/empty, + and the database doesn't have row timestamps. 
""" + assert self.is_initialized() + io_schema = self.get_schema()[1] assert isinstance(with_metadata, bool) assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo + isinstance(after, datetime.datetime) and after.tzinfo or \ + isinstance(after, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + isinstance(until, datetime.datetime) and until.tzinfo or \ + isinstance(until, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) try: return next(self.dump_iter(objects_per_report=0, with_metadata=with_metadata, @@ -1077,6 +1143,11 @@ def time_main(): sys.excepthook = kcidb.misc.log_and_print_excepthook description = 'kcidb-db-time - Fetch various timestamps from a KCIDB DB' parser = ArgumentParser(description=description) + parser.add_argument( + '-v', '--verbose', + help='Output timestamp breakdown, when available', + action='store_true' + ) parser.add_argument( 'type', choices=['first_modified', 'last_modified', 'current'], @@ -1088,9 +1159,19 @@ def time_main(): raise Exception(f"Database {args.database!r} is not initialized") ts = None if args.type == 'first_modified': - ts = client.get_first_modified() + ts = None + for obj_list_name, obj_list_ts in client.get_first_modified().items(): + if args.verbose: + print(obj_list_name + ": " + + obj_list_ts.isoformat(timespec='microseconds')) + ts = min(ts or obj_list_ts, obj_list_ts) elif args.type == 'last_modified': - ts = client.get_last_modified() + ts = None + for obj_list_name, obj_list_ts in client.get_last_modified().items(): + if args.verbose: + print(obj_list_name + ": " + + obj_list_ts.isoformat(timespec='microseconds')) + ts = min(ts or obj_list_ts, obj_list_ts) elif 
args.type == 'current': ts = client.get_current_time() if ts is None: diff --git a/kcidb/db/abstract.py b/kcidb/db/abstract.py index 31ecfbac..e885c837 100644 --- a/kcidb/db/abstract.py +++ b/kcidb/db/abstract.py @@ -117,8 +117,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -132,8 +133,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -197,16 +199,22 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. 
+ after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: An iterator returning report JSON data adhering to the current @@ -217,14 +225,21 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): NoTimestamps - Either "after" or "until" are not None, and the database doesn't have row timestamps. """ + assert self.is_initialized() + io_schema = self.get_schema()[1] assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) - assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo - assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + assert isinstance(after, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) + assert isinstance(until, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) # No, it's not, pylint: disable=too-many-return-statements def query_ids_are_valid(self, ids): diff --git a/kcidb/db/bigquery/v04_00.py b/kcidb/db/bigquery/v04_00.py index 72a4c4b7..cf25d694 100644 --- a/kcidb/db/bigquery/v04_00.py +++ b/kcidb/db/bigquery/v04_00.py @@ -741,26 +741,33 @@ def 
dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the I/O version - of the database schema, each containing at most the specified - number of objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. 
""" + # We'll try to manage, pylint: disable=too-many-locals assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) @@ -772,7 +779,9 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): (f for f in table_schema if f.name == "_timestamp"), None ) - if (after or until) and not ts_field: + table_after = after.get(obj_list_name) + table_until = until.get(obj_list_name) + if (table_after or table_until) and not ts_field: raise NoTimestamps( f"Table {obj_list_name!r} has no {ts_field.name!r} column" ) @@ -787,13 +796,15 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): (( " WHERE " + " AND ".join( f"{ts_field.name} {op} ?" - for op, v in ((">", after), ("<=", until)) if v + for op, v in ( + (">", table_after), ("<=", table_until) + ) if v ) - ) if (after or until) else "") + ) if (table_after or table_until) else "") ) query_parameters = [ bigquery.ScalarQueryParameter(None, ts_field.field_type, v) - for v in (after, until) if v + for v in (table_after, table_until) if v ] query_job = self.conn.query_create(query_string, query_parameters) obj_list = None @@ -1205,8 +1216,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. 
Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -1218,14 +1230,22 @@ def get_first_modified(self): ): raise NoTimestamps("Database is missing timestamps in its schema") - return next(iter(self.conn.query_create( - "SELECT MIN(first_modified) AS first_modified FROM(\n" + - "UNION ALL\n".join( - f"SELECT MIN(_timestamp) AS first_modified FROM {table_name}\n" - for table_name in self.TABLE_MAP - ) + - ")\n" - ).result()))[0] + return { + table_name: first_modified + for table_name, first_modified in self.conn.query_create( + "\nUNION ALL\n".join( + f"SELECT ? AS table_name, " + f"MIN(_timestamp) AS first_modified " + f"FROM _{table_name}" + for table_name in self.TABLE_MAP + ), + [ + bigquery.ScalarQueryParameter(None, "STRING", table_name) + for table_name in self.TABLE_MAP + ] + ).result() + if first_modified + } def get_last_modified(self): """ @@ -1233,8 +1253,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -1246,11 +1267,19 @@ def get_last_modified(self): ): raise NoTimestamps("Database is missing timestamps in its schema") - return next(iter(self.conn.query_create( - "SELECT MAX(last_modified) AS last_modified FROM(\n" + - "UNION ALL\n".join( - f"SELECT MAX(_timestamp) AS last_modified FROM {table_name}\n" - for table_name in self.TABLE_MAP - ) + - ")\n" - ).result()))[0] + return { + table_name: last_modified + for table_name, last_modified in self.conn.query_create( + "\nUNION ALL\n".join( + f"SELECT ? 
AS table_name, " + f"MAX(_timestamp) AS last_modified " + f"FROM _{table_name}" + for table_name in self.TABLE_MAP + ), + [ + bigquery.ScalarQueryParameter(None, "STRING", table_name) + for table_name in self.TABLE_MAP + ] + ).result() + if last_modified + } diff --git a/kcidb/db/mux.py b/kcidb/db/mux.py index b6bc8789..9b4e61ea 100644 --- a/kcidb/db/mux.py +++ b/kcidb/db/mux.py @@ -291,13 +291,13 @@ def get_current_time(self): def get_first_modified(self): """ - Get the time data has arrived first into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived first into the driven database. The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -305,20 +305,25 @@ def get_first_modified(self): """ assert self.is_initialized() max_ts = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) - return min( - (driver.get_first_modified() for driver in self.drivers), - key=lambda ts: ts or max_ts - ) + merged_first_modified = {} + for driver in self.drivers: + first_modified = driver.get_first_modified() + for obj_list_name, timestamp in first_modified.items(): + merged_first_modified[obj_list_name] = min( + merged_first_modified.get(obj_list_name, max_ts), + timestamp + ) + return merged_first_modified def get_last_modified(self): """ - Get the time data has arrived last into the driven database. Can - return the minimum timestamp constant, if the database is empty. + Get the time data has arrived last into the driven database. The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time. 
+ A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -326,10 +331,15 @@ def get_last_modified(self): """ assert self.is_initialized() min_ts = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) - return max( - (driver.get_last_modified() for driver in self.drivers), - key=lambda ts: ts or min_ts - ) + merged_last_modified = {} + for driver in self.drivers: + last_modified = driver.get_last_modified() + for obj_list_name, timestamp in last_modified.items(): + merged_last_modified[obj_list_name] = max( + merged_last_modified.get(obj_list_name, min_ts), + timestamp + ) + return merged_last_modified def get_schemas(self): """ @@ -393,24 +403,30 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the current I/O - schema version, each containing at most the specified number of - objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ yield from self.drivers[0].dump_iter(objects_per_report, diff --git a/kcidb/db/null.py b/kcidb/db/null.py index bcfc0dff..1427821e 100644 --- a/kcidb/db/null.py +++ b/kcidb/db/null.py @@ -122,14 +122,15 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. """ - return None + return {} def get_last_modified(self): """ @@ -137,14 +138,15 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. 
Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. """ - return None + return {} def dump_iter(self, objects_per_report, with_metadata, after, until): """ @@ -155,24 +157,30 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the current I/O - schema version, each containing at most the specified number of - objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. 
Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ del objects_per_report diff --git a/kcidb/db/postgresql/v04_00.py b/kcidb/db/postgresql/v04_00.py index 981f07aa..029239ee 100644 --- a/kcidb/db/postgresql/v04_00.py +++ b/kcidb/db/postgresql/v04_00.py @@ -5,6 +5,7 @@ import textwrap from collections import namedtuple from itertools import chain +from functools import reduce import psycopg2 import psycopg2.extras import psycopg2.errors @@ -18,6 +19,8 @@ Constraint, BoolColumn, FloatColumn, IntegerColumn, TimestampColumn, \ VarcharColumn, TextColumn, TextArrayColumn, JSONColumn, Table +# It's OK for now, pylint: disable=too-many-lines + # Module's logger LOGGER = logging.getLogger(__name__) @@ -564,16 +567,22 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: An iterator returning report JSON data adhering to the I/O @@ -581,7 +590,7 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ assert isinstance(objects_per_report, int) @@ -593,9 +602,10 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): with self.conn, self.conn.cursor() as cursor: for table_name, table_schema in self.TABLES.items(): obj_list = None - cursor.execute(*table_schema.format_dump(table_name, - with_metadata, - after, until)) + cursor.execute(*table_schema.format_dump( + table_name, with_metadata, + after.get(table_name), until.get(table_name) + )) for obj in table_schema.unpack_iter(cursor, with_metadata): if obj_list is None: obj_list = [] @@ -943,27 +953,28 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MIN(first_modified) AS first_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_first_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_first_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn, self.conn.cursor() as cursor: - cursor.execute(statement) - return cursor.fetchone()[0] + cursor.execute(*query) + return { + table_name: first_modified + for (table_name, first_modified) in cursor + if first_modified + } def get_last_modified(self): """ @@ -971,24 +982,25 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MAX(last_modified) AS last_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_last_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_last_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn, self.conn.cursor() as cursor: - cursor.execute(statement) - return cursor.fetchone()[0] + cursor.execute(*query) + return { + table_name: last_modified + for (table_name, last_modified) in cursor + if last_modified + } diff --git a/kcidb/db/schematic.py b/kcidb/db/schematic.py index 8ac3daaf..9e8f99b2 100644 --- a/kcidb/db/schematic.py +++ b/kcidb/db/schematic.py @@ -271,33 +271,32 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the current - database schema's I/O schema version, each containing at most the + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ - assert isinstance(objects_per_report, int) - assert objects_per_report >= 0 - assert isinstance(with_metadata, bool) - assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo - assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo # We can live with this for now, pylint: disable=too-many-arguments # Or if you prefer, pylint: disable=too-many-positional-arguments @@ -375,8 +374,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -390,8 +390,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. 
+ A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -567,8 +568,9 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -583,8 +585,9 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and @@ -669,34 +672,47 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. 
+ after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. + until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the schema's - I/O schema version, each containing at most the specified number - of objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. 
""" + assert self.is_initialized() + io_schema = self.get_schema()[1] assert isinstance(objects_per_report, int) assert objects_per_report >= 0 assert isinstance(with_metadata, bool) - assert after is None or \ - isinstance(after, datetime.datetime) and after.tzinfo - assert until is None or \ - isinstance(until, datetime.datetime) and until.tzinfo - assert self.is_initialized() + assert isinstance(after, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in after.items() + ) + assert isinstance(until, dict) and all( + obj_list_name in io_schema.id_fields and + isinstance(ts, datetime.datetime) and ts.tzinfo + for obj_list_name, ts in until.items() + ) return self.schema.dump_iter(objects_per_report, with_metadata, after, until) diff --git a/kcidb/db/sql/schema.py b/kcidb/db/sql/schema.py index 1a96c4e2..1fdeae36 100644 --- a/kcidb/db/sql/schema.py +++ b/kcidb/db/sql/schema.py @@ -352,8 +352,9 @@ def format_get_first_modified(self, name): name: The name of the target table of the command. Returns: - The formatted "SELECT" command, returning the timestamp in - "first_modified" column. + The formatted "SELECT" command and its parameters container. + The "SELECT" command returns the table name in "table_name", + and the timestamp in "first_modified" columns. Raises: NoTimestamps - The table doesn't have row timestamps. @@ -362,7 +363,10 @@ def format_get_first_modified(self, name): if not self.timestamp: raise NoTimestamps("Table has no timestamp column") return ( - f"SELECT MIN({self.timestamp.name}) AS first_modified FROM {name}" + f"SELECT {self.placeholder} AS table_name, " + f"MIN({self.timestamp.name}) AS first_modified " + f"FROM {name}", + (name,) ) def format_get_last_modified(self, name): @@ -374,8 +378,9 @@ def format_get_last_modified(self, name): name: The name of the target table of the command. 
Returns: - The formatted "SELECT" command, returning the timestamp in - "last_modified" column. + The formatted "SELECT" command and its parameters container. + The "SELECT" command returns the table name in "table_name", + and the timestamp in "last_modified" columns. Raises: NoTimestamps - The table doesn't have row timestamps. @@ -384,7 +389,10 @@ def format_get_last_modified(self, name): if not self.timestamp: raise NoTimestamps("Table has no timestamp column") return ( - f"SELECT MAX({self.timestamp.name}) AS last_modified FROM {name}" + f"SELECT {self.placeholder} AS table_name, " + f"MAX({self.timestamp.name}) AS last_modified " + f"FROM {name}", + (name,) ) def format_delete(self, name): diff --git a/kcidb/db/sqlite/v04_00.py b/kcidb/db/sqlite/v04_00.py index f195358f..1ca73d2b 100644 --- a/kcidb/db/sqlite/v04_00.py +++ b/kcidb/db/sqlite/v04_00.py @@ -492,24 +492,30 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): report data, or zero for no limit. with_metadata: True, if metadata fields should be dumped as well. False, if not. - after: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be excluded from the dump should've arrived. - The data after this time will be dumped. - Can be None to have no limit on older data. - until: An "aware" datetime.datetime object specifying - the latest (database server) time the data to - be dumped should've arrived. - The data after this time will not be dumped. - Can be None to have no limit on newer data. + after: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *excluded* from the dump. Any objects which + arrived later will be *eligible* for dumping. + Object types missing from this dictionary will + not be limited. 
+ until: A dictionary of names of I/O object types + (list names) and timezone-aware datetime + objects specifying the latest time the + corresponding objects should've arrived to be + *included* into the dump. Any objects which + arrived later will be *ineligible* for + dumping. Object types missing from this + dictionary will not be limited. Returns: - An iterator returning report JSON data adhering to the I/O version - of the database schema, each containing at most the specified - number of objects. + An iterator returning report JSON data adhering to the I/O + version of the database schema, each containing at most the + specified number of objects. Raises: - NoTimestamps - Either "after" or "until" are not None, and + NoTimestamps - Either "after" or "until" are not empty, and the database doesn't have row timestamps. """ assert isinstance(objects_per_report, int) @@ -522,10 +528,10 @@ def dump_iter(self, objects_per_report, with_metadata, after, until): cursor = self.conn.cursor() try: for table_name, table_schema in self.TABLES.items(): - result = cursor.execute( - *table_schema.format_dump(table_name, with_metadata, - after, until) - ) + result = cursor.execute(*table_schema.format_dump( + table_name, with_metadata, + after.get(table_name), until.get(table_name) + )) obj_list = None for obj in table_schema.unpack_iter(result, with_metadata): @@ -898,32 +904,32 @@ def get_first_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the first - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the first one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MIN(first_modified) AS first_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_first_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_first_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn: cursor = self.conn.cursor() try: - cursor.execute(statement) - ts_str = cursor.fetchone()[0] + return { + table_name: dateutil.parser.isoparse(first_modified) + for (table_name, first_modified) + in cursor.execute(*query) + if first_modified + } finally: cursor.close() - return ts_str and dateutil.parser.isoparse(ts_str) def get_last_modified(self): """ @@ -931,29 +937,29 @@ def get_last_modified(self): The database must be initialized. Returns: - A timezone-aware datetime object representing the last - data arrival time, or None if the database is empty. + A dictionary of names of I/O object types (list names), which have + objects in the database, and timezone-aware datetime objects + representing the time the last one has arrived into the database. Raises: NoTimestamps - The database doesn't have row timestamps, and cannot determine data arrival time. 
""" - statement = ( - "SELECT MAX(last_modified) AS last_modified\n" + - "FROM (\n" + - textwrap.indent( - "\nUNION ALL\n".join( - table_schema.format_get_last_modified(table_name) - for table_name, table_schema in self.TABLES.items() - ), - " " * 4 - ) + "\n) AS tables\n" + query = reduce( + lambda a, b: (a[0] + "\nUNION ALL\n" + b[0], a[1] + b[1]), + ( + table_schema.format_get_last_modified(table_name) + for table_name, table_schema in self.TABLES.items() + ) ) with self.conn: cursor = self.conn.cursor() try: - cursor.execute(statement) - ts_str = cursor.fetchone()[0] + return { + table_name: dateutil.parser.isoparse(last_modified) + for (table_name, last_modified) + in cursor.execute(*query) + if last_modified + } finally: cursor.close() - return ts_str and dateutil.parser.isoparse(ts_str) diff --git a/kcidb/test_db.py b/kcidb/test_db.py index 15029f6b..801dbd87 100644 --- a/kcidb/test_db.py +++ b/kcidb/test_db.py @@ -451,27 +451,37 @@ def test_get_modified(clean_database): # Check a post-timestamp schema version time.sleep(1) client.init() - timestamp = client.get_first_modified() - assert timestamp is None - timestamp = client.get_last_modified() - assert timestamp is None + io_schema = client.get_schema()[1] + timestamps = client.get_first_modified() + assert timestamps == {} + timestamps = client.get_last_modified() + assert timestamps == {} before_load = client.get_current_time() client.load(COMPREHENSIVE_IO_DATA) first_modified = client.get_first_modified() last_modified = client.get_last_modified() - assert first_modified is not None - assert isinstance(first_modified, datetime.datetime) - assert first_modified.tzinfo is not None - assert first_modified >= before_load + assert isinstance(first_modified, dict) + assert set(io_schema.id_fields) == set(first_modified) + assert all( + isinstance(t, datetime.datetime) and + t.tzinfo is not None and + t >= before_load + for t in first_modified.values() + ) + + assert isinstance(last_modified, dict) + assert 
set(io_schema.id_fields) == set(last_modified)
+    assert all(
+        isinstance(t, datetime.datetime) and
+        t.tzinfo is not None and
+        t >= before_load
+        for t in last_modified.values()
+    )
 
-    assert last_modified is not None
-    assert isinstance(last_modified, datetime.datetime)
-    assert last_modified.tzinfo is not None
-    assert last_modified >= before_load
+    assert all(t >= first_modified[n] for n, t in last_modified.items())
 
-    assert last_modified >= first_modified
     client.cleanup()
 
 
diff --git a/main.py b/main.py
index d4a10fe5..391b9bbd 100644
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
 """Google Cloud Functions for Kernel CI reporting"""
 
+import gc
 import os
 import time
 import atexit
@@ -423,45 +424,84 @@ def kcidb_archive(event, context):
     that is out of the editing window (to be enforced), and hasn't been
     transferred yet.
     """
+    # It's OK, pylint: disable=too-many-locals
+    #
+    # Editing window
+    edit_window = datetime.timedelta(days=14)
+    # Maximum duration of the dump transferred in a single execution
+    max_duration = datetime.timedelta(days=7)
+    # Duration of each dump piece
+    piece_duration = datetime.timedelta(hours=6)
+
     op_client = get_db_client(OPERATIONAL_DATABASE)
+    op_io_schema = op_client.get_schema()[1]
+    op_obj_list_names = set(op_io_schema.id_fields)
     op_now = op_client.get_current_time()
     op_first_modified = op_client.get_first_modified()
     if not op_first_modified:
         LOGGER.info("Operational database is empty, nothing to archive")
         return
 
+    # Maximum timestamp of data to archive
+    max_until = op_now - edit_window
+
     ar_client = get_db_client(ARCHIVE_DATABASE)
     ar_last_modified = ar_client.get_last_modified()
-    after = ar_last_modified or \
-        (op_first_modified - datetime.timedelta(seconds=1))
-    until = min(
-        # Add a timespan we can fit in time limits
-        after + datetime.timedelta(days=7),
-        # Subtract editing window (to be enforced)
-        op_now - datetime.timedelta(days=14)
-    )
-    if after >= until:
+    # Find the timestamps right before the data we need to fetch
+    
after = {
+        n: (
+            ar_last_modified.get(n) or
+            op_first_modified.get(n, op_now) -
+            datetime.timedelta(seconds=1)
+        ) for n in op_obj_list_names
+    }
+    min_after = min(after.values())
+    if min_after >= max_until:
         LOGGER.info("No data old enough to archive")
         return
 
+    # Find the maximum timestamp of the data we need to fetch
+    # We try to align all tables on a single time boundary
+    until = min(min_after + max_duration, max_until)
+
     # Transfer data in pieces which can hopefully fit in memory
-    after_str = after.isoformat(timespec='microseconds')
-    while after < until:
-        next_after = min(after + datetime.timedelta(hours=12), until)
-        next_after_str = next_after.isoformat(timespec='microseconds')
+    # Split by time, down to microseconds, as it's our transfer atom
+    min_after_str = min_after.isoformat(timespec='microseconds')
+    while any(t < until for t in after.values()):
+        next_after = {
+            n: min(max(t, min_after + piece_duration), until)
+            for n, t in after.items()
+        }
+        next_min_after = min(next_after.values())
+        next_min_after_str = next_min_after.isoformat(timespec='microseconds')
         # Transfer the data, preserving the timestamps
         LOGGER.info("FETCHING operational database dump for (%s, %s] range",
-                    after_str, next_after_str)
+                    min_after_str, next_min_after_str)
+        for obj_list_name in after:
+            LOGGER.debug(
+                "FETCHING %s for (%s, %s] range",
+                obj_list_name,
+                after[obj_list_name].isoformat(timespec='microseconds'),
+                next_after[obj_list_name].isoformat(timespec='microseconds')
+            )
         dump = op_client.dump(with_metadata=True,
                               after=after, until=next_after)
         LOGGER.info("LOADING a dump of %u objects into archive database",
                     kcidb.io.SCHEMA.count(dump))
         ar_client.load(dump, with_metadata=True)
         LOGGER.info("ARCHIVED %u objects in (%s, %s] range",
-                    kcidb.io.SCHEMA.count(dump), after_str, next_after_str)
+                    kcidb.io.SCHEMA.count(dump),
+                    min_after_str, next_min_after_str)
+        for obj_list_name in after:
+            LOGGER.debug("ARCHIVED %u %s",
+                         
len(dump.get(obj_list_name, [])), obj_list_name) after = next_after - after_str = next_after_str + min_after = next_min_after + min_after_str = next_min_after_str + # Make sure we have enough memory for the next piece + dump = None + gc.collect() def kcidb_purge_db(event, context): From 4ffbbb326d9ce5848878d90b07e6d6edf426a60f Mon Sep 17 00:00:00 2001 From: Nikolai Kondrashov Date: Tue, 12 Nov 2024 18:49:48 +0200 Subject: [PATCH 2/2] tests: Wait longer for archival --- test_main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test_main.py b/test_main.py index 8e6e98db..4a7b0c97 100644 --- a/test_main.py +++ b/test_main.py @@ -410,7 +410,7 @@ def gen_data(id, ts): op_client.load(data_now, with_metadata=True) # Trigger and wait for archival (ignore possibility of actual trigger) publisher.publish({}) - time.sleep(30) + time.sleep(60) # Check data_now doesn't end up in the archive DB assert ar_schema.count(ar_client.dump()) == 0 @@ -418,7 +418,7 @@ def gen_data(id, ts): op_client.load(op_schema.merge(data_3w, [data_4w]), with_metadata=True) # Trigger and wait for archival (ignore possibility of actual trigger) publisher.publish({}) - time.sleep(30) + time.sleep(60) # Check data_4w is in the archive database dump = ar_client.dump() assert all( @@ -434,7 +434,7 @@ def gen_data(id, ts): ), "Some three-week old data in the archive" # Trigger and wait for another archival (ignore chance of actual trigger) publisher.publish({}) - time.sleep(30) + time.sleep(60) # Check data_3w is now in the archive database dump = ar_client.dump() assert all(