Skip to content

Commit

Permalink
chore: fetch and store ODCS compose
Browse files Browse the repository at this point in the history
Adds a method to pull and store ODCS compose.

Signed-off-by: Omer <[email protected]>
  • Loading branch information
Omeramsc committed Dec 7, 2023
1 parent 57e85c0 commit 4443117
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ black = "*"
mypy = "*"
pylint = "*"
isort = "*"
pre-commit = "*"
responses = "*"

[dev-packages]
pytest = "*"

[requires]
python_version = "3.11"
Expand Down
208 changes: 205 additions & 3 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion compose_generator/odcs_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
"""Fetch ready ODCS compose"""
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import requests # type: ignore

from .odcs_requester import ODCSRequestReference
from .protocols import ComposeFetcher, ComposeReference


Expand All @@ -20,5 +25,35 @@ class ODCSFetcher(ComposeFetcher):
Fetch ODCS compose based on a remote compose-reference and store it locally
"""

compose_path: Optional[Path] = None

def __call__(self, request_reference: ComposeReference) -> ODCSResultReference:
raise NotImplementedError()
"""
Fetch the ODCS compose from a remote reference.
:param request_reference: object containing the url reference for the ODCS compose file.
:raises HTTPError: If the request for the ODCS compose file failed.
:return: The filesystem path to the downloaded ODCS compose file.
"""
compose_path = self._get_compose_destination_path()

assert isinstance(request_reference, ODCSRequestReference)
response = requests.get(request_reference.compose_url, timeout=30) ####
response.raise_for_status()

with open(compose_path, "wb") as file:
file.write(response.content)
ocds_result_ref = ODCSResultReference(compose_path=compose_path)
return ocds_result_ref

def _get_compose_destination_path(self) -> Path:
"""Return the compose file destination and creates it if it does
not already exist.
:return: the Path to the compose file.
"""
compose_path = self.compose_path or Path(tempfile.mkdtemp())
compose_path.mkdir(parents=True, exist_ok=True)
compose_path = compose_path / "ocds_compose.repo"
return compose_path
12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""conftest.py - Common pytest fixtures
"""
import tempfile
from pathlib import Path

import pytest


@pytest.fixture
def tmpdir() -> Path:
"""Creates a tmporary directory and return its path"""
return Path(tempfile.mkdtemp())
Loading

0 comments on commit 4443117

Please sign in to comment.