From fe69da70014c28af3235269a65cb9dc414740d1b Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Tue, 10 Dec 2024 09:45:16 +0100 Subject: [PATCH] Leverage kinto-http 11.7.0 features (#1502) --- commands/__init__.py | 26 ++++++++++++-------------- commands/backport_records.py | 31 +++++++++---------------------- commands/build_bundles.py | 6 ++---- requirements.in | 2 +- requirements.txt | 6 +++--- tests/test_build_bundles.py | 2 +- 6 files changed, 28 insertions(+), 45 deletions(-) diff --git a/commands/__init__.py b/commands/__init__.py index a3682fe8..c6146b12 100644 --- a/commands/__init__.py +++ b/commands/__init__.py @@ -10,6 +10,7 @@ PARALLEL_REQUESTS = int(os.getenv("PARALLEL_REQUESTS", 4)) REQUESTS_TIMEOUT_SECONDS = float(os.getenv("REQUESTS_TIMEOUT_SECONDS", 2)) REQUESTS_NB_RETRIES = int(os.getenv("REQUESTS_NB_RETRIES", 4)) +DRY_MODE = os.getenv("DRY_RUN", "0") in "1yY" retry_timeout = backoff.on_exception( backoff.expo, @@ -38,6 +39,7 @@ class KintoClient(kinto_http.Client): def __init__(self, *args, **kwargs): kwargs.setdefault("retry", REQUESTS_NB_RETRIES) + kwargs.setdefault("dry_mode", DRY_MODE) super().__init__(*args, **kwargs) @retry_timeout @@ -57,20 +59,16 @@ def get_records_timestamp(self, *args, **kwargs): return super().get_records_timestamp(*args, **kwargs) @retry_timeout - def get_changeset(self, bid, cid, expected): - url = f"{self.session.server_url}/buckets/{bid}/collections/{cid}/changeset?_expected={expected}" - resp = requests.get(url) - resp.raise_for_status() - changeset = resp.json() - return changeset - - -def records_equal(a, b): - """Compare records, ignoring timestamps.""" - ignored_fields = ("last_modified", "schema") - ra = {k: v for k, v in a.items() if k not in ignored_fields} - rb = {k: v for k, v in b.items() if k not in ignored_fields} - return ra == rb + def get_changeset(self, *args, **kwargs): + return super().get_changeset(*args, **kwargs) + + @retry_timeout + def approve_changes(self, *args, **kwargs): + return super().approve_changes(*args, **kwargs) + + @retry_timeout + def request_review(self, *args, **kwargs): + return super().request_review(*args, **kwargs) def call_parallel(func, args_list, max_workers=PARALLEL_REQUESTS): diff --git a/commands/backport_records.py b/commands/backport_records.py index ae7e2eab..0c43a45b 100644 --- a/commands/backport_records.py +++ b/commands/backport_records.py @@ -2,9 +2,9 @@ import os from decouple import config +from kinto_http.utils import collection_diff from . import KintoClient as Client -from . import records_equal def backport_records(event, context, **kwargs): @@ -56,21 +56,10 @@ def backport_records(event, context, **kwargs): ) source_records = source_client.get_records(**source_filters) - dest_records_by_id = {r["id"]: r for r in dest_client.get_records()} + dest_records = dest_client.get_records() + to_create, to_update, to_delete = collection_diff(source_records, dest_records) - # Create or update the destination records. - to_create = [] - to_update = [] - for r in source_records: - dest_record = dest_records_by_id.pop(r["id"], None) - if dest_record is None: - to_create.append(r) - elif not records_equal(r, dest_record): - to_update.append(r) - # Delete the records missing from source. - to_delete = dest_records_by_id.values() - - is_behind = (len(to_create) + len(to_update) + len(to_delete)) > 0 + is_behind = to_create or to_update or to_delete has_pending_changes = is_behind if not is_behind: # When this lambda is ran with a signed collection as @@ -87,13 +76,11 @@ def backport_records(event, context, **kwargs): with dest_client.batch() as dest_batch: for r in to_create: dest_batch.create_record(data=r) - for r in to_update: - # Let the server assign a new timestamp. - del r["last_modified"] + for old, new in to_update: # Add some concurrency control headers (make sure the # destination record wasn't changed since we read it). - if_match = dest_record["last_modified"] if safe_headers else None - dest_batch.update_record(data=r, if_match=if_match) + if_match = old["last_modified"] if safe_headers else None + dest_batch.update_record(data=new, if_match=if_match) for r in to_delete: dest_batch.delete_record(id=r["id"]) @@ -126,9 +113,9 @@ def backport_records(event, context, **kwargs): ) if has_autoapproval: # Approve the changes. - dest_client.patch_collection(data={"status": "to-sign"}) + dest_client.approve_changes() print(f"Done. {ops_count} changes applied and signed.") else: # Request review. - dest_client.patch_collection(data={"status": "to-review"}) + dest_client.request_review(message="r?") print(f"Done. Requested review for {ops_count} changes.") diff --git a/commands/build_bundles.py b/commands/build_bundles.py index 135efe28..25ba815e 100644 --- a/commands/build_bundles.py +++ b/commands/build_bundles.py @@ -8,7 +8,6 @@ import io import json import os -import random import tempfile import zipfile from email.utils import parsedate_to_datetime @@ -40,15 +39,14 @@ def fetch_all_changesets(client): The result contains the metadata and all the records of all collections for both preview and main buckets. """ - random_cache_bust = random.randint(999999000000, 999999999999) - monitor_changeset = client.get_changeset("monitor", "changes", random_cache_bust) + monitor_changeset = client.get_changeset("monitor", "changes", bust_cache=True) print("%s collections" % len(monitor_changeset["changes"])) args_list = [ (c["bucket"], c["collection"], c["last_modified"]) for c in monitor_changeset["changes"] ] all_changesets = call_parallel( - lambda bid, cid, ts: client.get_changeset(bid, cid, ts), args_list + lambda bid, cid, ts: client.get_changeset(bid, cid, _expected=ts), args_list ) return all_changesets diff --git a/requirements.in b/requirements.in index 597b1ad5..0e8688f1 100644 --- a/requirements.in +++ b/requirements.in @@ -1,6 +1,6 @@ backoff python-decouple -kinto-http +kinto-http>=11.7.0 requests sentry_sdk google-cloud-storage diff --git a/requirements.txt b/requirements.txt index 2d082bc4..08bd6ee3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -217,9 +217,9 @@ idna==3.7 \ --hash=sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc \ --hash=sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0 # via requests -kinto-http==11.6.0 \ - --hash=sha256:30722690f672f549f908bc9c45eb49010463930eeccfc439d4d318c6e5d5ae27 \ - --hash=sha256:701b489cc25d304a4e1cf74da1d01168895b39baa709f04b0482ac8eda6431cb +kinto-http==11.7.0 \ + --hash=sha256:10613c60eb3ad609e00cb3b598eceecf57c7e0a7a889441c6e8c181fc85eca13 \ + --hash=sha256:1eb658d68dfc94779db3c1683b72865f8d669db37364e51b1f3d23460577e029 # via -r requirements.in lz4==4.3.3 \ --hash=sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e \ diff --git a/tests/test_build_bundles.py b/tests/test_build_bundles.py index 69402a5a..a1b59d7c 100644 --- a/tests/test_build_bundles.py +++ b/tests/test_build_bundles.py @@ -64,7 +64,7 @@ def dummy_func(x, y): @responses.activate -@patch("commands.build_bundles.random") +@patch("kinto_http.client.random") def test_fetch_all_changesets(mock_random): mock_random.randint.return_value = 42 changeset_url = (