diff --git a/haystack/preview/components/fetchers/link_content.py b/haystack/preview/components/fetchers/link_content.py index 4412da0f86..62368aff11 100644 --- a/haystack/preview/components/fetchers/link_content.py +++ b/haystack/preview/components/fetchers/link_content.py @@ -1,17 +1,16 @@ -import io import logging from collections import defaultdict -from datetime import datetime -from typing import Optional, Dict, List, Callable, Any, IO +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable, Dict, List, Optional, Tuple import requests from requests import Response from requests.exceptions import HTTPError -from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryCallState -from haystack.preview import component, default_from_dict, default_to_dict +from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential from haystack import __version__ -from haystack.preview import Document +from haystack.preview import component, default_from_dict, default_to_dict +from haystack.preview.dataclasses import ByteStream logger = logging.getLogger(__name__) @@ -26,26 +25,27 @@ } -def text_content_handler(response: Response) -> Dict[str, str]: +def text_content_handler(response: Response) -> ByteStream: """ :param response: Response object from the request. :return: The extracted text. """ - return {"text": response.text} + return ByteStream.from_string(response.text) -def binary_content_handler(response: Response) -> Dict[str, IO[bytes]]: +def binary_content_handler(response: Response) -> ByteStream: """ :param response: Response object from the request. :return: The extracted binary file-like object. """ - return {"blob": io.BytesIO(response.content)} + return ByteStream(data=response.content) @component class LinkContentFetcher: """ - LinkContentFetcher fetches content from a URL link and converts it to a Document object. 
+ LinkContentFetcher is a component for fetching and extracting content from URLs. It supports handling various + content types, retries on failures, and automatic user-agent rotation for failed web requests. """ def __init__( @@ -56,15 +56,13 @@ def __init__( timeout: int = 3, ): """ - Creates a LinkContentFetcher instance. + Initializes a LinkContentFetcher instance. - :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs - during content extraction. If False, the error is simply logged and the program continues. - Defaults to False. - :param user_agents: A list of user agents to use when fetching content. Defaults to None, in which case a - default user agent is used. - :param retry_attempts: The number of times to retry fetching content. Defaults to 2. - :param timeout: The timeout in seconds for the request. Defaults to 3. + :param raise_on_failure: If True, raises an exception on failure when fetching a single URL. + For multiple URLs, errors are logged and successful fetches are returned. Default is True. + :param user_agents: A list of user agents for fetching content. If None, a default user agent is used. + :param retry_attempts: Number of retry attempts for fetching content. Default is 2. + :param timeout: Timeout in seconds for the request. Default is 3. 
""" self.raise_on_failure = raise_on_failure self.user_agents = user_agents or [DEFAULT_USER_AGENT] @@ -73,7 +71,7 @@ def __init__( self.timeout = timeout # register default content handlers that extract data from the response - self.handlers: Dict[str, Callable[[Response], Dict[str, Any]]] = defaultdict(lambda: text_content_handler) + self.handlers: Dict[str, Callable[[Response], ByteStream]] = defaultdict(lambda: text_content_handler) self.handlers["text/html"] = text_content_handler self.handlers["text/plain"] = text_content_handler self.handlers["application/pdf"] = binary_content_handler @@ -116,37 +114,96 @@ def from_dict(cls, data: Dict[str, Any]) -> "LinkContentFetcher": """ return default_from_dict(cls, data) - @component.output_types(documents=Optional[Document]) - def run(self, url: str): + @component.output_types(streams=List[ByteStream]) + def run(self, urls: List[str]): """ - Fetches content from a URL and converts it to a Document objects. If no content is extracted, - an empty Document object is returned (if raise_on_failure is False). + Fetches content from a list of URLs and returns a list of extracted content streams. + Each content stream is a ByteStream object containing the extracted content as binary data. + The content type of each stream is stored in the metadata of the ByteStream object under + the key "content_type". The URL of the fetched content is stored under the key "url". + + :param urls: A list of URLs to fetch content from. + :return: A list of ByteStream objects representing the extracted content. - :param url: URL to fetch content from. - :param timeout: Timeout in seconds for the request. - :return: List of Document objects or an empty list if no content is extracted. + :raises: If the provided list of URLs contains only a single URL, and `raise_on_failure` is set to True, + an exception will be raised in case of an error during content retrieval. 
In all other scenarios, any + retrieval errors are logged, and a list of successfully retrieved ByteStream objects is returned. """ - document_data: Dict[str, Any] = {"metadata": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}} + streams: List[ByteStream] = [] + if not urls: + return {"streams": streams} + + # don't use multithreading if there's only one URL + if len(urls) == 1: + stream_metadata, stream = self.fetch(urls[0]) + stream.metadata.update(stream_metadata) + streams.append(stream) + else: + with ThreadPoolExecutor() as executor: + results = executor.map(self._fetch_with_exception_suppression, urls) + + for stream_metadata, stream in results: # type: ignore + if stream_metadata is not None and stream is not None: + stream.metadata.update(stream_metadata) + streams.append(stream) + + return {"streams": streams} + + def fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]: + """ + Fetches content from a URL and returns it as a ByteStream. + + :param url: The URL to fetch content from. + :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream. + ByteStream metadata contains the URL and the content type of the fetched content. + The content type is a string indicating the type of content fetched (e.g., "text/html", "application/pdf"). + The ByteStream object contains the fetched content as binary data. + + :raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will + raise an exception. Otherwise, all fetching errors are logged, and an empty ByteStream is returned. 
+ + """ + content_type: str = "text/html" + stream: ByteStream = ByteStream(data=b"") try: response = self._get_response(url) content_type = self._get_content_type(response) - document_data["mime_type"] = content_type handler: Callable = self.handlers[content_type] - document_data.update(handler(response)) - return {"document": Document(**document_data)} - + stream = handler(response) except Exception as e: if self.raise_on_failure: raise e - logger.debug("Couldn't retrieve content from %s", url) - return {"document": None} + # less verbose log as this is expected to happen often (requests failing, blocked, etc.) + logger.debug("Couldn't retrieve content from %s due to %s", url, str(e)) finally: self.current_user_agent_idx = 0 + return {"content_type": content_type, "url": url}, stream + + def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]: + """ + Fetches content from a URL and returns it as a ByteStream. + + If `raise_on_failure` is set to True, this method will wrap the fetch method and catch any exceptions. + Otherwise, it will simply call the fetch method. + :param url: The URL to fetch content from. + :return: A tuple containing the ByteStream metadata dict and the corresponding ByteStream. + + """ + if self.raise_on_failure: + try: + return self.fetch(url) + except Exception as e: + logger.warning("Error fetching %s: %s", url, str(e)) + return {"content_type": "Unknown", "url": url}, None + else: + return self.fetch(url) + def _get_content_type(self, response: Response): """ Get the content type of the response. + :param response: The response object. :return: The content type of the response. """ @@ -157,6 +214,7 @@ def _switch_user_agent(self, retry_state: RetryCallState) -> None: """ Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents. Used by tenacity to retry the requests with a different user agent. 
+ :param retry_state: The retry state (unused, required by tenacity). """ self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents) diff --git a/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml b/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml index bd4c8610d1..f2699d4051 100644 --- a/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml +++ b/releasenotes/notes/add-link-content-fetcher-145915976f38e1e0.yaml @@ -1,5 +1,5 @@ --- preview: - | - Adds LinkContentFetcher component to Haystack 2.0. LinkContentFetcher fetches content from a given URL and - converts it into a Document object, which can then be used within the Haystack 2.0 pipeline. + Introduced the LinkContentFetcher in Haystack 2.0. This component fetches content from specified + URLs and converts them into ByteStream objects for further processing in Haystack pipelines. diff --git a/test/preview/components/fetchers/test_link_content_fetcher.py b/test/preview/components/fetchers/test_link_content_fetcher.py index a8be562cd1..91b88dee69 100644 --- a/test/preview/components/fetchers/test_link_content_fetcher.py +++ b/test/preview/components/fetchers/test_link_content_fetcher.py @@ -1,7 +1,7 @@ -import io from unittest.mock import patch, Mock import pytest +import requests from haystack.preview.components.fetchers.link_content import ( LinkContentFetcher, @@ -99,27 +99,29 @@ def test_from_dict(self): @pytest.mark.unit def test_run_text(self): + correct_response = b"Example test response" with patch("haystack.preview.components.fetchers.link_content.requests") as mock_run: mock_run.get.return_value = Mock( status_code=200, text="Example test response", headers={"Content-Type": "text/plain"} ) fetcher = LinkContentFetcher() - document = fetcher.run("https://www.example.com")["document"] - assert document.text == "Example test response" - assert document.metadata["url"] == "https://www.example.com" - assert "timestamp" in 
document.metadata + streams = fetcher.run(urls=["https://www.example.com"])["streams"] + first_stream = streams[0] + assert first_stream.data == correct_response + assert first_stream.metadata["content_type"] == "text/plain" @pytest.mark.unit def test_run_html(self): + correct_response = b"