-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add FTP support to jobstores #5142
Changes from 3 commits
195fdc8
176381c
af46642
1ea4e3d
65e5564
5ba00b2
35fbe08
754b43c
5ed71cf
17536ba
13f4ac5
f234825
0f38d58
bc5da85
665e3bd
a7455a0
7d31e54
d2da46c
a2105af
66d3587
9f86277
91f1cbd
5884417
f490ed3
bbd321a
59e3fea
8fe81cc
f499caf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,9 @@ | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
import ftplib | ||
import logging | ||
import netrc | ||
import os | ||
import pickle | ||
import re | ||
|
@@ -35,7 +37,7 @@ | |
) | ||
from urllib.error import HTTPError | ||
from urllib.parse import ParseResult, urlparse | ||
from urllib.request import urlopen | ||
from urllib.request import urlopen, Request | ||
from uuid import uuid4 | ||
|
||
from toil.common import Config, getNodeID, safeUnpickleFromStream | ||
|
@@ -47,6 +49,7 @@ | |
ServiceJobDescription, | ||
) | ||
from toil.lib.compatibility import deprecated | ||
from toil.lib.conversions import strtobool | ||
from toil.lib.io import WriteWatchingStream | ||
from toil.lib.memoize import memoize | ||
from toil.lib.retry import ErrorCondition, retry | ||
|
@@ -1870,15 +1873,28 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): | |
stores. | ||
""" | ||
|
||
ftp = None | ||
|
||
@classmethod | ||
def _setup_ftp(cls) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's weird that the |
||
if cls.ftp is None: | ||
cls.ftp = FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_INSECURE', 'False')) is True) | ||
|
||
@classmethod | ||
def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: | ||
return url.scheme.lower() in ("http", "https", "ftp") and not export | ||
|
||
@classmethod | ||
def _url_exists(cls, url: ParseResult) -> bool: | ||
# Deal with FTP first to support user/password auth | ||
if url.scheme.lower() == "ftp": | ||
cls._setup_ftp() | ||
# mypy is unable to understand that ftp must exist by this point | ||
assert cls.ftp is not None | ||
return cls.ftp.exists(url.geturl()) | ||
|
||
try: | ||
# TODO: Figure out how to HEAD instead of this. | ||
with cls._open_url(url): | ||
with closing(urlopen(Request(url.geturl(), method="HEAD"))): | ||
return True | ||
except FileNotFoundError: | ||
return False | ||
|
@@ -1894,11 +1910,15 @@ def _url_exists(cls, url: ParseResult) -> bool: | |
) | ||
def _get_size(cls, url: ParseResult) -> Optional[int]: | ||
if url.scheme.lower() == "ftp": | ||
return None | ||
with closing(urlopen(url.geturl())) as readable: | ||
# just read the header for content length | ||
size = readable.info().get("content-length") | ||
return int(size) if size is not None else None | ||
cls._setup_ftp() | ||
# mypy is unable to understand that ftp must exist by this point | ||
assert cls.ftp is not None | ||
return cls.ftp.size(url.geturl()) | ||
|
||
# just read the header for content length | ||
resp = urlopen(Request(url.geturl(), method="HEAD")) | ||
size = resp.info().get("content-length") | ||
return int(size) if size is not None else None | ||
|
||
@classmethod | ||
def _read_from_url( | ||
|
@@ -1931,6 +1951,14 @@ def count(l: int) -> None: | |
] | ||
) | ||
def _open_url(cls, url: ParseResult) -> IO[bytes]: | ||
# Deal with FTP first so we support user/password auth | ||
if url.scheme.lower() == "ftp": | ||
cls._setup_ftp() | ||
# mypy is unable to understand that ftp must exist by this point | ||
assert cls.ftp is not None | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you structured this as a method that returns an FtpFsAccess and not one that has a side effect of setting a class-level field, you wouldn't need all these asserts and you also wouldn't need to remember two member names. |
||
# we open in read mode as write mode is not supported | ||
return cls.ftp.open(url.geturl(), mode="r") | ||
|
||
try: | ||
return cast(IO[bytes], closing(urlopen(url.geturl()))) | ||
except HTTPError as e: | ||
|
@@ -1958,3 +1986,142 @@ def _get_is_directory(cls, url: ParseResult) -> bool: | |
def _list_url(cls, url: ParseResult) -> list[str]: | ||
# TODO: Implement HTTP index parsing and FTP directory listing | ||
raise NotImplementedError("HTTP and FTP URLs cannot yet be listed") | ||
|
||
|
||
class FtpFsAccess: | ||
""" | ||
FTP access with upload. | ||
|
||
Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L8 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The license at cwl-tes is also Apache 2.0. Should I add a section to the Toil license that specifically states all code under this class is under cwl-tes's Apache 2.0 license? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need to repeat the license. If there are code quotes, then you could mention the copyright of the original authors / copyright holders:
Otherwise if this implementation is different enough, you don't need to mention the authors. I suggest this metric: if we wanted to change the license of Toil, would you have to get permission from these copyright holders because the code is essentially the same? Or is the code so adapted or different for other reasons that it is a new work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One approach is just add their Apache 2.0 license's copyright line to Toil's LICENSE file along with the others. You could also drop it in a different file and import it to here, and in that file have Also the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's easier to do the latter by putting the code in a separate file, so I'll do that |
||
""" | ||
|
||
def __init__( | ||
self, cache: Optional[dict[Any, ftplib.FTP]] = None, insecure: bool = False | ||
): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should have docstrings for this and all the other functions in the class, ideally. It's especially important with a parameter called |
||
self.cache = cache or {} | ||
self.netrc = None | ||
self.insecure = insecure | ||
try: | ||
if "HOME" in os.environ: | ||
if os.path.exists(os.path.join(os.environ["HOME"], ".netrc")): | ||
self.netrc = netrc.netrc(os.path.join(os.environ["HOME"], ".netrc")) | ||
elif os.path.exists(os.path.join(os.curdir, ".netrc")): | ||
self.netrc = netrc.netrc(os.path.join(os.curdir, ".netrc")) | ||
except netrc.NetrcParseError as err: | ||
logger.debug(err) | ||
|
||
def exists(self, fn: str) -> bool:
    """
    Check whether an FTP URL refers to something that exists.

    :param fn: FTP URL to look up.
    :return: The result of ``isfile(fn)`` when it is truthy, otherwise the
        result of ``isdir(fn)``.
    """
    # Probe as a file first; the directory probe only runs when the file
    # probe comes back falsy.
    found_file = self.isfile(fn)
    if found_file:
        return found_file
    return self.isdir(fn)
|
||
def isfile(self, fn: str) -> bool:
    """
    Check whether an FTP URL refers to a regular file.

    A URL counts as a file when a connection can be obtained for it and
    ``self.size(fn)`` reports a size (including a size of 0).

    :param fn: FTP URL to look up.
    :return: True if a size could be determined, False if no connection is
        available, no size is reported, or the size lookup raises one of
        ``ftplib.all_errors``.
    """
    # The connection attempt stays outside the try block, so failures to
    # connect propagate to the caller instead of reading as "not a file".
    if not self._connect(fn):
        return False
    try:
        return self.size(fn) is not None
    except ftplib.all_errors:
        return False
|
||
def isdir(self, fn: str) -> bool:
    """
    Check whether an FTP URL refers to a directory.

    The check changes into the URL's path on the server and then changes
    back to the directory that was current beforehand.

    :param fn: FTP URL to look up.
    :return: True if both directory changes succeed; False if no connection
        is available or any step raises one of ``ftplib.all_errors``.
    """
    connection = self._connect(fn)
    if not connection:
        return False
    try:
        # Remember where we were so the (possibly cached) connection is
        # left in its previous working directory.
        previous_dir = connection.pwd()
        connection.cwd(urlparse(fn).path)
        connection.cwd(previous_dir)
    except ftplib.all_errors:
        return False
    return True
|
||
def open(self, fn: str, mode: str) -> IO[bytes]:
    """
    Open an FTP URL for reading.

    Credentials and path are resolved through ``self._parse_url`` and the
    resulting ``ftp://user:passwd@host/path`` URL is fetched with
    ``urlopen``.

    :param fn: FTP URL to open.
    :param mode: Open mode; it must contain ``r``. Write mode is not
        supported.
    :return: The ``urlopen`` handle wrapped in ``contextlib.closing`` (cast
        to ``IO[bytes]`` for the type checker), so it is meant to be used
        in a ``with`` statement.
    :raises NotImplementedError: if ``mode`` does not contain ``r``.
    """
    if "r" not in mode:
        # TODO: support write mode
        # NotImplementedError is still an Exception, so callers catching
        # the previous bare Exception keep working.
        raise NotImplementedError('Write mode FTP not implemented')
    host, user, passwd, path = self._parse_url(fn)
    # NOTE(review): path comes straight from urlparse and so normally starts
    # with "/", giving "host//path" here -- presumably intentional so the
    # server path is treated as absolute; confirm before changing.
    handle = urlopen(f"ftp://{user}:{passwd}@{host}/{path}")
    return cast(IO[bytes], closing(handle))
|
||
def _parse_url( | ||
self, url: str | ||
) -> tuple[Optional[str], Optional[str], Optional[str], str]: | ||
parse = urlparse(url) | ||
user = parse.username | ||
passwd = parse.password | ||
host = parse.hostname | ||
path = parse.path | ||
if parse.scheme == "ftp": | ||
if not user and self.netrc: | ||
if host is not None: | ||
creds = self.netrc.authenticators(host) | ||
if creds: | ||
user, _, passwd = creds | ||
if not user: | ||
if host is not None: | ||
user, passwd = self._recall_credentials(host) | ||
if passwd is None: | ||
passwd = "anonymous@" | ||
if user is None: | ||
user = "anonymous" | ||
|
||
return host, user, passwd, path | ||
|
||
def _connect(self, url: str) -> Optional[ftplib.FTP]: | ||
parse = urlparse(url) | ||
if parse.scheme == "ftp": | ||
host, user, passwd, _ = self._parse_url(url) | ||
if host is None: | ||
# there has to be a host | ||
return None | ||
if (host, user, passwd) in self.cache: | ||
if self.cache[(host, user, passwd)].pwd(): | ||
return self.cache[(host, user, passwd)] | ||
ftp = ftplib.FTP_TLS() | ||
ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) | ||
ftp.connect(host) | ||
env_user = os.getenv("TOIL_FTP_USER") | ||
env_passwd = os.getenv("TOIL_FTP_PASSWORD") | ||
if env_user: | ||
user = env_user | ||
if env_passwd: | ||
passwd = env_passwd | ||
ftp.login(user or "", passwd or "", secure=not self.insecure) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this using an undocumented There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ftp in this case is an FTP_TLS object, which supports connecting over TLS. It's not on the official FTP_TLS python documentation, nor is there an associated docstring, but the code itself simply sets up an SSL connection to the FTP server |
||
self.cache[(host, user, passwd)] = ftp | ||
return ftp | ||
return None | ||
|
||
def _recall_credentials( | ||
self, desired_host: str | ||
) -> tuple[Optional[str], Optional[str]]: | ||
for host, user, passwd in self.cache: | ||
if desired_host == host: | ||
return user, passwd | ||
return None, None | ||
|
||
def size(self, fn: str) -> Optional[int]:
    """
    Get the size in bytes of the file at an FTP URL.

    The size is requested over the FTP connection first. If the server
    refuses because the connection is in ASCII mode, the connection is
    switched to binary mode and the request is retried. For any other FTP
    error the URL is fetched with ``urlopen`` and the ``Content-length``
    entry of the response info is used instead.

    :param fn: FTP URL to look up.
    :return: The size, or None when no connection is available or no size
        could be determined.
    """
    ftp = self._connect(fn)
    if not ftp:
        return None

    host, user, passwd, path = self._parse_url(fn)
    try:
        return ftp.size(path)
    except ftplib.all_errors as e:
        if str(e) == "550 SIZE not allowed in ASCII mode":
            # some servers don't allow grabbing size in ascii mode
            # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848
            ftp.voidcmd("TYPE I")
            return ftp.size(path)

    # Only reached when the FTP SIZE command failed for another reason.
    if host is None:
        # no host
        return None
    # closing() guarantees the handle is released even if info() raises.
    with closing(
        urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path))
    ) as handle:
        info = handle.info()
    if "Content-length" in info:
        return int(info["Content-length"])
    return None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep a class-level FtpFsAccess instance that is never destroyed, and the FtpFsAccess instance caches open FTP connections to servers, when do we close our FTP connections? It might be never, and that's probably not what we want.