diff --git a/lib/galaxy/config/sample/datatypes_conf.xml.sample b/lib/galaxy/config/sample/datatypes_conf.xml.sample
index 86ad796485c9..dcd56e7d2b29 100644
--- a/lib/galaxy/config/sample/datatypes_conf.xml.sample
+++ b/lib/galaxy/config/sample/datatypes_conf.xml.sample
@@ -360,6 +360,7 @@
+
@@ -1064,6 +1065,7 @@
+
diff --git a/lib/galaxy/datatypes/binary.py b/lib/galaxy/datatypes/binary.py
index cf1803dbcafe..de92e2a165dc 100644
--- a/lib/galaxy/datatypes/binary.py
+++ b/lib/galaxy/datatypes/binary.py
@@ -89,6 +89,7 @@
from galaxy.util.checkers import (
is_bz2,
is_gzip,
+ is_xz,
)
from . import (
data,
@@ -3685,6 +3686,9 @@ class Fast5ArchiveGz(Fast5Archive):
>>> fname = get_test_fname('test.fast5.tar.gz')
>>> Fast5ArchiveGz().sniff(fname)
True
+ >>> fname = get_test_fname('test.fast5.tar.xz')
+ >>> Fast5ArchiveGz().sniff(fname)
+ False
>>> fname = get_test_fname('test.fast5.tar.bz2')
>>> Fast5ArchiveGz().sniff(fname)
False
@@ -3701,6 +3705,33 @@ def sniff(self, filename: str) -> bool:
return Fast5Archive.sniff(self, filename)
+class Fast5ArchiveXz(Fast5Archive):
+ """
+ Class describing an xz-compressed FAST5 archive
+
+ >>> from galaxy.datatypes.sniff import get_test_fname
+ >>> fname = get_test_fname('test.fast5.tar.gz')
+ >>> Fast5ArchiveXz().sniff(fname)
+ False
+ >>> fname = get_test_fname('test.fast5.tar.xz')
+ >>> Fast5ArchiveXz().sniff(fname)
+ True
+ >>> fname = get_test_fname('test.fast5.tar.bz2')
+ >>> Fast5ArchiveXz().sniff(fname)
+ False
+ >>> fname = get_test_fname('test.fast5.tar')
+ >>> Fast5ArchiveXz().sniff(fname)
+ False
+ """
+
+ file_ext = "fast5.tar.xz"
+
+ def sniff(self, filename: str) -> bool:
+ if not is_xz(filename):
+ return False
+ return Fast5Archive.sniff(self, filename)
+
+
class Fast5ArchiveBz2(Fast5Archive):
"""
Class describing a bzip2-compressed FAST5 archive
@@ -3709,6 +3740,9 @@ class Fast5ArchiveBz2(Fast5Archive):
>>> fname = get_test_fname('test.fast5.tar.bz2')
>>> Fast5ArchiveBz2().sniff(fname)
True
+ >>> fname = get_test_fname('test.fast5.tar.xz')
+ >>> Fast5ArchiveBz2().sniff(fname)
+ False
>>> fname = get_test_fname('test.fast5.tar.gz')
>>> Fast5ArchiveBz2().sniff(fname)
False
diff --git a/lib/galaxy/datatypes/test/test.fast5.tar.xz b/lib/galaxy/datatypes/test/test.fast5.tar.xz
new file mode 100644
index 000000000000..418d5a4d94c4
Binary files /dev/null and b/lib/galaxy/datatypes/test/test.fast5.tar.xz differ
diff --git a/lib/galaxy/util/__init__.py b/lib/galaxy/util/__init__.py
index 6949dbafbb71..0a234fc925cb 100644
--- a/lib/galaxy/util/__init__.py
+++ b/lib/galaxy/util/__init__.py
@@ -156,6 +156,7 @@ def shlex_join(split_command):
gzip_magic = b"\x1f\x8b"
bz2_magic = b"BZh"
+xz_magic = b"\xfd7zXZ\x00"
DEFAULT_ENCODING = os.environ.get("GALAXY_DEFAULT_ENCODING", "utf-8")
NULL_CHAR = b"\x00"
BINARY_CHARS = [NULL_CHAR]
diff --git a/lib/galaxy/util/checkers.py b/lib/galaxy/util/checkers.py
index de2e149aa6e4..a7bd132925c4 100644
--- a/lib/galaxy/util/checkers.py
+++ b/lib/galaxy/util/checkers.py
@@ -1,5 +1,6 @@
import bz2
import gzip
+import lzma
import os
import re
import tarfile
@@ -117,6 +118,26 @@ def check_gzip(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
return (True, True)
+def check_xz(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
+ try:
+ with open(file_path, "rb") as temp:
+ magic_check = temp.read(6)
+ if magic_check != util.xz_magic:
+ return (False, False)
+ except Exception:
+ return (False, False)
+
+ if not check_content:
+ return (True, True)
+
+ with lzma.LZMAFile(file_path, mode="rb") as xzipped_file:
+ chunk = xzipped_file.read(CHUNK_SIZE)
+ # See if we have a compressed HTML file
+ if check_html(chunk, file_path=False):
+ return (True, False)
+ return (True, True)
+
+
def check_bz2(file_path: str, check_content: bool = True) -> Tuple[bool, bool]:
try:
with open(file_path, "rb") as temp:
@@ -165,6 +186,11 @@ def is_gzip(file_path: str) -> bool:
return is_gzipped
+def is_xz(file_path: str) -> bool:
+ is_xzipped, is_valid = check_xz(file_path, check_content=False)
+ return is_xzipped
+
+
def is_zip(file_path: str) -> bool:
is_zipped, is_valid = check_zip(file_path, check_content=False)
return is_zipped
@@ -197,6 +223,7 @@ def check_image(file_path: str):
COMPRESSION_CHECK_FUNCTIONS: Dict[str, CompressionChecker] = {
"gzip": check_gzip,
"bz2": check_bz2,
+ "xz": check_xz,
"zip": check_zip,
}
@@ -211,5 +238,6 @@ def check_image(file_path: str):
"COMPRESSION_CHECK_FUNCTIONS",
"is_gzip",
"is_bz2",
+ "is_xz",
"is_zip",
)
diff --git a/lib/galaxy/util/compression_utils.py b/lib/galaxy/util/compression_utils.py
index addfe96dd6f9..d54c0ffcbc24 100644
--- a/lib/galaxy/util/compression_utils.py
+++ b/lib/galaxy/util/compression_utils.py
@@ -2,6 +2,7 @@
import gzip
import io
import logging
+import lzma
import os
import shutil
import tarfile
@@ -29,6 +30,7 @@
from .checkers import (
is_bz2,
is_gzip,
+ is_xz,
)
try:
@@ -40,7 +42,7 @@
log = logging.getLogger(__name__)
FileObjTypeStr = Union[IO[str], io.TextIOWrapper]
-FileObjTypeBytes = Union[gzip.GzipFile, bz2.BZ2File, IO[bytes]]
+FileObjTypeBytes = Union[gzip.GzipFile, bz2.BZ2File, lzma.LZMAFile, IO[bytes]]
FileObjType = Union[FileObjTypeStr, FileObjTypeBytes]
@@ -72,7 +74,7 @@ def get_fileobj(filename: str, mode: str = "r", compressed_formats: Optional[Lis
:param filename: path to file that should be opened
:param mode: mode to pass to opener
:param compressed_formats: list of allowed compressed file formats among
- 'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
+ 'bz2', 'gzip', 'xz' and 'zip'. If left to None, all 4 formats are allowed
"""
return get_fileobj_raw(filename, mode, compressed_formats)[1]
@@ -103,7 +105,7 @@ def get_fileobj_raw(
filename: str, mode: str = "r", compressed_formats: Optional[List[str]] = None
) -> Tuple[Optional[str], FileObjType]:
if compressed_formats is None:
- compressed_formats = ["bz2", "gzip", "zip"]
+ compressed_formats = ["bz2", "gzip", "xz", "zip"]
# Remove 't' from mode, which may cause an error for compressed files
mode = mode.replace("t", "")
# 'U' mode is deprecated, we open in 'r'.
@@ -111,12 +113,16 @@ def get_fileobj_raw(
mode = "r"
compressed_format = None
if "gzip" in compressed_formats and is_gzip(filename):
- fh: Union[gzip.GzipFile, bz2.BZ2File, IO[bytes]] = gzip.GzipFile(filename, mode)
+ fh: Union[gzip.GzipFile, bz2.BZ2File, lzma.LZMAFile, IO[bytes]] = gzip.GzipFile(filename, mode)
compressed_format = "gzip"
elif "bz2" in compressed_formats and is_bz2(filename):
mode = cast(Literal["a", "ab", "r", "rb", "w", "wb", "x", "xb"], mode)
fh = bz2.BZ2File(filename, mode)
compressed_format = "bz2"
+ elif "xz" in compressed_formats and is_xz(filename):
+ mode = cast(Literal["a", "ab", "r", "rb", "w", "wb", "x", "xb"], mode)
+ fh = lzma.LZMAFile(filename, mode)
+ compressed_format = "xz"
elif "zip" in compressed_formats and zipfile.is_zipfile(filename):
# Return fileobj for the first file in a zip file.
# 'b' is not allowed in the ZipFile mode argument
diff --git a/test/integration/test_datatype_upload.py b/test/integration/test_datatype_upload.py
index af5d264e518d..a27ef610877b 100644
--- a/test/integration/test_datatype_upload.py
+++ b/test/integration/test_datatype_upload.py
@@ -10,6 +10,7 @@
from galaxy.util.checkers import (
is_bz2,
is_gzip,
+ is_xz,
is_zip,
)
from galaxy.util.hash_util import md5_hash_file
@@ -77,7 +78,7 @@ def upload_datatype_helper(
delete_cache_dir: bool = False,
) -> None:
is_compressed = False
- for is_method in (is_bz2, is_gzip, is_zip):
+ for is_method in (is_bz2, is_gzip, is_xz, is_zip):
is_compressed = is_method(test_data.path)
if is_compressed:
break