Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Feature/manager #15

Merged
merged 25 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
abdb193
Adds manager
bwalsh Nov 11, 2022
828de8c
Install the dev dependencies, test mock, skip terra test.
bwalsh Nov 11, 2022
2f2f371
Moves constant to proper location
bwalsh Nov 16, 2022
efef7b8
Optimizes memory use
bwalsh Nov 18, 2022
f18ca4b
# This is a combination of 2 commits.
Nov 21, 2022
1d903b7
replaced pandas with csv, added optimizer, added download sort by fil…
lbeckman314 Nov 23, 2022
3f6645d
Add checksums to release assets
lbeckman314 Nov 28, 2022
b1821b2
Remove extraneous driver file
lbeckman314 Dec 12, 2022
111dcc9
Cleans up code
bwalsh Dec 13, 2022
1ae93c1
Adds integration tests
bwalsh Dec 13, 2022
db3949c
Adds gen3
bwalsh Dec 13, 2022
aad0fa5
Adds testplan outline
bwalsh Dec 13, 2022
5bc84e9
flake8
bwalsh Dec 13, 2022
e44745b
Minor cleanup
bwalsh Dec 14, 2022
4662a12
Adds check for correct object size; skip if errors.
bwalsh Dec 14, 2022
526d3aa
Adds failure tests to mock
bwalsh Dec 14, 2022
8d19c36
Capture exceptions in drs_object.errors, close session
bwalsh Dec 14, 2022
02d9135
Uses logger instead of print
bwalsh Dec 14, 2022
e4866ad
Fix docstring
bwalsh Dec 14, 2022
40db04f
Flake8
bwalsh Dec 14, 2022
4213252
Speed up by checking only our code
bwalsh Dec 14, 2022
c12b5bc
Addressed Michael's review comments resolved basic problems
lbeckman314 Dec 19, 2022
25dfcb0
added optimizer part size test and terra default directory download test
matthewpeterkort Dec 19, 2022
6410e24
Output download destination to stdout
lbeckman314 Dec 21, 2022
eae8fec
Fix README example
lbeckman314 Dec 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ jobs:
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
pip install -e .
mkdir /tmp/testing
bwalsh marked this conversation as resolved.
Show resolved Hide resolved
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
Expand All @@ -35,4 +38,4 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest
pytest tests/ -k 'not test_terra'
bwalsh marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 9 additions & 0 deletions drs_downloader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
KB = 1024
MB = KB * KB
GB = MB * MB

DEFAULT_MAX_SIMULTANEOUS_OBJECT_RETRIEVERS = 10
DEFAULT_MAX_SIMULTANEOUS_DOWNLOADERS = 10
DEFAULT_MAX_SIMULTANEOUS_PART_HANDLERS = 3
MAX_SIZE_OF_OBJECT = 50 * MB
DEFAULT_PART_SIZE = 10 * MB
121 changes: 117 additions & 4 deletions drs_downloader/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,124 @@
import logging
import uuid
from pathlib import Path
from typing import List

import click
import pandas as pd

from drs_downloader.clients.mock import MockDrsClient
from drs_downloader.clients.terra import TerraDrsClient
from drs_downloader.manager import DrsAsyncManager

logging.basicConfig(format='%(asctime)s %(message)s', encoding='utf-8', level=logging.INFO)
logger = logging.getLogger(__name__)

# these control our simulation
NUMBER_OF_OBJECT_IDS = 10


@click.command()
@click.group()
def cli():
"""Welcome to drs_downloader."""
print(cli.__doc__)
"""Copy DRS objects from the cloud to your local system ."""
pass


@cli.command()
@click.option("--silent", "-s", is_flag=True, show_default=True, default=False, help="Display nothing.")
@click.option("--destination_dir", "-d", show_default=True, default='/tmp/testing', help="Destination directory.")
def mock(silent: bool, destination_dir: str):
bwalsh marked this conversation as resolved.
Show resolved Hide resolved
"""Generate test files locally, without the need for server."""
if destination_dir:
destination_dir = Path(destination_dir)

# get a drs client
drs_client = MockDrsClient()

# assign it to a manager
drs_manager = DrsAsyncManager(drs_client=drs_client, show_progress=not silent)

# simulate read from manifest
ids_from_manifest = [str(uuid.uuid4()) for _ in range(NUMBER_OF_OBJECT_IDS)]

# get the DrsObject for the ids
drs_objects = drs_manager.get_objects(ids_from_manifest)
bwalsh marked this conversation as resolved.
Show resolved Hide resolved

# shape the workload
drs_objects = drs_manager.optimize_workload(drs_objects)

# download the objects, now with file_parts
drs_objects = drs_manager.download(drs_objects, destination_dir)

# show results
if not silent:
for drs_object in drs_objects:
if len(drs_object.errors) > 0:
logger.error((drs_object.name, 'ERROR', drs_object.size, len(drs_object.file_parts), drs_object.errors))
else:
logger.info((drs_object.name, 'OK', drs_object.size, len(drs_object.file_parts)))
logger.info(('done', 'statistics.max_files_open', drs_client.statistics.max_files_open))


@cli.command()
@click.option("--silent", "-s", is_flag=True, show_default=True, default=False, help="Display nothing.")
@click.option("--destination_dir", "-d", show_default=True, default='/tmp/testing', help="Destination directory.")
@click.option("--manifest_path", "-m", show_default=True, default='tests/fixtures/terra-data.tsv',
help="Path to manifest tsv.")
def terra(silent: bool, destination_dir: str, manifest_path: str):
"""Copy files from terra.bio"""

if destination_dir:
destination_dir = Path(destination_dir)
if manifest_path:
manifest_path = Path(manifest_path)

# get a drs client
drs_client = TerraDrsClient()

# assign it to a manager
drs_manager = DrsAsyncManager(drs_client=drs_client, show_progress=not silent)

# read from manifest
def _extract_tsv_info(manifest_path_: Path, drs_header: str = 'pfb:ga4gh_drs_uri') -> List[str]:
"""Extract the DRS URI's from the provided TSV file.

Args:
manifest_path_ (str): The input file containing a list of DRS URI's.
drs_header (str): Column header for the DRS URI's.
Returns:
List[str]: The URI's corresponding to the DRS objects.
"""
uris = []

df = pd.read_csv(manifest_path_, sep='\t')
if drs_header in df.columns.values.tolist():
for i in range(df[drs_header].count()):
uris.append(df['pfb:ga4gh_drs_uri'][i])
else:
raise KeyError(f"Header '{drs_header}' not found in {manifest_path}")

return uris

ids_from_manifest = _extract_tsv_info(manifest_path)

# get the DrsObject for the ids
drs_objects = drs_manager.get_objects(ids_from_manifest)
bwalsh marked this conversation as resolved.
Show resolved Hide resolved

# shape the workload
drs_objects = drs_manager.optimize_workload(drs_objects)

# download the objects, now with file_parts
drs_objects = drs_manager.download(drs_objects, destination_dir)

# show results
if not silent:
for drs_object in drs_objects:
if len(drs_object.errors) > 0:
logger.error((drs_object.name, 'ERROR', drs_object.size, len(drs_object.file_parts), drs_object.errors))
else:
logger.info((drs_object.name, 'OK', drs_object.size, len(drs_object.file_parts)))
logger.info(('done', 'statistics.max_files_open', drs_client.statistics.max_files_open))


if __name__ == '__main__':
if __name__ == "__main__":
cli()
Empty file.
120 changes: 120 additions & 0 deletions drs_downloader/clients/mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import asyncio
import hashlib
import logging
import os
import tempfile
import uuid
from pathlib import Path
import random

from drs_downloader import MAX_SIZE_OF_OBJECT
from drs_downloader.models import DrsClient, DrsObject, AccessMethod, Checksum

logger = logging.getLogger(__name__)


class MockDrsClient(DrsClient):
"""Simulate responses from server.

"""

async def sign_url(self, drs_object: DrsObject) -> DrsObject:
"""Simulate url signing by waiting 1-3 seconds, return populated DrsObject

Args:
drs_object:

Returns:
populated DrsObject
"""
# here we sleep while the file is open and measure total files open
fp = tempfile.TemporaryFile()
sleep_duration = random.randint(1, 3)
await asyncio.sleep(delay=sleep_duration)
fp.write(b'sign url')
self.statistics.set_max_files_open()
fp.close()

# provide expected result, e.g. X-Signature
access_url = f"{drs_object.self_uri}?X-Signature={uuid.uuid1()}"
# place it in the right spot in the drs object
drs_object.access_methods.append(AccessMethod(access_url=access_url, type='gs'))

return drs_object

async def download_part(self, drs_object: DrsObject, start: int, size: int, destination_path: Path) -> Path:
"""Actually download a part.

Args:
destination_path:
drs_object:
start:
size:

Returns:
full path to that part.
"""

# calculate actual part size from range see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range

length_ = size - start
# logger.info((drs_object.name, start, length_))
with open(f'/tmp/testing/{drs_object.name}.golden', 'rb') as f:
f.seek(start)
data = f.read(length_)

(fd, name,) = tempfile.mkstemp(prefix=f'{drs_object.name}.{start}.{size}.', suffix='.part',
dir=str(destination_path))
with os.fdopen(fd, 'wb') as fp:
sleep_duration = random.randint(1, 3)
await asyncio.sleep(delay=sleep_duration)
fp.write(data)
self.statistics.set_max_files_open()
fp.close()

return Path(name)

async def get_object(self, object_id: str) -> DrsObject:
"""Fetch the object from repository DRS Service.

See https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_getobject.

Args:
object_id:

Returns:

"""
# """Actually fetch the object.
#
# """
fp = tempfile.TemporaryFile()
sleep_duration = random.randint(1, 3)
await asyncio.sleep(delay=sleep_duration)
fp.write(b'get object')
self.statistics.set_max_files_open()
fp.close()

id_ = str(uuid.uuid4())
name_ = f"file-{id_}.txt"

line = b'Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.\n' # noqa
line_len = len(line)
number_of_lines = int(random.randint(line_len, MAX_SIZE_OF_OBJECT) / line_len)
lines = line * number_of_lines
size_ = len(lines)

# write it for testing
with open(f'/tmp/testing/{name_}.golden', 'wb') as f:
f.write(lines)
return DrsObject(
self_uri=f"drs://{object_id}",
size=size_,
# md5, etag, crc32c, trunc512, or sha1
checksums=[
Checksum(checksum=hashlib.new('md5', lines).hexdigest(),
type='md5')
],
id=id_,
name=name_,
)
97 changes: 97 additions & 0 deletions drs_downloader/clients/terra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import subprocess
import tempfile
from pathlib import Path

import aiofiles
import aiohttp

from drs_downloader.models import DrsClient, DrsObject, AccessMethod, Checksum


class TerraDrsClient(DrsClient):
"""
Calls the terra DRS server.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.endpoint = "https://us-central1-broad-dsde-prod.cloudfunctions.net/martha_v3"
self.token = self._get_auth_token()

@staticmethod
def _get_auth_token() -> str:
"""Get Google Cloud authentication token.
User must run 'gcloud auth login' from the shell before starting this script.

Returns:
str: auth token
"""
token_command = "gcloud auth print-access-token"
cmd = token_command.split(' ')
token = subprocess.check_output(cmd).decode("ascii")[0:-1]
assert token, "No token retrieved."
return token

async def download_part(self, drs_object: DrsObject, start: int, size: int, destination_path: Path) -> Path:

headers = {'Range': f'bytes={start}-{size}'}
(fd, name,) = tempfile.mkstemp(prefix=f'{drs_object.name}.{start}.{size}.', suffix='.part',
dir=str(destination_path))
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(drs_object.access_methods[0].access_url) as request:
file = await aiofiles.open(name, 'wb')
self.statistics.set_max_files_open()
await file.write(await request.content.read())
return Path(name)

async def sign_url(self, drs_object: DrsObject) -> DrsObject:
"""No-op. terra returns a signed url in `get_object` """
return drs_object

async def get_object(self, object_id: str) -> DrsObject:
"""Sends a POST request for the signed URL, hash, and file size of a given DRS object.

Args:
object_id (str): DRS URI

Raises:
Exception: The request was rejected by the server

Returns:
DownloadURL: The downloadable bundle ready for async download
"""
data = {
"url": object_id,
"fields": ["fileName", "size", "hashes", "accessUrl"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drs_downloader code appears to get the "accessUrl"(/signed URL) again before actually downloading the data. If that is the case, the "accessUrl" should not be requested here in get_object.
This is because the other fields ("fileName", "size", "hashes") can be obtained from the first request to the DRS server, yet getting the "accessUrl" requires a second request to the DRS server and performing the relatively expensive operation of signing the URL.
If the "accessUrl" obtained here is never actually used, then you should just request the other fields here and not "accessUrl".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Will address with a test in this PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
session = aiohttp.ClientSession(headers={
'authorization': 'Bearer ' + self.token,
'content-type': 'application/json'
})

async with session.post(url=self.endpoint, json=data) as response:
try:
self.statistics.set_max_files_open()
response.raise_for_status()
resp = await response.json(content_type=None)
assert 'accessUrl' in resp, resp
if resp['accessUrl'] is None:
account_command = 'gcloud config get-value account'
cmd = account_command.split(' ')
account = subprocess.check_output(cmd).decode("ascii")
raise Exception(
f"A valid URL was not returned from the server. Please check the access for {account}\n{resp}")
url_ = resp['accessUrl']['url']
md5_ = resp['hashes']['md5']
size_ = resp['size']
name_ = resp['fileName']
return DrsObject(
self_uri=object_id,
size=size_,
checksums=[Checksum(checksum=md5_, type='md5')],
id=object_id,
name=name_,
access_methods=[AccessMethod(access_url=url_, type='gs')]
)
finally:
await session.close()
Loading