From 21ddc09ab4670a66286ab102f33aaae289c37b5b Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Tue, 17 Sep 2024 17:19:06 +0200 Subject: [PATCH] [core][fix] Persist parent update structure in an atomic way (#2196) --- fixcore/fixcore/db/db_access.py | 2 +- fixcore/fixcore/db/graphdb.py | 152 +++++++++----------------- fixcore/fixcore/db/model.py | 64 ++++++++++- fixcore/fixcore/model/graph_access.py | 3 +- fixcore/tests/fixcore/conftest.py | 4 +- 5 files changed, 114 insertions(+), 111 deletions(-) diff --git a/fixcore/fixcore/db/db_access.py b/fixcore/fixcore/db/db_access.py index d4c644a84e..e6c5ebcce3 100644 --- a/fixcore/fixcore/db/db_access.py +++ b/fixcore/fixcore/db/db_access.py @@ -275,7 +275,7 @@ def get_graph_db(self, name: GraphName, no_check: bool = False) -> GraphDB: else: if not no_check and not self.database.has_graph(name): raise NoSuchGraph(name) - graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph) + graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph, self.lock_db) event_db = EventGraphDB(graph_db, self.event_sender) self.graph_dbs[name] = event_db return event_db diff --git a/fixcore/fixcore/db/graphdb.py b/fixcore/fixcore/db/graphdb.py index b34fd8e8da..868440e1b6 100644 --- a/fixcore/fixcore/db/graphdb.py +++ b/fixcore/fixcore/db/graphdb.py @@ -9,7 +9,6 @@ from numbers import Number from textwrap import dedent from typing import ( - DefaultDict, Optional, Callable, AsyncGenerator, @@ -18,7 +17,6 @@ Dict, List, Tuple, - TypeVar, cast, AsyncIterator, Literal, @@ -39,7 +37,8 @@ from fixcore.db import arango_query, EstimatedSearchCost from fixcore.db.arango_query import fulltext_delimiter from fixcore.db.async_arangodb import AsyncArangoDB, AsyncArangoTransactionDB, AsyncArangoDBBase, AsyncCursorContext -from fixcore.db.model import GraphUpdate, QueryModel +from fixcore.db.lockdb import LockDB +from fixcore.db.model import GraphUpdate, QueryModel, GraphChange from fixcore.db.usagedb import resource_usage_db from fixcore.error import InvalidBatchUpdate, ConflictingChangeInProgress, NoSuchChangeError, OptimisticLockingFailed from fixcore.ids import NodeId, GraphName @@ -275,6 +274,7 @@ def __init__( name: GraphName, adjust_node: AdjustNode, config: GraphConfig, + lock_db: LockDB, ) -> None: super().__init__() self._name = name @@ -283,6 +283,7 @@ def __init__( self.in_progress = f"{name}_in_progress" self.node_history = f"{name}_node_history" self.usage_db = resource_usage_db(db, f"{name}_usage") + self.lock_db = lock_db self.db = db self.config = config @@ -309,8 +310,8 @@ async def create_node(self, model: Model, node_id: NodeId, data: Json, under_nod graph.add_node(node_id, data) graph.add_edge(under_node_id, node_id, EdgeTypes.default) access = GraphAccess(graph.graph, node_id, {under_node_id}) - _, node_inserts, _, _ = self.prepare_nodes(access, [], model) - _, edge_inserts, _, _ = self.prepare_edges(access, [], EdgeTypes.default) + node_inserts = self.prepare_nodes(access, [], model).node_inserts + edge_inserts = self.prepare_edges(access, [], EdgeTypes.default).edge_inserts[EdgeTypes.default] assert len(node_inserts) == 1 assert len(edge_inserts) == 1 edge_collection = self.edge_collection(EdgeTypes.default) @@ -974,7 +975,7 @@ async def move_temp_to_proper(self, change_id: str, temp_name: str, update_histo + edge_updates + edge_deletes + usage_updates - + [f'remove {{_key: "{change_key}"}} in {self.in_progress}'], + + [f'remove {{_key: "{change_key}"}} in {self.in_progress} OPTIONS {{ ignoreErrors: true }}'], ) ) cmd = f'function () {{\nvar db=require("@arangodb").db;\n{updates}\n}}' @@ -1034,15 +1035,9 @@ def adjust_node( # adjuster has the option to manipulate the resulting json return self.node_adjuster.adjust(json) - def prepare_nodes( - self, access: GraphAccess, node_cursor: Iterable[Json], model: Model - ) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]: + def prepare_nodes(self, access: GraphAccess, node_cursor: Iterable[Json], model: Model) -> GraphChange: log.info(f"Prepare nodes for subgraph {access.root()}") - info = GraphUpdate() - resource_inserts: List[Json] = [] - resource_updates: List[Json] = [] - resource_deletes: List[Json] = [] - + change = GraphChange() optional_properties = [*Section.all_ordered, "refs", "kinds", "flat", "hash", "hist_hash"] def insert_node(node: Json) -> None: @@ -1052,8 +1047,7 @@ def insert_node(node: Json) -> None: value = node.get(prop, None) if value: js_doc[prop] = value - resource_inserts.append(js_doc) - info.nodes_created += 1 + change.node_inserts.append(js_doc) def update_or_delete_node(node: Json) -> None: key = node["_key"] @@ -1061,8 +1055,7 @@ def update_or_delete_node(node: Json) -> None: elem = access.node(key) if elem is None: # node is in db, but not in the graph any longer: delete node - resource_deletes.append({"_key": key, "deleted": access.at_json, "history": True}) - info.nodes_deleted += 1 + change.node_deletes.append({"_key": key, "deleted": access.at_json, "history": True}) elif elem["hash"] != hash_string: # node is in db and in the graph, content is different adjusted: Json = self.adjust_node(model, elem, node["created"], access.at_json) @@ -1072,15 +1065,14 @@ def update_or_delete_node(node: Json) -> None: value = adjusted.get(prop, None) if value: js[prop] = value - resource_updates.append(js) - info.nodes_updated += 1 + change.node_updates.append(js) for doc in node_cursor: update_or_delete_node(doc) for not_visited in access.not_visited_nodes(): insert_node(not_visited) - return info, resource_inserts, resource_updates, resource_deletes + return change def _edge_to_json( self, from_node: str, to_node: str, data: Optional[Json], refs: Optional[Dict[str, str]] = None, **kwargs: Any @@ -1096,14 +1088,9 @@ def _edge_to_json( js["_to"] = f"{self.vertex_name}/{to_node}" return js - def prepare_edges( - self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType - ) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]: + def prepare_edges(self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType) -> GraphChange: log.info(f"Prepare edges of type {edge_type} for subgraph {access.root()}") - info = GraphUpdate() - edge_inserts: List[Json] = [] - edge_updates: List[Json] = [] - edge_deletes: List[Json] = [] + change = GraphChange() def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json: # Take the refs with the lower number of entries (or none): @@ -1117,8 +1104,7 @@ def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json: return self._edge_to_json(from_node, to_node, edge_data, refs) def insert_edge(from_node: str, to_node: str, edge_data: Optional[Json]) -> None: - edge_inserts.append(edge_json(from_node, to_node, edge_data)) - info.edges_created += 1 + change.edge_inserts[edge_type].append(edge_json(from_node, to_node, edge_data)) def update_edge(edge: Json) -> None: from_node = edge["_from"].split("/")[1] # vertex/id @@ -1126,12 +1112,10 @@ def update_edge(edge: Json) -> None: has_edge, edge_data = access.has_edge(from_node, to_node, edge_type) edge_hash = edge_data.get("hash") if edge_data else None if not has_edge: - edge_deletes.append(edge) - info.edges_deleted += 1 + change.edge_deletes[edge_type].append(edge) elif edge_hash != edge.get("hash"): js = edge_json(from_node, to_node, edge_data) - edge_updates.append(js) - info.edges_updated += 1 + change.edge_updates[edge_type].append(js) for doc in edge_cursor: update_edge(doc) @@ -1139,7 +1123,7 @@ def update_edge(edge: Json) -> None: for edge_from, edge_to, data in access.not_visited_edges(edge_type): insert_edge(edge_from, edge_to, data) - return info, edge_inserts, edge_updates, edge_deletes + return change async def merge_graph( self, @@ -1154,37 +1138,21 @@ async def merge_graph( async def prepare_graph( sub: GraphAccess, node_query: Tuple[str, Json], edge_query: Callable[[EdgeType], Tuple[str, Json]] - ) -> Tuple[ - GraphUpdate, - List[Json], # node insert - List[Json], # node update - List[Json], # node delete - Dict[EdgeType, List[Json]], # edge insert - Dict[EdgeType, List[Json]], # edge update - Dict[EdgeType, List[Json]], # edge delete - ]: - graph_info = GraphUpdate() + ) -> GraphChange: + graph_change = GraphChange() # check all nodes for this subgraph query, bind = node_query log.debug(f"Query for nodes: {sub.root()}") with await self.db.aql(query, bind_vars=bind, batch_size=50000) as node_cursor: - node_info, ni, nu, nd = self.prepare_nodes(sub, node_cursor, model) - graph_info += node_info + graph_change += self.prepare_nodes(sub, node_cursor, model) # check all edges in all relevant edge-collections - edge_inserts: DefaultDict[EdgeType, List[Json]] = defaultdict(list) - edge_updates: DefaultDict[EdgeType, List[Json]] = defaultdict(list) - edge_deletes: DefaultDict[EdgeType, List[Json]] = defaultdict(list) for edge_type in EdgeTypes.all: query, bind = edge_query(edge_type) log.debug(f"Query for edges of type {edge_type}: {sub.root()}") with await self.db.aql(query, bind_vars=bind, batch_size=50000) as ec: - edge_info, gei, geu, ged = self.prepare_edges(sub, ec, edge_type) - graph_info += edge_info - edge_inserts[edge_type] = gei - edge_updates[edge_type] = geu - edge_deletes[edge_type] = ged - return graph_info, ni, nu, nd, edge_inserts, edge_updates, edge_deletes + graph_change += self.prepare_edges(sub, ec, edge_type) + return graph_change roots, parent, graphs = GraphAccess.merge_graphs(graph_to_merge) log.info(f"merge_graph {len(roots)} merge nodes found. change_id={change_id}, is_batch={is_batch}.") @@ -1192,27 +1160,23 @@ async def prepare_graph( def merge_edges(merge_node: str, merge_node_kind: str, edge_type: EdgeType) -> Tuple[str, Json]: return self.query_update_edges(edge_type, merge_node_kind), {"update_id": merge_node} - K = TypeVar("K") # noqa: N806 - V = TypeVar("V") # noqa: N806 - - def combine_dict(left: Dict[K, List[V]], right: Dict[K, List[V]]) -> Dict[K, List[V]]: - result = dict(left) - for key, right_values in right.items(): - left_values = left.get(key) - result[key] = left_values + right_values if left_values else right_values - return result + def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]: + edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type] + return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids) # this will throw an exception, in case of a conflicting update (--> outside try block) log.debug("Mark all parent nodes for this update to avoid conflicting changes") await self.mark_update(roots, list(parent.nodes), change_id, is_batch) try: - - def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]: - edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type] - return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids) - + # store parent nodes and edges with a mutex to avoid conflicts parents_nodes = self.nodes_by_ids_and_until_replace_node(preserve_parent_structure, parent) - info, nis, nus, nds, eis, eus, eds = await prepare_graph(parent, parents_nodes, parent_edges) + async with self.lock_db.lock("merge_graph_parents"): + parent_change_id = change_id + "_parent" + parent_change = await prepare_graph(parent, parents_nodes, parent_edges) + if parent_change.change_count(): # only persist in case of changes + await self._persist_update(parent_change_id, False, parent_change, update_history) + + change = GraphChange() for num, (root, graph) in enumerate(graphs): root_kind = GraphResolver.resolved_kind(graph_to_merge.nodes[root]) if root_kind: @@ -1220,39 +1184,21 @@ def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]: log.info(f"Update subgraph: root={root} ({root_kind}, {num+1} of {len(roots)})") node_query = self.query_update_nodes(root_kind), {"update_id": root} edge_query = partial(merge_edges, root, root_kind) - - i, ni, nu, nd, ei, eu, ed = await prepare_graph(graph, node_query, edge_query) - info += i - nis += ni - nus += nu - nds += nd - eis = combine_dict(eis, ei) - eus = combine_dict(eus, eu) - eds = combine_dict(eds, ed) + change += await prepare_graph(graph, node_query, edge_query) else: - # Already checked in GraphAccess - only here as safeguard. + # Already checked in GraphAccess - only here as a safeguard. raise AttributeError(f"Kind of update root {root} is not a pre-resolved and can not be used!") - log.debug(f"Update prepared: {info}. Going to persist the changes.") + graph_update = parent_change.to_update() + change.to_update() + log.debug(f"Update prepared: {graph_update}. Going to persist the changes.") await self._refresh_marked_update(change_id) - await self._persist_update(change_id, is_batch, nis, nus, nds, eis, eus, eds, update_history) - return roots, info + await self._persist_update(change_id, is_batch, change, update_history) + return roots, graph_update except Exception as ex: await self.delete_marked_update(change_id) raise ex - async def _persist_update( - self, - change_id: str, - is_batch: bool, - resource_inserts: List[Json], - resource_updates: List[Json], - resource_deletes: List[Json], - edge_inserts: Dict[EdgeType, List[Json]], - edge_updates: Dict[EdgeType, List[Json]], - edge_deletes: Dict[EdgeType, List[Json]], - update_history: bool, - ) -> None: + async def _persist_update(self, change_id: str, is_batch: bool, change: GraphChange, update_history: bool) -> None: async def execute_many_async( async_fn: Callable[[str, List[Json]], Any], name: str, array: List[Json], **kwargs: Any ) -> None: @@ -1275,20 +1221,20 @@ async def trafo_many( async def store_to_tmp_collection(temp: StandardCollection) -> None: tmp = temp.name - ri = trafo_many(self.db.insert_many, tmp, resource_inserts, {"action": "node_created"}) - ru = trafo_many(self.db.insert_many, tmp, resource_updates, {"action": "node_updated"}) - rd = trafo_many(self.db.insert_many, tmp, resource_deletes, {"action": "node_deleted"}) + ri = trafo_many(self.db.insert_many, tmp, change.node_inserts, {"action": "node_created"}) + ru = trafo_many(self.db.insert_many, tmp, change.node_updates, {"action": "node_updated"}) + rd = trafo_many(self.db.insert_many, tmp, change.node_deletes, {"action": "node_deleted"}) edge_i = [ trafo_many(self.db.insert_many, tmp, inserts, {"action": "edge_insert", "edge_type": tpe}) - for tpe, inserts in edge_inserts.items() + for tpe, inserts in change.edge_inserts.items() ] edge_u = [ trafo_many(self.db.insert_many, tmp, updates, {"action": "edge_update", "edge_type": tpe}) - for tpe, updates in edge_updates.items() + for tpe, updates in change.edge_updates.items() ] edge_d = [ trafo_many(self.db.insert_many, tmp, deletes, {"action": "edge_delete", "edge_type": tpe}) - for tpe, deletes in edge_deletes.items() + for tpe, deletes in change.edge_deletes.items() ] await asyncio.gather(*([ri, ru, rd] + edge_i + edge_u + edge_d)) @@ -1562,7 +1508,7 @@ async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> Gr if await self.db.has_graph(to_graph): raise ValueError(f"Graph {to_graph} already exists") - new_graph_db = ArangoGraphDB(db=self.db, name=to_graph, adjust_node=self.node_adjuster, config=self.config) + new_graph_db = ArangoGraphDB(self.db, to_graph, self.node_adjuster, self.config, self.lock_db) # collection creation can't be a part of a transaction so we do that first # we simply reuse the existing create_update_schema method but do not insert any genesis data diff --git a/fixcore/fixcore/db/model.py b/fixcore/fixcore/db/model.py index 081d14f571..5e5675938d 100644 --- a/fixcore/fixcore/db/model.py +++ b/fixcore/fixcore/db/model.py @@ -1,14 +1,16 @@ from __future__ import annotations -from abc import ABC -from typing import Dict, Any, Optional, Tuple, List +from collections import defaultdict +from typing import Dict, Any, Optional, Tuple, List, DefaultDict from attr import define +from attrs import field -from fixcore.model.graph_access import Section +from fixcore.model.graph_access import Section, EdgeTypes from fixcore.model.model import Model, ResolvedPropertyPath, ComplexKind from fixcore.model.resolve_in_graph import GraphResolver from fixcore.query.model import Query +from fixcore.types import Json, EdgeType from fixcore.util import first ancestor_merges = { @@ -49,7 +51,7 @@ def owners(self, path: str) -> List[ComplexKind]: @define(repr=True, eq=True) -class GraphUpdate(ABC): +class GraphUpdate: nodes_created: int = 0 nodes_updated: int = 0 nodes_deleted: int = 0 @@ -76,3 +78,57 @@ def __add__(self, other: GraphUpdate) -> GraphUpdate: self.edges_updated + other.edges_updated, self.edges_deleted + other.edges_deleted, ) + + +@define +class GraphChange: + node_inserts: List[Json] = field(factory=list) + node_updates: List[Json] = field(factory=list) + node_deletes: List[Json] = field(factory=list) + edge_inserts: DefaultDict[EdgeType, List[Json]] = field(factory=lambda: defaultdict(list)) + edge_updates: DefaultDict[EdgeType, List[Json]] = field(factory=lambda: defaultdict(list)) + edge_deletes: DefaultDict[EdgeType, List[Json]] = field(factory=lambda: defaultdict(list)) + + def to_update(self) -> GraphUpdate: + return GraphUpdate( + len(self.node_inserts), + len(self.node_updates), + len(self.node_deletes), + sum(len(edges) for edges in self.edge_inserts.values()), + sum(len(edges) for edges in self.edge_updates.values()), + sum(len(edges) for edges in self.edge_deletes.values()), + ) + + def change_count(self) -> int: + return self.to_update().all_changes() + + def __add__(self, other: GraphChange) -> GraphChange: + update = GraphChange() + # insert + update.node_inserts.extend(self.node_inserts) + update.node_inserts.extend(other.node_inserts) + # update + update.node_updates.extend(self.node_updates) + update.node_updates.extend(other.node_updates) + # delete + update.node_deletes.extend(self.node_deletes) + update.node_deletes.extend(other.node_deletes) + for edge_type in EdgeTypes.all: + # insert + update.edge_inserts[edge_type].extend(self.edge_inserts[edge_type]) + update.edge_inserts[edge_type].extend(other.edge_inserts[edge_type]) + # update + update.edge_updates[edge_type].extend(self.edge_updates[edge_type]) + update.edge_updates[edge_type].extend(other.edge_updates[edge_type]) + # delete + update.edge_deletes[edge_type].extend(self.edge_deletes[edge_type]) + update.edge_deletes[edge_type].extend(other.edge_deletes[edge_type]) + return update + + def clear(self) -> None: + self.node_inserts.clear() + self.node_updates.clear() + self.node_deletes.clear() + self.edge_inserts.clear() + self.edge_updates.clear() + self.edge_deletes.clear() diff --git a/fixcore/fixcore/model/graph_access.py b/fixcore/fixcore/model/graph_access.py index 2276be78b0..ed126fbf6d 100644 --- a/fixcore/fixcore/model/graph_access.py +++ b/fixcore/fixcore/model/graph_access.py @@ -409,8 +409,9 @@ def check_complete(self) -> None: for edge_type in EdgeTypes.all: key = GraphAccess.edge_key(rid, succ, edge_type) if self.graph.has_edge(rid, succ, key): + data = self.graph.get_edge_data(rid, succ, key) self.graph.remove_edge(rid, succ, key) - self.add_edge("root", succ, edge_type) + self.add_edge("root", succ, edge_type, data.get("reported")) self.graph.remove_node(rid) diff --git a/fixcore/tests/fixcore/conftest.py b/fixcore/tests/fixcore/conftest.py index c6abd831b6..5c72f2d5ca 100644 --- a/fixcore/tests/fixcore/conftest.py +++ b/fixcore/tests/fixcore/conftest.py @@ -198,8 +198,8 @@ def test_db(local_client: ArangoClient, system_db: StandardDatabase) -> Standard @fixture -async def graph_db(async_db: AsyncArangoDB) -> ArangoGraphDB: - graph_db = ArangoGraphDB(async_db, GraphName("ns"), NoAdjust(), GraphConfig(use_view=False)) +async def graph_db(async_db: AsyncArangoDB, lock_db: LockDB) -> ArangoGraphDB: + graph_db = ArangoGraphDB(async_db, GraphName("ns"), NoAdjust(), GraphConfig(use_view=False), lock_db) await graph_db.create_update_schema() await model_db(async_db, "ns_model").create_update_schema() await async_db.truncate(graph_db.in_progress)