[SAT-17783] Fix/corrupted RA blocks content streaming #6064

Draft · wants to merge 4 commits into main (showing changes from all commits)
3 changes: 3 additions & 0 deletions CHANGES/5012.bugfix
@@ -0,0 +1,3 @@
Fixed the content-app behavior for the case where the client would get a 200 response for a
package streamed from a Remote that didn't match the expected checksum.
Now the connection is closed before the response is finalized.
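For context, here is a minimal sketch of how the failure now surfaces to an aiohttp client (hypothetical URL; not part of this change). Because the 200 status and headers have already been sent when the checksum mismatch is detected, the only remaining signal is aborting the stream, which the client observes as an incomplete payload:

import asyncio

import aiohttp


async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            assert response.status == 200  # headers already sent before validation fails
            try:
                return await response.read()
            except aiohttp.ClientPayloadError:
                # "Response payload is not completed": the server aborted the
                # connection rather than finalizing a corrupt response.
                raise


# asyncio.run(fetch("http://pulp.example/pulp/content/dist/files/1.iso"))  # hypothetical URL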
4 changes: 2 additions & 2 deletions pulp_file/pytest_plugin.py
@@ -83,8 +83,8 @@ def file_fixtures_root(tmp_path):

@pytest.fixture
def write_3_iso_file_fixture_data_factory(file_fixtures_root):
-def _write_3_iso_file_fixture_data_factory(name):
-    file_fixtures_root.joinpath(name).mkdir()
+def _write_3_iso_file_fixture_data_factory(name, overwrite=False):
+    file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite)
file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"))
file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"))
file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"))
33 changes: 30 additions & 3 deletions pulpcore/content/handler.py
@@ -3,6 +3,8 @@
from multidict import CIMultiDict
import os
import re
import socket
import struct
from gettext import gettext as _

from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
@@ -54,7 +56,10 @@
cache_key,
)

-from pulpcore.exceptions import UnsupportedDigestValidationError # noqa: E402
+from pulpcore.exceptions import ( # noqa: E402
+    UnsupportedDigestValidationError,
+    DigestValidationError,
+)
from pulpcore.metrics import artifacts_size_counter # noqa: E402

from jinja2 import Template # noqa: E402: module level not at top of file
@@ -1125,13 +1130,25 @@ async def finalize():
await original_finalize()

downloader = remote.get_downloader(
-remote_artifact=remote_artifact, headers_ready_callback=handle_response_headers
+remote_artifact=remote_artifact,
+headers_ready_callback=handle_response_headers,
)
original_handle_data = downloader.handle_data
downloader.handle_data = handle_data
original_finalize = downloader.finalize
downloader.finalize = finalize
-download_result = await downloader.run()
+try:
+    download_result = await downloader.run()
+except DigestValidationError:
+    await downloader.session.close()
+    close_tcp_connection(request.transport._sock)
+    raise RuntimeError(
+        f"We tried streaming {remote_artifact.url!r} to the client, but it "
+        "failed checksum validation. "
+        "At this point, we can't recover from wrong data already sent, "
+        "so we are forcing the connection to close. "
+        "If this error persists, the remote server might be corrupted."
+    )

if content_length := response.headers.get("Content-Length"):
response.headers["X-PULP-ARTIFACT-SIZE"] = content_length
@@ -1149,3 +1166,13 @@ async def finalize():
if response.status == 404:
raise HTTPNotFound()
return response


def close_tcp_connection(sock):
"""Configure socket to close TCP connection immediately."""
try:
l_onoff = 1
l_linger = 0 # 0 seconds timeout - immediate close
sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", l_onoff, l_linger))
except OSError as e:  # socket.error is an alias of OSError in Python 3
log.warning(f"Error configuring socket for force close: {e}")
187 changes: 182 additions & 5 deletions pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
@@ -1,14 +1,14 @@
"""Tests related to content delivery."""

-from aiohttp.client_exceptions import ClientResponseError
import hashlib
-import pytest
import subprocess
import uuid
from urllib.parse import urljoin

-from pulpcore.client.pulp_file import (
-    RepositorySyncURL,
-)
+import pytest
+from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError
+
+from pulpcore.client.pulp_file import RepositorySyncURL
from pulpcore.tests.functional.utils import download_file, get_files_in_manifest


@@ -102,3 +102,180 @@ def test_remote_artifact_url_update(
actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest()
expected_checksum = expected_file_list[0][1]
assert expected_checksum == actual_checksum


@pytest.mark.parallel
def test_remote_content_changed_with_on_demand(
write_3_iso_file_fixture_data_factory,
file_repo_with_auto_publish,
file_remote_ssl_factory,
file_bindings,
monitor_task,
file_distribution_factory,
):
"""
GIVEN a remote synced on demand with fileA (e.g., digest=123),
WHEN fileA's content changes on the remote server (e.g., digest=456),
THEN retrieving fileA from the content app causes a closed connection and an incomplete response.
"""
# GIVEN
basic_manifest_path = write_3_iso_file_fixture_data_factory("basic")
remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="on_demand")
body = RepositorySyncURL(remote=remote.pulp_href)
monitor_task(
file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
)
repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
distribution = file_distribution_factory(repository=repo.pulp_href)
expected_file_list = list(get_files_in_manifest(remote.url))

# WHEN
write_3_iso_file_fixture_data_factory("basic", overwrite=True)

# THEN
get_url = urljoin(distribution.base_url, expected_file_list[0][0])
with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
download_file(get_url)

# Assert again with curl just to be sure.
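# curl exit code 18 is CURLE_PARTIAL_FILE: the server closed the connection
# while declared response data was still outstanding.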
result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert result.returncode == 18
assert b"* Closing connection 0" in result.stderr
assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr


# import select
# from multiprocessing import Process, Queue

# import requests


# def run_server(port: int, server_dir: str, q: Queue):
# import http.server
# import os

# handler_cls = http.server.SimpleHTTPRequestHandler
# server_cls = http.server.HTTPServer

# os.chdir(server_dir)
# server_address = ("", port)
# httpd = server_cls(server_address, handler_cls)

# q.put(httpd.fileno()) # send to parent so can use select
# httpd.serve_forever()


# def create_server(port: int, server_dir: str) -> Process:
# # setup/teardown server
# q = Queue()
# proc = Process(target=run_server, args=(port, server_dir, q))
# proc.start()

# # block until the server socket fd is ready for write
# server_socket_fd = q.get()
# _, w, _ = select.select([], [server_socket_fd], [], 5)
# if not w:
# proc.terminate()
# proc.join()
# raise TimeoutError("The test server didn't get ready.")
# return proc


# @pytest.fixture
# def http_server(write_3_iso_file_fixture_data_factory):
# # setup data
# server_dir = write_3_iso_file_fixture_data_factory("server_foo").rsplit("/")[0]
# # setup server
# port = 8787
# proc = create_server(port, server_dir)
# base_url = f"http://localhost:{port}"
# yield base_url
# proc.terminate()
# proc.join()

# def test_fixture_server(http_server):
# result = requests.get(http_server + "/1.iso")
# assert result.ok is True


def test_fixture_server(
write_3_iso_file_fixture_data_factory,
gen_fixture_server,
):
import requests
import time

fixture_root = write_3_iso_file_fixture_data_factory("server_a")
server_data = gen_fixture_server(fixture_root, None)
url = server_data.make_url(fixture_root)
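# Crude readiness wait: give the fixture server a moment to start accepting connections.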
time.sleep(5)
result = requests.get(url)
assert result.ok is True


@pytest.mark.parallel
def test_handling_remote_artifact_on_demand_streaming_failure(
write_3_iso_file_fixture_data_factory,
file_repo_with_auto_publish,
file_remote_ssl_factory,
file_bindings,
monitor_task,
monitor_task_group,
file_distribution_factory,
gen_object_with_cleanup,
gen_fixture_server,
):
"""
GIVEN a content unit synced on-demand that has 2 RemoteArtifacts (Remote + ACS),
AND only the ACS RemoteArtifact (which takes priority in the content-app) is corrupted,

WHEN a client requests the content for the first time,
THEN the client doesn't get any content.

WHEN a client requests the content for the second time,
THEN the client gets the right content.
"""

# Plumbing
def create_remote(remote_url):
return file_remote_ssl_factory(manifest_path=remote_url, policy="on_demand")

def create_fixture_server(name, http_server=False):
fixture_root = write_3_iso_file_fixture_data_factory(name)
if not http_server: # return filesystem path
return fixture_root
else: # return http server url
server_data = gen_fixture_server(fixture_root, None)
return server_data.make_url(fixture_root)

def sync_publish_and_distribute(remote):
body = RepositorySyncURL(remote=remote.pulp_href)
monitor_task(
file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
)
repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
distribution = file_distribution_factory(repository=repo.pulp_href)
return repo, distribution

def create_and_bind_acs_to_remote(remote, paths: list[str]):
acs = gen_object_with_cleanup(
file_bindings.AcsFileApi,
{"remote": remote.pulp_href, "paths": paths, "name": str(uuid.uuid4())},
)
monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
return acs

# GIVEN
sync_server_path = create_fixture_server("sync_server")
acs_server_url = create_fixture_server("acs_server", http_server=True)
remote = create_remote(remote_url=sync_server_path)
repo, distribution = sync_publish_and_distribute(remote)
create_and_bind_acs_to_remote(remote, [acs_server_url])

# WHEN (first request)

# THEN (first request)

# WHEN (second request)

# THEN (second request)
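A possible shape for the missing WHEN/THEN steps, assuming the corrupted ACS copy is tried first and that the content app can serve the content from the healthy Remote on a second attempt (hypothetical sketch, not part of this draft):

# Hypothetical completion; names reuse the fixtures and imports above.
expected_file_list = list(get_files_in_manifest(remote.url))
get_url = urljoin(distribution.base_url, expected_file_list[0][0])

# WHEN/THEN (first request): the corrupted ACS stream is aborted mid-response.
with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
    download_file(get_url)

# WHEN/THEN (second request): the client gets the right content.
downloaded_file = download_file(get_url)
assert hashlib.sha256(downloaded_file.body).hexdigest() == expected_file_list[0][1]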