Skip to content

Commit

Permalink
Refactoring step 4
Browse files Browse the repository at this point in the history
  • Loading branch information
vblagoje committed Jun 5, 2024
1 parent 5167f71 commit 83e8f88
Showing 1 changed file with 19 additions and 36 deletions.
55 changes: 19 additions & 36 deletions haystack_experimental/util/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
from haystack_experimental.util.schema_conversion import anthropic_converter, cohere_converter, openai_converter

VALID_HTTP_METHODS = ["get", "put", "post", "delete", "options", "head", "patch", "trace"]

MIN_REQUIRED_OPENAPI_SPEC_VERSION = 3

logger = logging.getLogger(__name__)


class AuthenticationStrategy:
"""
Represents an authentication strategy that can be applied to an HTTP request.
"""

def apply_auth(self, security_scheme: Dict[str, Any], request: Dict[str, Any]):
"""
Apply the authentication strategy to the given request.
Expand All @@ -41,7 +38,6 @@ def apply_auth(self, security_scheme: Dict[str, Any], request: Dict[str, Any]):
@dataclass
class ApiKeyAuthentication(AuthenticationStrategy):
"""API key authentication strategy."""

api_key: Optional[str] = None

def apply_auth(self, security_scheme: Dict[str, Any], request: Dict[str, Any]):
Expand All @@ -67,7 +63,6 @@ def apply_auth(self, security_scheme: Dict[str, Any], request: Dict[str, Any]):
@dataclass
class HTTPAuthentication(AuthenticationStrategy):
"""HTTP authentication strategy."""

username: Optional[str] = None
password: Optional[str] = None
token: Optional[str] = None
Expand Down Expand Up @@ -235,8 +230,7 @@ def get_security_schemes(self) -> Dict[str, Dict[str, Any]]:
"""
Get the security schemes from the OpenAPI specification.
"""
components = self.spec_dict.get("components", {})
return components.get("securitySchemes", {})
return self.spec_dict.get("components", {}).get("securitySchemes", {})


class ClientConfiguration:
Expand Down Expand Up @@ -391,54 +385,43 @@ def send_request(request: Dict[str, Any]) -> Dict[str, Any]:
return send_request

def _build_request(self, operation: Operation, **kwargs) -> Any:
request = {
"url": self._build_url(operation, **kwargs),
"method": operation.method.lower(),
"headers": self._build_headers(operation, **kwargs),
"params": self._build_query_params(operation, **kwargs),
"json": self._build_request_body(operation, **kwargs),
}
return request

def _build_headers(self, operation: Operation, **kwargs) -> Dict[str, str]:
headers = {}
for parameter in operation.get_parameters("header"):
param_value = kwargs.get(parameter["name"], None)
if param_value:
headers[parameter["name"]] = str(param_value)
elif parameter.get("required", False):
raise ValueError(f"Missing required header parameter: {parameter['name']}")
return headers

def _build_url(self, operation: Operation, **kwargs) -> str:
server_url = operation.get_server()
# url
path = operation.path
for parameter in operation.get_parameters("path"):
param_value = kwargs.get(parameter["name"], None)
if param_value:
path = path.replace(f"{{{parameter['name']}}}", str(param_value))
elif parameter.get("required", False):
raise ValueError(f"Missing required path parameter: {parameter['name']}")
return server_url + path

def _build_query_params(self, operation: Operation, **kwargs) -> Dict[str, Any]:
url = operation.get_server() + path
# method
method = operation.method.lower()
# headers
headers = {}
for parameter in operation.get_parameters("header"):
param_value = kwargs.get(parameter["name"], None)
if param_value:
headers[parameter["name"]] = str(param_value)
elif parameter.get("required", False):
raise ValueError(f"Missing required header parameter: {parameter['name']}")
# query params
query_params = {}
for parameter in operation.get_parameters("query"):
param_value = kwargs.get(parameter["name"], None)
if param_value:
query_params[parameter["name"]] = param_value
elif parameter.get("required", False):
raise ValueError(f"Missing required query parameter: {parameter['name']}")
return query_params

def _build_request_body(self, operation: Operation, **kwargs) -> Any:
json_payload = None
request_body = operation.request_body
if request_body:
content = request_body.get("content", {})
if "application/json" in content:
return {**kwargs}
raise NotImplementedError("Request body content type not supported")
return None
json_payload = {**kwargs}
else:
raise NotImplementedError("Request body content type not supported")
return {"url": url, "method": method, "headers": headers, "params": query_params, "json": json_payload}

def _apply_authentication(self, auth: AuthenticationStrategy, operation: Operation, request: Dict[str, Any]):
auth_config = auth or AuthenticationStrategy()
Expand Down

0 comments on commit 83e8f88

Please sign in to comment.