From 9852f892f8b05176f66d3f90e539f53f0772f409 Mon Sep 17 00:00:00 2001
From: Pedro Brochado
Date: Fri, 22 Nov 2024 18:23:51 -0300
Subject: [PATCH 01/10] Add tests to ensure a corrupt RA hit doesn't block a
 good RA

---
 .../api/using_plugin/test_content_delivery.py | 146 +++++++++++++++++-
 1 file changed, 141 insertions(+), 5 deletions(-)

diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
index 11735ca04d..3daee5e6de 100644
--- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
+++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
@@ -1,15 +1,14 @@
 """Tests related to content delivery."""

-from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError
 import hashlib
-import pytest
 import subprocess
+import uuid

 from urllib.parse import urljoin

-from pulpcore.client.pulp_file import (
-    RepositorySyncURL,
-)
+import pytest
+from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError
+from pulpcore.client.pulp_file import RepositorySyncURL

 from pulpcore.tests.functional.utils import download_file, get_files_in_manifest

@@ -143,3 +142,140 @@ def test_remote_content_changed_with_on_demand(
     assert result.returncode == 18
     assert b"* Closing connection 0" in result.stderr
     assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr


# import select
# from multiprocessing import Process, Queue

# import requests


# def run_server(port: int, server_dir: str, q: Queue):
#     import http.server
#     import os

#     handler_cls = http.server.SimpleHTTPRequestHandler
#     server_cls = http.server.HTTPServer

#     os.chdir(server_dir)
#     server_address = ("", port)
#     httpd = server_cls(server_address, handler_cls)

#     q.put(httpd.fileno())  # send to parent so can use select
#     httpd.serve_forever()


# def create_server(port: int, server_dir: str) -> Process:
#     # setup/teardown server
#     q = Queue()
#     proc = Process(target=run_server, args=(port, server_dir, q))
#     proc.start()

#     # block until the server socket fd is ready for write
#     server_socket_fd = q.get()
#     _, w, _ = select.select([], [server_socket_fd], [], 5)
#     if not w:
#         proc.terminate()
#         proc.join()
#         raise TimeoutError("The test server didnt get ready.")
#     return proc


# @pytest.fixture
# def http_server(write_3_iso_file_fixture_data_factory):
#     # setup data
#     server_dir = write_3_iso_file_fixture_data_factory("server_foo").rsplit("/")[0]
#     # setup server
#     port = 8787
#     proc = create_server(port, server_dir)
#     base_url = f"http://localhost:{port}"
#     yield base_url
#     proc.terminate()
#     proc.join()

# def test_fixture_server(http_server):
#     result = requests.get(http_server + "/1.iso")
#     assert result.ok is True


def test_fixture_server(
    write_3_iso_file_fixture_data_factory,
    gen_fixture_server,
):
    import requests
    import time

    fixture_root = write_3_iso_file_fixture_data_factory("server_a")
    server_data = gen_fixture_server(fixture_root, None)
    url = server_data.make_url(fixture_root)
    time.sleep(5)
    result = requests.get(url)
    assert result.ok is True


@pytest.mark.parallel
def test_handling_remote_artifact_on_demand_streaming_failure(
    write_3_iso_file_fixture_data_factory,
    file_repo_with_auto_publish,
    file_remote_ssl_factory,
    file_bindings,
    monitor_task,
    monitor_task_group,
    file_distribution_factory,
    gen_object_with_cleanup,
    gen_fixture_server,
):
    """
    GIVEN A 
content synced with on-demand which has 2 RemoteArtifacts (Remote + ACS).
    AND only the ACS RemoteArtifact (that has priority on the content-app) is corrupted

    WHEN a client requests the content for the first time
    THEN the client doesn't get any content

    WHEN a client requests the content for the second time
    THEN the client gets the right content
    """

    # Plumbing
    def create_remote(remote_url):
        return file_remote_ssl_factory(manifest_path=remote_url, policy="on_demand")

    def create_fixture_server(name, http_server=False):
        fixture_root = write_3_iso_file_fixture_data_factory(name)
        if not http_server:  # return filesystem path
            return fixture_root
        else:  # return http server url
            server_data = gen_fixture_server(fixture_root, None)
            return server_data.make_url(fixture_root)

    def sync_publish_and_distribute(remote):
        body = RepositorySyncURL(remote=remote.pulp_href)
        monitor_task(
            file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
        )
        repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
        distribution = file_distribution_factory(repository=repo.pulp_href)
        return repo, distribution

    def create_and_bind_acs_to_remote(remote, paths: list[str]):
        acs = gen_object_with_cleanup(
            file_bindings.AcsFileApi,
            {"remote": remote.pulp_href, "paths": paths, "name": str(uuid.uuid4())},
        )
        monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
        return acs

    # GIVEN
    sync_server_path = create_fixture_server("sync_server")
    acs_server_url = create_fixture_server("acs_server", http_server=True)
    remote = create_remote(remote_url=sync_server_path)
    repo, distribution = sync_publish_and_distribute(remote)
    create_and_bind_acs_to_remote(remote, [acs_server_url])

    # WHEN (first request)

    # THEN (first request)

    # WHEN (second request)

    # THEN (second request)

From 561cfb2ee1bd798fcf76a465dea6134969d116ed Mon Sep 17 00:00:00 2001
From: Pedro Brochado
Date: Mon, 25 Nov 2024 20:02:55 -0300
Subject: [PATCH 02/10] fixup: create http-server fixture for acs

---
 .../api/using_plugin/test_content_delivery.py | 131 ++++++++++--------
 1 file changed, 70 insertions(+), 61 deletions(-)

diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
index 3daee5e6de..967f05a1fb 100644
--- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
+++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
@@ -1,11 +1,16 @@
 """Tests related to content delivery."""

 import hashlib
+import random
+import select
 import subprocess
 import uuid
+from contextlib import contextmanager
+from multiprocessing import Process, Queue
 from urllib.parse import urljoin

 import pytest
+import requests
 from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError

 from pulpcore.client.pulp_file import RepositorySyncURL
@@ -144,67 +149,73 @@ def test_remote_content_changed_with_on_demand(
     assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr


def run_server(port: int, server_dir: str, q: Queue):
    import http.server
    import os

    handler_cls = http.server.SimpleHTTPRequestHandler
    server_cls = http.server.HTTPServer

    os.chdir(server_dir)
    server_address = ("", port)
    httpd = server_cls(server_address, handler_cls)

    q.put(httpd.fileno())  # send to parent so can use select
    httpd.serve_forever()


-# import select
-# from multiprocessing import Process, Queue

-# import requests


-# def run_server(port: int, 
server_dir: str, q: Queue): -# import http.server -# import os + q.put(httpd.fileno()) # send to parent so can use select + httpd.serve_forever() -# handler_cls = http.server.SimpleHTTPRequestHandler -# server_cls = http.server.HTTPServer -# os.chdir(server_dir) -# server_address = ("", port) -# httpd = server_cls(server_address, handler_cls) +def create_server(port: int, server_dir: str) -> Process: + # setup/teardown server + q = Queue() + proc = Process(target=run_server, args=(port, server_dir, q)) + proc.start() -# q.put(httpd.fileno()) # send to parent so can use select -# httpd.serve_forever() + # block until the server socket fd is ready for write + server_socket_fd = q.get() + _, w, _ = select.select([], [server_socket_fd], [], 5) + if not w: + proc.terminate() + proc.join() + raise TimeoutError("The test server didnt get ready.") + return proc -# def create_server(port: int, server_dir: str) -> Process: -# # setup/teardown server -# q = Queue() -# proc = Process(target=run_server, args=(port, server_dir, q)) -# proc.start() +@pytest.fixture +def http_server_factory(): -# # block until the server socket fd is ready for write -# server_socket_fd = q.get() -# _, w, _ = select.select([], [server_socket_fd], [], 5) -# if not w: -# proc.terminate() -# proc.join() -# raise TimeoutError("The test server didnt get ready.") -# return proc + @contextmanager + def _http_server(fixture_dir, url_path): + port = random.randint(9000, 10000) + try: + proc = create_server(port, fixture_dir) + base_url = f"http://localhost:{port}/" + yield base_url + finally: + proc.terminate() + proc.join() + return _http_server -# @pytest.fixture -# def http_server(write_3_iso_file_fixture_data_factory): -# # setup data -# server_dir = write_3_iso_file_fixture_data_factory("server_foo").rsplit("/")[0] -# # setup server -# port = 8787 -# proc = create_server(port, server_dir) -# base_url = f"http://localhost:{port}" -# yield base_url -# proc.terminate() -# proc.join() -# def test_fixture_server(http_server): -# result = requests.get(http_server + "/1.iso") -# assert result.ok is True +def test_http_server( + http_server_factory, write_3_iso_file_fixture_data_factory, file_fixtures_root +): + server_name = "myserver" + manifest_path = write_3_iso_file_fixture_data_factory(server_name) + with http_server_factory(file_fixtures_root, server_name) as http_server: + url = http_server + "1.iso" + result = requests.get(url) + assert result.ok is True def test_fixture_server( write_3_iso_file_fixture_data_factory, gen_fixture_server, ): - import requests import time + import requests + fixture_root = write_3_iso_file_fixture_data_factory("server_a") server_data = gen_fixture_server(fixture_root, None) url = server_data.make_url(fixture_root) @@ -223,7 +234,8 @@ def test_handling_remote_artifact_on_demand_streaming_failure( monitor_task_group, file_distribution_factory, gen_object_with_cleanup, - gen_fixture_server, + http_server_factory, + file_fixtures_root, ): """ GIVEN A content synced with on-demand which has 2 RemoteArtifacts (Remote + ACS). 
@@ -237,16 +249,12 @@ def test_handling_remote_artifact_on_demand_streaming_failure( """ # Plumbing - def create_remote(remote_url): - return file_remote_ssl_factory(manifest_path=remote_url, policy="on_demand") + def generate_fixture_data(name): + manifest_path = write_3_iso_file_fixture_data_factory(name) + return file_fixtures_root / name, manifest_path - def create_fixture_server(name, http_server=False): - fixture_root = write_3_iso_file_fixture_data_factory(name) - if not http_server: # return filesystem path - return fixture_root - else: # return http server url - server_data = gen_fixture_server(fixture_root, None) - return server_data.make_url(fixture_root) + def create_remote(remote_url): + return file_remote_ssl_factory(manifest_path=str(remote_url), policy="on_demand") def sync_publish_and_distribute(remote): body = RepositorySyncURL(remote=remote.pulp_href) @@ -257,7 +265,7 @@ def sync_publish_and_distribute(remote): distribution = file_distribution_factory(repository=repo.pulp_href) return repo, distribution - def create_and_bind_acs_to_remote(remote, paths: list[str]): + def create_acs_and_bind_to_remote(remote, paths: list[str]): acs = gen_object_with_cleanup( file_bindings.AcsFileApi, {"remote": remote.pulp_href, "paths": paths, "name": str(uuid.uuid4())}, @@ -266,16 +274,17 @@ def create_and_bind_acs_to_remote(remote, paths: list[str]): return acs # GIVEN - sync_server_path = create_fixture_server("sync_server") - acs_server_url = create_fixture_server("acs_server", http_server=True) - remote = create_remote(remote_url=sync_server_path) - repo, distribution = sync_publish_and_distribute(remote) - create_and_bind_acs_to_remote(remote, [acs_server_url]) + _, sync_server_manifest_path = generate_fixture_data("sync_server", return_manifest_path=True) + acs_server_path, _ = generate_fixture_data("acs_server") + with (http_server_factory(acs_server_path, "acs_server") as acs_server_url,): + remote = create_remote(remote_url=sync_server_manifest_path) + repo, distribution = sync_publish_and_distribute(remote) + create_acs_and_bind_to_remote(remote, [acs_server_url]) - # WHEN (first request) + # WHEN (first request) - # THEN (first request) + # THEN (first request) - # WHEN (second request) + # WHEN (second request) - # THEN (second request) + # THEN (second request) From 5d8a3e7be187112135130367a1cc0a78133e4b21 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 26 Nov 2024 20:03:45 -0300 Subject: [PATCH 03/10] fixup: finish the test setup for the desired behavior --- pulp_file/pytest_plugin.py | 25 ++- pulp_file/tests/functional/api/test_acs.py | 15 -- .../api/using_plugin/test_content_delivery.py | 163 +++++++----------- pulpcore/tests/functional/utils.py | 9 +- 4 files changed, 87 insertions(+), 125 deletions(-) diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py index 2fe4624469..d1c42057f9 100644 --- a/pulp_file/pytest_plugin.py +++ b/pulp_file/pytest_plugin.py @@ -83,11 +83,11 @@ def file_fixtures_root(tmp_path): @pytest.fixture def write_3_iso_file_fixture_data_factory(file_fixtures_root): - def _write_3_iso_file_fixture_data_factory(name, overwrite=False): + def _write_3_iso_file_fixture_data_factory(name, overwrite=False, seed=None): file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite) - file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso")) - file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso")) - file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso")) + file1 = 
generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"), seed=seed) + file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"), seed=seed) + file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"), seed=seed) generate_manifest( file_fixtures_root.joinpath(f"{name}/PULP_MANIFEST"), [file1, file2, file3] ) @@ -364,3 +364,20 @@ def _wget_recursive_download_on_host(url, destination): ) return _wget_recursive_download_on_host + + +@pytest.fixture +def generate_server_and_remote( + file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup +): + def _generate_server_and_remote(*, manifest_path, policy): + server = gen_fixture_server(file_fixtures_root, None) + url = server.make_url(manifest_path) + remote = gen_object_with_cleanup( + file_bindings.RemotesFileApi, + {"name": str(uuid.uuid4()), "url": str(url), "policy": policy}, + ) + return server, remote + + yield _generate_server_and_remote + diff --git a/pulp_file/tests/functional/api/test_acs.py b/pulp_file/tests/functional/api/test_acs.py index 873a607402..ab7d7632db 100644 --- a/pulp_file/tests/functional/api/test_acs.py +++ b/pulp_file/tests/functional/api/test_acs.py @@ -12,21 +12,6 @@ ) -@pytest.fixture -def generate_server_and_remote( - file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup -): - def _generate_server_and_remote(*, manifest_path, policy): - server = gen_fixture_server(file_fixtures_root, None) - url = server.make_url(manifest_path) - remote = gen_object_with_cleanup( - file_bindings.RemotesFileApi, - {"name": str(uuid.uuid4()), "url": str(url), "policy": policy}, - ) - return server, remote - - yield _generate_server_and_remote - @pytest.mark.parallel def test_acs_validation_and_update( diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index 967f05a1fb..d790324c1e 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -143,99 +143,24 @@ def test_remote_content_changed_with_on_demand( download_file(get_url) # Assert again with curl just to be sure. 
- result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result = subprocess.run(["curl", "-v", get_url], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert result.returncode == 18 assert b"* Closing connection 0" in result.stderr assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr -def run_server(port: int, server_dir: str, q: Queue): - import http.server - import os - - handler_cls = http.server.SimpleHTTPRequestHandler - server_cls = http.server.HTTPServer - - os.chdir(server_dir) - server_address = ("", port) - httpd = server_cls(server_address, handler_cls) - - q.put(httpd.fileno()) # send to parent so can use select - httpd.serve_forever() - - -def create_server(port: int, server_dir: str) -> Process: - # setup/teardown server - q = Queue() - proc = Process(target=run_server, args=(port, server_dir, q)) - proc.start() - - # block until the server socket fd is ready for write - server_socket_fd = q.get() - _, w, _ = select.select([], [server_socket_fd], [], 5) - if not w: - proc.terminate() - proc.join() - raise TimeoutError("The test server didnt get ready.") - return proc - - -@pytest.fixture -def http_server_factory(): - - @contextmanager - def _http_server(fixture_dir, url_path): - port = random.randint(9000, 10000) - try: - proc = create_server(port, fixture_dir) - base_url = f"http://localhost:{port}/" - yield base_url - finally: - proc.terminate() - proc.join() - - return _http_server - - -def test_http_server( - http_server_factory, write_3_iso_file_fixture_data_factory, file_fixtures_root -): - server_name = "myserver" - manifest_path = write_3_iso_file_fixture_data_factory(server_name) - with http_server_factory(file_fixtures_root, server_name) as http_server: - url = http_server + "1.iso" - result = requests.get(url) - assert result.ok is True - - -def test_fixture_server( - write_3_iso_file_fixture_data_factory, - gen_fixture_server, -): - import time - - import requests - - fixture_root = write_3_iso_file_fixture_data_factory("server_a") - server_data = gen_fixture_server(fixture_root, None) - url = server_data.make_url(fixture_root) - time.sleep(5) - result = requests.get(url) - assert result.ok is True - - @pytest.mark.parallel def test_handling_remote_artifact_on_demand_streaming_failure( write_3_iso_file_fixture_data_factory, file_repo_with_auto_publish, - file_remote_ssl_factory, + file_remote_factory, file_bindings, monitor_task, monitor_task_group, file_distribution_factory, gen_object_with_cleanup, - http_server_factory, - file_fixtures_root, + generate_server_and_remote ): """ GIVEN A content synced with on-demand which has 2 RemoteArtifacts (Remote + ACS). 
@@ -249,42 +174,72 @@ def test_handling_remote_artifact_on_demand_streaming_failure( """ # Plumbing - def generate_fixture_data(name): - manifest_path = write_3_iso_file_fixture_data_factory(name) - return file_fixtures_root / name, manifest_path + def create_simple_remote(manifest_path): + remote = file_remote_factory( + manifest_path=manifest_path, policy="on_demand") + body = RepositorySyncURL(remote=remote.pulp_href) + monitor_task( + file_bindings.RepositoriesFileApi.sync( + file_repo_with_auto_publish.pulp_href, body).task + ) + return remote - def create_remote(remote_url): - return file_remote_ssl_factory(manifest_path=str(remote_url), policy="on_demand") + def create_acs_remote(manifest_path): + acs_server, acs_remote = generate_server_and_remote( + manifest_path=manifest_path, policy="on_demand" + ) + acs = gen_object_with_cleanup( + file_bindings.AcsFileApi, + {"remote": acs_remote.pulp_href, + "paths": [], "name": str(uuid.uuid4())}, + ) + monitor_task_group(file_bindings.AcsFileApi.refresh( + acs.pulp_href).task_group) + return acs def sync_publish_and_distribute(remote): body = RepositorySyncURL(remote=remote.pulp_href) monitor_task( - file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task + file_bindings.RepositoriesFileApi.sync( + file_repo_with_auto_publish.pulp_href, body).task ) - repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) + repo = file_bindings.RepositoriesFileApi.read( + file_repo_with_auto_publish.pulp_href) distribution = file_distribution_factory(repository=repo.pulp_href) - return repo, distribution + return distribution - def create_acs_and_bind_to_remote(remote, paths: list[str]): - acs = gen_object_with_cleanup( - file_bindings.AcsFileApi, - {"remote": remote.pulp_href, "paths": paths, "name": str(uuid.uuid4())}, - ) - monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group) + def refresh_acs(acs): + monitor_task_group(file_bindings.AcsFileApi.refresh( + acs.pulp_href).task_group) return acs - # GIVEN - _, sync_server_manifest_path = generate_fixture_data("sync_server", return_manifest_path=True) - acs_server_path, _ = generate_fixture_data("acs_server") - with (http_server_factory(acs_server_path, "acs_server") as acs_server_url,): - remote = create_remote(remote_url=sync_server_manifest_path) - repo, distribution = sync_publish_and_distribute(remote) - create_acs_and_bind_to_remote(remote, [acs_server_url]) + def get_original_content_info(remote): + expected_files = get_files_in_manifest(remote.url) + content_unit = list(expected_files)[0] + return content_unit[0], content_unit[1] - # WHEN (first request) + def download_from_distribution(content, distribution): + content_unit_url = urljoin(distribution.base_url, content_name) + downloaded_file = download_file(content_unit_url) + actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest() + return actual_checksum - # THEN (first request) + # GIVEN + basic_manifest_path = write_3_iso_file_fixture_data_factory( + "basic", seed=123) + acs_manifest_path = write_3_iso_file_fixture_data_factory("acs", seed=123) + remote = create_simple_remote(basic_manifest_path) + distribution = sync_publish_and_distribute(remote) + acs = create_acs_remote(acs_manifest_path) + refresh_acs(acs) + write_3_iso_file_fixture_data_factory("acs", overwrite=True) # corrupt + + # WHEN/THEN (first request) + content_name, expected_checksum = get_original_content_info(remote) - # WHEN (second request) + with 
pytest.raises(ClientPayloadError, match="Response payload is not completed"): + download_from_distribution(content_name, distribution) - # THEN (second request) + # WHEN/THEN (second request) + actual_checksum = download_from_distribution(content_name, distribution) + assert actual_checksum == expected_checksum diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py index 4248cd6ee0..d5651099fc 100644 --- a/pulpcore/tests/functional/utils.py +++ b/pulpcore/tests/functional/utils.py @@ -4,6 +4,7 @@ import asyncio import hashlib import os +import random from aiohttp import web from dataclasses import dataclass @@ -103,10 +104,14 @@ async def _download_file(url, auth=None, headers=None): return MockDownload(body=await response.read(), response_obj=response) -def generate_iso(full_path, size=1024, relative_path=None): +def generate_iso(full_path, size=1024, relative_path=None, seed=None): """Generate a random file.""" with open(full_path, "wb") as fout: - contents = os.urandom(size) + if seed: + random.seed(seed) + contents = random.randbytes(size) + else: + contents = os.urandom(size) fout.write(contents) fout.flush() digest = hashlib.sha256(contents).hexdigest() From 5594395f8519f98b3adafd7e9db1b4a16e895e70 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Tue, 26 Nov 2024 20:05:57 -0300 Subject: [PATCH 04/10] fixup: cleanup and autoformat --- pulp_file/pytest_plugin.py | 1 - pulp_file/tests/functional/api/test_acs.py | 1 - .../api/using_plugin/test_content_delivery.py | 34 ++++++------------- 3 files changed, 10 insertions(+), 26 deletions(-) diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py index d1c42057f9..bd468d1ec6 100644 --- a/pulp_file/pytest_plugin.py +++ b/pulp_file/pytest_plugin.py @@ -380,4 +380,3 @@ def _generate_server_and_remote(*, manifest_path, policy): return server, remote yield _generate_server_and_remote - diff --git a/pulp_file/tests/functional/api/test_acs.py b/pulp_file/tests/functional/api/test_acs.py index ab7d7632db..b4150b3112 100644 --- a/pulp_file/tests/functional/api/test_acs.py +++ b/pulp_file/tests/functional/api/test_acs.py @@ -12,7 +12,6 @@ ) - @pytest.mark.parallel def test_acs_validation_and_update( file_bindings, diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index d790324c1e..862d0a6e0f 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -1,16 +1,11 @@ """Tests related to content delivery.""" import hashlib -import random -import select import subprocess import uuid -from contextlib import contextmanager -from multiprocessing import Process, Queue from urllib.parse import urljoin import pytest -import requests from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError from pulpcore.client.pulp_file import RepositorySyncURL @@ -143,8 +138,7 @@ def test_remote_content_changed_with_on_demand( download_file(get_url) # Assert again with curl just to be sure. 
- result = subprocess.run(["curl", "-v", get_url], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert result.returncode == 18 assert b"* Closing connection 0" in result.stderr assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr @@ -160,7 +154,7 @@ def test_handling_remote_artifact_on_demand_streaming_failure( monitor_task_group, file_distribution_factory, gen_object_with_cleanup, - generate_server_and_remote + generate_server_and_remote, ): """ GIVEN A content synced with on-demand which has 2 RemoteArtifacts (Remote + ACS). @@ -175,12 +169,10 @@ def test_handling_remote_artifact_on_demand_streaming_failure( # Plumbing def create_simple_remote(manifest_path): - remote = file_remote_factory( - manifest_path=manifest_path, policy="on_demand") + remote = file_remote_factory(manifest_path=manifest_path, policy="on_demand") body = RepositorySyncURL(remote=remote.pulp_href) monitor_task( - file_bindings.RepositoriesFileApi.sync( - file_repo_with_auto_publish.pulp_href, body).task + file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task ) return remote @@ -190,27 +182,22 @@ def create_acs_remote(manifest_path): ) acs = gen_object_with_cleanup( file_bindings.AcsFileApi, - {"remote": acs_remote.pulp_href, - "paths": [], "name": str(uuid.uuid4())}, + {"remote": acs_remote.pulp_href, "paths": [], "name": str(uuid.uuid4())}, ) - monitor_task_group(file_bindings.AcsFileApi.refresh( - acs.pulp_href).task_group) + monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group) return acs def sync_publish_and_distribute(remote): body = RepositorySyncURL(remote=remote.pulp_href) monitor_task( - file_bindings.RepositoriesFileApi.sync( - file_repo_with_auto_publish.pulp_href, body).task + file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task ) - repo = file_bindings.RepositoriesFileApi.read( - file_repo_with_auto_publish.pulp_href) + repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) distribution = file_distribution_factory(repository=repo.pulp_href) return distribution def refresh_acs(acs): - monitor_task_group(file_bindings.AcsFileApi.refresh( - acs.pulp_href).task_group) + monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group) return acs def get_original_content_info(remote): @@ -225,8 +212,7 @@ def download_from_distribution(content, distribution): return actual_checksum # GIVEN - basic_manifest_path = write_3_iso_file_fixture_data_factory( - "basic", seed=123) + basic_manifest_path = write_3_iso_file_fixture_data_factory("basic", seed=123) acs_manifest_path = write_3_iso_file_fixture_data_factory("acs", seed=123) remote = create_simple_remote(basic_manifest_path) distribution = sync_publish_and_distribute(remote) From d5f13af9cb9aa77315aed8ecc52f5d19dfa69b3d Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 27 Nov 2024 12:37:37 -0300 Subject: [PATCH 05/10] fixup: implement fix --- .../0126_remoteartifact_failed_at.py | 18 ++++++++++++++++++ pulpcore/app/models/content.py | 2 ++ pulpcore/content/handler.py | 12 +++++++++--- 3 files changed, 29 insertions(+), 3 deletions(-) create mode 100644 pulpcore/app/migrations/0126_remoteartifact_failed_at.py diff --git a/pulpcore/app/migrations/0126_remoteartifact_failed_at.py b/pulpcore/app/migrations/0126_remoteartifact_failed_at.py new file mode 100644 index 
0000000000..d5b9e2309a --- /dev/null +++ b/pulpcore/app/migrations/0126_remoteartifact_failed_at.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.16 on 2024-11-27 15:06 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0125_openpgpdistribution_openpgpkeyring_openpgppublickey_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='remoteartifact', + name='failed_at', + field=models.DateTimeField(null=True), + ), + ] diff --git a/pulpcore/app/models/content.py b/pulpcore/app/models/content.py index a0ba46f627..5e18fa76aa 100644 --- a/pulpcore/app/models/content.py +++ b/pulpcore/app/models/content.py @@ -703,6 +703,7 @@ class RemoteArtifact(BaseModel, QueryMixin): sha256 (models.CharField): The expected SHA-256 checksum of the file. sha384 (models.CharField): The expected SHA-384 checksum of the file. sha512 (models.CharField): The expected SHA-512 checksum of the file. + failed_at (models.DateTimeField): The datetime of last download attempt failure. Relations: @@ -721,6 +722,7 @@ class RemoteArtifact(BaseModel, QueryMixin): sha256 = models.CharField(max_length=64, null=True, db_index=True) sha384 = models.CharField(max_length=96, null=True, db_index=True) sha512 = models.CharField(max_length=128, null=True, db_index=True) + failed_at = models.DateTimeField(null=True) content_artifact = models.ForeignKey(ContentArtifact, on_delete=models.CASCADE) remote = models.ForeignKey("Remote", on_delete=models.CASCADE) diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index 339837348c..456d970838 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -6,6 +6,7 @@ import socket import struct from gettext import gettext as _ +from datetime import timedelta from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError from aiohttp.web import FileResponse, StreamResponse, HTTPOk @@ -22,6 +23,7 @@ from asgiref.sync import sync_to_async import django +from django.utils import timezone from pulpcore.constants import STORAGE_RESPONSE_MAP from pulpcore.responses import ArtifactResponse @@ -828,9 +830,11 @@ async def _stream_content_artifact(self, request, response, content_artifact): ClientConnectionError, ) - remote_artifacts = content_artifact.remoteartifact_set.select_related( - "remote" - ).order_by_acs() + remote_artifacts = ( + content_artifact.remoteartifact_set.select_related("remote") + .order_by_acs() + .exclude(failed_at__gte=timezone.now() - timedelta(minutes=5)) + ) async for remote_artifact in remote_artifacts: try: response = await self._stream_remote_artifact(request, response, remote_artifact) @@ -1140,6 +1144,8 @@ async def finalize(): try: download_result = await downloader.run() except DigestValidationError: + remote_artifact.failed_at = timezone.now() + await remote_artifact.asave() await downloader.session.close() close_tcp_connection(request.transport._sock) raise RuntimeError( From 255057befe0c7fe2556930009c1c62c753d2c40d Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 27 Nov 2024 12:55:55 -0300 Subject: [PATCH 06/10] fixup: rewrite close-connection test to account for new behavior --- .../functional/api/using_plugin/test_content_delivery.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index 862d0a6e0f..1aa6560fd7 100644 --- 
a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -134,10 +134,6 @@ def test_remote_content_changed_with_on_demand( # THEN get_url = urljoin(distribution.base_url, expected_file_list[0][0]) - with pytest.raises(ClientPayloadError, match="Response payload is not completed"): - download_file(get_url) - - # Assert again with curl just to be sure. result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert result.returncode == 18 assert b"* Closing connection 0" in result.stderr From aff83aa49b65205947752a2c6ef2864bbec474e3 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 27 Nov 2024 14:20:25 -0300 Subject: [PATCH 07/10] Fix corrupted RA from blocking other Remotes content-streaming On a request for on-demand content in the content app, a corrupted Remote that contains the wrong binary (for that content) prevented other Remotes from being attempted on future requests. Now the last failed Remotes are temporarily ignored and others may be picked. Closes #5725 --- CHANGES/5725.bugfix | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 CHANGES/5725.bugfix diff --git a/CHANGES/5725.bugfix b/CHANGES/5725.bugfix new file mode 100644 index 0000000000..136e7ff2c9 --- /dev/null +++ b/CHANGES/5725.bugfix @@ -0,0 +1,4 @@ +On a request for on-demand content in the content app, a corrupted Remote that +contains the wrong binary (for that content) prevented other Remotes from being +attempted on future requests. Now the last failed Remotes are temporarily ignored +and others may be picked. From 60e179ab79a9ab08981d879d1f8e2e1febe4c9ce Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 27 Nov 2024 15:26:19 -0300 Subject: [PATCH 08/10] fixup: improve log error message --- pulpcore/content/handler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index 456d970838..40a7709854 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -1149,11 +1149,13 @@ async def finalize(): await downloader.session.close() close_tcp_connection(request.transport._sock) raise RuntimeError( - f"We tried streaming {remote_artifact.url!r} to the client, but it " - "failed checkusm validation. " - "At this point, we cant recover from wrong data already sent, " - "so we are forcing the connection to close. " - "If this error persists, the remote server might be corrupted." + f"DigestValidationError: Pulp tried streaming {remote_artifact.url!r} to " + "the client, but it failed checkusm validation.\n\n" + "We cant recover from wrong data already sent so we are:\n" + "- Forcing the connection to close.\n" + "- Marking this Remote Server to be ignored for the next 5 minutes.\n\n" + "If the Remote is permanently corrupted, we advice the admin to " + "manually prune affected RemoteArtifacts/Remotes from Pulp." 
)

         if content_length := response.headers.get("Content-Length"):

From c66bc709de324a1bbb31188a666b6809ed0065ba Mon Sep 17 00:00:00 2001
From: Pedro Brochado
Date: Thu, 28 Nov 2024 10:28:42 -0300
Subject: [PATCH 09/10] fixup: rewrite error message

---
 pulpcore/content/handler.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py
index 40a7709854..31a4280e24 100644
--- a/pulpcore/content/handler.py
+++ b/pulpcore/content/handler.py
@@ -1149,13 +1149,15 @@ async def finalize():
             await downloader.session.close()
             close_tcp_connection(request.transport._sock)
             raise RuntimeError(
-                f"DigestValidationError: Pulp tried streaming {remote_artifact.url!r} to "
-                "the client, but it failed checkusm validation.\n\n"
-                "We cant recover from wrong data already sent so we are:\n"
+                f"Pulp tried streaming {remote_artifact.url!r} to "
+                "the client, but it failed checksum validation.\n\n"
+                "We can't recover from wrong data already sent so we are:\n"
                 "- Forcing the connection to close.\n"
-                "- Marking this Remote Server to be ignored for the next 5 minutes.\n\n"
-                "If the Remote is permanently corrupted, we advice the admin to "
-                "manually prune affected RemoteArtifacts/Remotes from Pulp."
+                "- Marking this Remote Server to be ignored for 5 minutes.\n\n"
+                "If the Remote is known to be fixed, try resyncing the associated repository.\n"
+                "If the Remote is known to be permanently corrupted, try removing "
+                "affected Pulp Remote, adding a good one and resyncing.\n"
+                "If the problem persists, please contact the Pulp team."
             )

         if content_length := response.headers.get("Content-Length"):

From 03d1a119c6d6ffdedb036cdb2943c3bf1da8529c Mon Sep 17 00:00:00 2001
From: Pedro Brochado
Date: Thu, 28 Nov 2024 15:32:53 -0300
Subject: [PATCH 10/10] fixup: make disable-time configurable

---
 pulpcore/app/settings.py    |  5 +++++
 pulpcore/content/handler.py | 12 +++++++-----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py
index 690b1a7108..d4cda8adb4 100644
--- a/pulpcore/app/settings.py
+++ b/pulpcore/app/settings.py
@@ -295,6 +295,11 @@
     "EXPIRES_TTL": 600,  # 10 minutes
 }

+# The time (in seconds) a RemoteArtifact will be ignored after a failure.
+# With on-demand content, if fetching from a remote fails due to corrupt data,
+# the corresponding RemoteArtifact is ignored for that long.
+FAILED_REMOTE_ARTIFACT_PROTECTION_TIME = 5 * 60  # 5 minutes
+
 SPECTACULAR_SETTINGS = {
     "SERVE_URLCONF": ROOT_URLCONF,
     "DEFAULT_GENERATOR_CLASS": "pulpcore.openapi.PulpSchemaGenerator",

diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py
index 31a4280e24..7df3d007fb 100644
--- a/pulpcore/content/handler.py
+++ b/pulpcore/content/handler.py
@@ -822,24 +822,25 @@ async def _stream_content_artifact(self, request, response, content_artifact):
         [pulpcore.plugin.models.ContentArtifact][] returned the binary data needed for
         the client.
         """
-        # We should only retry with exceptions that happen before we receive any data
+        # We should only skip exceptions that happen before we receive any data
         # and start streaming, as we can't rollback data if something happens after that.
-        RETRYABLE_EXCEPTIONS = (
+        SKIPPABLE_EXCEPTIONS = (
             ClientResponseError,
             UnsupportedDigestValidationError,
             ClientConnectionError,
         )

+        protection_time = settings.FAILED_REMOTE_ARTIFACT_PROTECTION_TIME
         remote_artifacts = (
             content_artifact.remoteartifact_set.select_related("remote")
             .order_by_acs()
-            .exclude(failed_at__gte=timezone.now() - timedelta(minutes=5))
+            .exclude(failed_at__gte=timezone.now() - timedelta(seconds=protection_time))
         )
         async for remote_artifact in remote_artifacts:
             try:
                 response = await self._stream_remote_artifact(request, response, remote_artifact)
                 return response
-            except RETRYABLE_EXCEPTIONS as e:
+            except SKIPPABLE_EXCEPTIONS as e:
                 log.warning(
                     "Could not download remote artifact at '{}': {}".format(
                         remote_artifact.url, str(e)

@@ -1148,12 +1149,13 @@ async def finalize():
             await remote_artifact.asave()
             await downloader.session.close()
             close_tcp_connection(request.transport._sock)
+            protection_time = settings.FAILED_REMOTE_ARTIFACT_PROTECTION_TIME
             raise RuntimeError(
                 f"Pulp tried streaming {remote_artifact.url!r} to "
                 "the client, but it failed checksum validation.\n\n"
                 "We can't recover from wrong data already sent so we are:\n"
                 "- Forcing the connection to close.\n"
-                "- Marking this Remote Server to be ignored for 5 minutes.\n\n"
+                f"- Marking this Remote to be ignored for {protection_time} seconds.\n\n"
                 "If the Remote is known to be fixed, try resyncing the associated repository.\n"
                 "If the Remote is known to be permanently corrupted, try removing "
                 "affected Pulp Remote, adding a good one and resyncing.\n"
                 "If the problem persists, please contact the Pulp team."
             )
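
With the final commit, the skip window that patch 05 hard-coded to 5 minutes is read from the new
FAILED_REMOTE_ARTIFACT_PROTECTION_TIME setting instead. As a rough deployment sketch — assuming the
standard dynaconf conventions pulpcore uses for its settings, where a value can be overridden in the
server's settings file or via a PULP_-prefixed environment variable (both are assumptions here, not
something the patches show) — an operator could widen the window like this:

    # Illustrative override in the deployment's settings file (path varies by install):
    # ignore a failed RemoteArtifact for 15 minutes instead of the default 5.
    FAILED_REMOTE_ARTIFACT_PROTECTION_TIME = 15 * 60

    # Or, equivalently, as an environment variable:
    #   PULP_FAILED_REMOTE_ARTIFACT_PROTECTION_TIME=900

Because the content app only excludes RemoteArtifacts whose failed_at falls inside the window, a
value of 0 would effectively disable the protection: no failed_at timestamp in the past can be
greater than or equal to "now".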