-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add FTP support to jobstores #5142
Changes from 11 commits
195fdc8
176381c
af46642
1ea4e3d
65e5564
5ba00b2
35fbe08
754b43c
5ed71cf
17536ba
13f4ac5
f234825
0f38d58
bc5da85
665e3bd
a7455a0
7d31e54
d2da46c
a2105af
66d3587
9f86277
91f1cbd
5884417
f490ed3
bbd321a
59e3fea
8fe81cc
f499caf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,7 @@ | |
) | ||
from urllib.error import HTTPError | ||
from urllib.parse import ParseResult, urlparse | ||
from urllib.request import urlopen | ||
from urllib.request import urlopen, Request | ||
from uuid import uuid4 | ||
|
||
from toil.common import Config, getNodeID, safeUnpickleFromStream | ||
|
@@ -46,7 +46,9 @@ | |
JobException, | ||
ServiceJobDescription, | ||
) | ||
from toil.jobStores.ftp_utils import FtpFsAccess | ||
from toil.lib.compatibility import deprecated | ||
from toil.lib.conversions import strtobool | ||
from toil.lib.io import WriteWatchingStream | ||
from toil.lib.memoize import memoize | ||
from toil.lib.retry import ErrorCondition, retry | ||
|
@@ -1872,15 +1874,28 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): | |
stores. | ||
""" | ||
|
||
ftp = None | ||
|
||
@classmethod | ||
def _setup_ftp(cls) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's weird that the |
||
if cls.ftp is None: | ||
cls.ftp = FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_USE_SSL', 'True')) is False) | ||
|
||
@classmethod | ||
def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: | ||
return url.scheme.lower() in ("http", "https", "ftp") and not export | ||
|
||
@classmethod | ||
def _url_exists(cls, url: ParseResult) -> bool: | ||
# Deal with FTP first to support user/password auth | ||
if url.scheme.lower() == "ftp": | ||
cls._setup_ftp() | ||
# mypy is unable to understand that ftp must exist by this point | ||
assert cls.ftp is not None | ||
return cls.ftp.exists(url.geturl()) | ||
|
||
try: | ||
# TODO: Figure out how to HEAD instead of this. | ||
with cls._open_url(url): | ||
with closing(urlopen(Request(url.geturl(), method="HEAD"))): | ||
return True | ||
except FileNotFoundError: | ||
return False | ||
|
@@ -1896,11 +1911,15 @@ def _url_exists(cls, url: ParseResult) -> bool: | |
) | ||
def _get_size(cls, url: ParseResult) -> Optional[int]: | ||
if url.scheme.lower() == "ftp": | ||
return None | ||
with closing(urlopen(url.geturl())) as readable: | ||
# just read the header for content length | ||
size = readable.info().get("content-length") | ||
return int(size) if size is not None else None | ||
cls._setup_ftp() | ||
# mypy is unable to understand that ftp must exist by this point | ||
assert cls.ftp is not None | ||
return cls.ftp.size(url.geturl()) | ||
|
||
# just read the header for content length | ||
resp = urlopen(Request(url.geturl(), method="HEAD")) | ||
size = resp.info().get("content-length") | ||
return int(size) if size is not None else None | ||
|
||
@classmethod | ||
def _read_from_url( | ||
|
@@ -1933,6 +1952,14 @@ def count(l: int) -> None: | |
] | ||
) | ||
def _open_url(cls, url: ParseResult) -> IO[bytes]: | ||
# Deal with FTP first so we support user/password auth | ||
if url.scheme.lower() == "ftp": | ||
cls._setup_ftp() | ||
# mypy is unable to understand that ftp must exist by this point | ||
assert cls.ftp is not None | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you structured this as a method that returns an FtpFsAccess, and not one that has a side effect of setting a class-level field, you wouldn't need all these asserts, and you also wouldn't need to remember two member names. |
||
# we open in read mode as write mode is not supported | ||
return cls.ftp.open(url.geturl(), mode="r") | ||
|
||
try: | ||
return cast(IO[bytes], closing(urlopen(url.geturl()))) | ||
except HTTPError as e: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This might be better placed in |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
# Copyright 2017 Oregon Health and Science University | ||
# | ||
# Copyright (C) 2015-2021 Regents of the University of California | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
import ftplib | ||
import logging | ||
import netrc | ||
import os | ||
from contextlib import closing | ||
from typing import Optional, Any, cast, IO | ||
from urllib.parse import urlparse | ||
from urllib.request import urlopen | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class FtpFsAccess: | ||
""" | ||
FTP access with upload. | ||
|
||
Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L37-L251 | ||
""" | ||
|
||
def __init__( | ||
self, cache: Optional[dict[Any, ftplib.FTP]] = None, insecure: bool = False | ||
): | ||
""" | ||
FTP object to handle FTP connections. By default, connect over FTP with TLS. | ||
|
||
:param cache: cache of generated FTP objects, keyed by (host, user, password) | ||
:param insecure: If True, log in with ``secure=False`` instead of the default ``secure=True`` | ||
""" | ||
# A falsy cache (None or an empty dict) is replaced by a fresh dict | ||
self.cache = cache or {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want to be able to turn off using a connection cache at all, if we're going to make an FtpFsAccess that is never destroyed. |
||
self.netrc = None | ||
self.insecure = insecure | ||
# Look for a .netrc file to supply credentials; a file that fails to parse is logged at debug level and ignored | ||
try: | ||
if "HOME" in os.environ: | ||
if os.path.exists(os.path.join(os.environ["HOME"], ".netrc")): | ||
self.netrc = netrc.netrc(os.path.join(os.environ["HOME"], ".netrc")) | ||
elif os.path.exists(os.path.join(os.curdir, ".netrc")): | ||
self.netrc = netrc.netrc(os.path.join(os.curdir, ".netrc")) | ||
except netrc.NetrcParseError as err: | ||
logger.debug(err) | ||
|
||
def exists(self, fn: str) -> bool:
    """
    Report whether an FTP url refers to something that exists on the server.

    :param fn: FTP url
    :return: True if the url is a file or a directory on the server, else False
    """
    # A file check comes first; the directory check only runs when it fails
    if self.isfile(fn):
        return True
    return self.isdir(fn)
|
||
def isfile(self, fn: str) -> bool:
    """
    Report whether an FTP url refers to a file.

    A url counts as a file when a connection can be obtained and a size can
    be determined for it.

    :param fn: FTP url
    :return: True if the url is a file, else False
    """
    if not self._connect(fn):
        return False
    try:
        return self.size(fn) is not None
    except ftplib.all_errors:
        # Any error in ftplib.all_errors while asking for the size means "not a file"
        return False
|
||
def isdir(self, fn: str) -> bool:
    """
    Report whether an FTP url refers to a directory.

    The check changes into the url's path on the server and then changes
    back to the directory the connection was in beforehand.

    :param fn: FTP url
    :return: True if the url is a directory, else False
    """
    conn = self._connect(fn)
    if not conn:
        return False
    try:
        start_dir = conn.pwd()
        conn.cwd(urlparse(fn).path)
        # Restore the working directory so the connection is left where it was
        conn.cwd(start_dir)
    except ftplib.all_errors:
        return False
    return True
|
||
def open(self, fn: str, mode: str) -> IO[bytes]:
    """
    Open an FTP url for reading.

    Writing is not supported.

    :param fn: FTP url
    :param mode: Mode to open the url in; it must contain "r"
    :return: The opened url, wrapped in ``contextlib.closing``
    :raises Exception: if the mode does not contain "r"
    """
    if "r" not in mode:
        # TODO: support write mode
        raise Exception("Write mode FTP not implemented")
    host, user, passwd, path = self._parse_url(fn)
    # The resolved credentials are embedded in the url handed to urlopen
    authed_url = f"ftp://{user}:{passwd}@{host}/{path}"
    return cast(IO[bytes], closing(urlopen(authed_url)))
|
||
def _parse_url( | ||
self, url: str | ||
) -> tuple[Optional[str], Optional[str], Optional[str], str]: | ||
""" | ||
Parse an FTP url into hostname, username, password, and path. Credentials missing from the url may be filled in from the netrc file, from cached connections, or with anonymous defaults. | ||
:param url: FTP url | ||
:return: hostname, username, password, path | ||
""" | ||
parse = urlparse(url) | ||
user = parse.username | ||
passwd = parse.password | ||
host = parse.hostname | ||
path = parse.path | ||
if parse.scheme == "ftp": | ||
if not user and self.netrc: | ||
if host is not None: | ||
creds = self.netrc.authenticators(host) | ||
if creds: | ||
user, _, passwd = creds | ||
if not user: | ||
if host is not None: | ||
user, passwd = self._recall_credentials(host) | ||
if passwd is None: | ||
passwd = "anonymous@" | ||
if user is None: | ||
user = "anonymous" | ||
|
||
return host, user, passwd, path | ||
|
||
def _connect(self, url: str) -> Optional[ftplib.FTP]:
    """
    Connect to an FTP server, reusing a cached connection when it is still alive.

    Credentials come from ``_parse_url``. The ``TOIL_FTP_USER`` and
    ``TOIL_FTP_PASSWORD`` environment variables, when set, take precedence.
    The overrides are applied before the cache is consulted so that a
    connection is looked up under the same key it was stored under.

    :param url: FTP url
    :return: A logged-in FTP object, or None if the url is not an FTP url or has no host
    :raises ftplib.all_errors: if connecting or logging in fails
    """
    if urlparse(url).scheme != "ftp":
        return None
    host, user, passwd, _ = self._parse_url(url)
    if host is None:
        # there has to be a host
        return None

    user = os.getenv("TOIL_FTP_USER") or user
    passwd = os.getenv("TOIL_FTP_PASSWORD") or passwd
    key = (host, user, passwd)

    cached = self.cache.get(key)
    if cached is not None:
        try:
            if cached.pwd():
                return cached
        except ftplib.all_errors:
            # A dropped or timed-out connection raises here; fall through and reconnect
            logger.debug("Cached FTP connection to %s is no longer usable; reconnecting", host)
        self.cache.pop(key, None)
        try:
            cached.close()
        except ftplib.all_errors:
            # Best effort: the connection is being discarded anyway
            pass

    ftp = ftplib.FTP_TLS()
    ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0)
    try:
        ftp.connect(host)
        # NOTE(review): the review discussion on this change questioned requiring TLS by
        # default, since servers without TLS support would then need user intervention;
        # confirm the intended default for `insecure`.
        ftp.login(user or "", passwd or "", secure=not self.insecure)
    except ftplib.all_errors:
        # Do not leave a half-open socket behind when connect or login fails
        ftp.close()
        raise
    self.cache[key] = ftp
    return ftp
|
||
def _recall_credentials(
    self, desired_host: str
) -> tuple[Optional[str], Optional[str]]:
    """
    Look up credentials already held in the connection cache for a host.

    Cache keys are (host, user, password) tuples; the first key whose host
    matches supplies the answer.

    :param desired_host: FTP hostname
    :return: username, password; both None when no cached key matches
    """
    matches = (
        (user, passwd)
        for host, user, passwd in self.cache
        if host == desired_host
    )
    return next(matches, (None, None))
|
||
def size(self, fn: str) -> Optional[int]:
    """
    Get the size of an FTP object.

    The server is asked for the size directly. If that fails because the
    connection is in ASCII mode, the connection is switched to binary mode
    and asked again. For any other failure, the url is opened with urlopen
    and the size is read from its Content-length header.

    :param fn: FTP url
    :return: Size of the object, or None if no connection or size is available
    """
    conn = self._connect(fn)
    if not conn:
        return None

    host, user, passwd, path = self._parse_url(fn)
    try:
        return conn.size(path)
    except ftplib.all_errors as err:
        if str(err) == "550 SIZE not allowed in ASCII mode":
            # some servers don't allow grabbing size in ascii mode
            # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848
            conn.voidcmd("TYPE I")
            return conn.size(path)
        if host is None:
            # no host
            return None
        handle = urlopen(f"ftp://{user}:{passwd}@{host}/{path}")
        headers = handle.info()
        handle.close()
        return int(headers["Content-length"]) if "Content-length" in headers else None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep a class-level `FtpFsAccess` instance that is never destroyed, and the `FtpFsAccess` instance caches open FTP connections to servers, when do we close our FTP connections? It might be never, and that's probably not what we want.