diff --git a/src/llmling/resources/base.py b/src/llmling/resources/base.py index d8d5b3c..76fc550 100644 --- a/src/llmling/resources/base.py +++ b/src/llmling/resources/base.py @@ -5,9 +5,13 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime +import os +import re from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, cast, overload +import urllib.parse import logfire +import upath from llmling.config.models import BaseResource from llmling.core import exceptions @@ -97,6 +101,8 @@ class ResourceLoader[TResource](ABC): context_class: type[TResource] uri_scheme: ClassVar[str] supported_mime_types: ClassVar[list[str]] = ["text/plain"] + # Invalid path characters (Windows + Unix) + invalid_chars_pattern = re.compile(r'[\x00-\x1F<>:"|?*\\]') def __init__(self, context: LoaderContext[TResource] | None = None) -> None: """Initialize loader with optional context.""" @@ -132,18 +138,49 @@ def get_name_from_uri(cls, uri: str) -> str: """ try: if not cls.supports_uri(uri): - msg = f"Unsupported URI scheme: {uri}" + msg = f"Unsupported URI: {uri}" raise exceptions.LoaderError(msg) # noqa: TRY301 - # Split "scheme://name" and get name part - _, name = uri.split("://", 1) + path = upath.UPath(uri) - # Normalize path separators - return name.replace("\\", "/").lstrip("/") + # Get parts excluding protocol info + parts = [ + urllib.parse.unquote(str(part)) # URL decode each part + for part in path.parts + if not cls._is_ignorable_part(str(part)) + ] + if not parts: + msg = "Empty path after normalization" + raise exceptions.LoaderError(msg) # noqa: TRY301 + + # Validate path components + for part in parts: + if cls.invalid_chars_pattern.search(part): + msg = f"Invalid characters in path component: {part}" + raise exceptions.LoaderError(msg) # noqa: TRY301 + + # Join with forward slashes and normalize path + joined = "/".join(parts) + # Normalize path (resolve .. and .) + normalized = os.path.normpath(joined).replace("\\", "/") except Exception as exc: - msg = f"Invalid URI format: {uri}" + if isinstance(exc, exceptions.LoaderError): + raise + msg = f"Invalid URI: {uri}" raise exceptions.LoaderError(msg) from exc + else: + return normalized + + @staticmethod + def _is_ignorable_part(part: str) -> bool: + """Check if a path component should be ignored.""" + return ( + not part + or part in {".", ".."} + or (len(part) == 2 and part[1] == ":") # Drive letter # noqa: PLR2004 + or part in {"/", "\\"} + ) @classmethod def create_uri(cls, *, name: str) -> str: diff --git a/src/llmling/resources/loaders/path.py b/src/llmling/resources/loaders/path.py index d9e7e3e..fdabba8 100644 --- a/src/llmling/resources/loaders/path.py +++ b/src/llmling/resources/loaders/path.py @@ -26,8 +26,6 @@ class PathResourceLoader(ResourceLoader[PathResource]): "text/markdown", "text/yaml", ] - # Invalid path characters (Windows + Unix) - invalid_chars_pattern = re.compile(r'[\x00-\x1F<>:"|?*\\]') @classmethod def supports_uri(cls, uri: str) -> bool: @@ -75,16 +73,6 @@ def get_name_from_uri(cls, uri: str) -> str: msg = f"Invalid URI: {uri}" raise exceptions.LoaderError(msg) from exc - @staticmethod - def _is_ignorable_part(part: str) -> bool: - """Check if a path component should be ignored.""" - return ( - not part - or part in {".", ".."} - or (len(part) == 2 and part[1] == ":") # Drive letter # noqa: PLR2004 - or part in {"/", "\\"} - ) - @classmethod def create_uri(cls, *, name: str) -> str: """Create a URI from a path or URL."""