[feature] Support remote and compressed index files
 - [ ] Check that temporary files are deleted
mxmlnkn committed Oct 19, 2024
1 parent ca139b1 commit 025fe25
Showing 6 changed files with 300 additions and 28 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -579,6 +579,10 @@ Some often-used configuration environment variables are copied here for easier v
This functionality of ratarmount offers a hopefully more-tested and out-of-the-box experience over the experimental [fsspec.fuse](https://filesystem-spec.readthedocs.io/en/latest/features.html#mount-anything-with-fuse) implementation.
And, it also works in conjunction with the other features of ratarmount such as union mounting and recursive mounting.

Index files specified with `--index-file` can also be compressed and/or be an fsspec ([chained](https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining)) URL, e.g., `https://host.org/file.tar.index.sqlite.gz`.
In such a case, the index file will be downloaded and/or extracted into the default temporary folder.
If the default temporary folder has insufficient disk space, it can be changed by setting the `RATARMOUNT_INDEX_TMPDIR` environment variable.
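For example (hypothetical host and file names), an invocation like `ratarmount --index-file https://host.org/file.tar.index.sqlite.gz archive.tar mounted/` would download and decompress the index into the temporary folder before mounting `archive.tar`.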


# Writable Mounting

189 changes: 172 additions & 17 deletions core/ratarmountcore/SQLiteIndex.py
@@ -4,17 +4,24 @@
import json
import os
import re
import shutil
import sqlite3
import stat
import sys
import tarfile
import tempfile
import time
import traceback
import urllib.parse

from typing import Any, AnyStr, Callable, Dict, IO, List, Optional, Tuple, Union
from dataclasses import dataclass

try:
import fsspec
except ImportError:
fsspec = None # type: ignore

try:
import indexed_gzip
except ImportError:
@@ -27,9 +34,22 @@

from .version import __version__
from .MountSource import FileInfo, createRootFileInfo
from .compressions import TAR_COMPRESSION_FORMATS
from .compressions import (
CompressionInfo,
LIBARCHIVE_FILTER_FORMATS,
TAR_COMPRESSION_FORMATS,
detectCompression,
findAvailableOpen,
)
from .SQLiteBlobFile import SQLiteBlobsFile, WriteSQLiteBlobs
from .utils import RatarmountError, IndexNotOpenError, InvalidIndexError, findModuleVersion, MismatchingIndexError
from .utils import (
CompressionError,
IndexNotOpenError,
InvalidIndexError,
RatarmountError,
MismatchingIndexError,
findModuleVersion,
)


def getSqliteTables(connection: sqlite3.Connection):
@@ -210,20 +230,31 @@ def __init__(
if not backendName:
raise ValueError("A non-empty backend name must be specified!")

self._compressionsToTest = TAR_COMPRESSION_FORMATS.copy()
self._compressionsToTest.update(LIBARCHIVE_FILTER_FORMATS)

self.printDebug = printDebug
self.sqlConnection: Optional[sqlite3.Connection] = None
# Will hold the valid path of the index file that was actually opened
self.indexFilePath: Optional[str] = None
# This is True if the index file that was found was compressed or a URL and had to be downloaded
# and/or extracted into a temporary folder.
self.indexFilePathDeleteOnClose: bool = False
self.encoding = encoding
self.possibleIndexFilePaths = SQLiteIndex.getPossibleIndexFilePaths(
indexFilePath, indexFolders, archiveFilePath, ignoreCurrentFolder
indexFilePath,
indexFolders,
archiveFilePath,
ignoreCurrentFolder,
compressionsToTest=self._compressionsToTest,
)
# Caches the parent folders that were last added to the database and therefore are known to exist.
self.parentFolderCache: List[Tuple[str, str]] = []
self.preferMemory = preferMemory
self.indexMinimumFileCount = indexMinimumFileCount
self.backendName = backendName
self._insertedRowCount = 0
self._temporaryIndexFile: Optional[Any] = None

assert self.backendName

@@ -249,25 +280,52 @@ def getPossibleIndexFilePaths(
indexFolders: Optional[List[str]] = None,
archiveFilePath: Optional[str] = None,
ignoreCurrentFolder: bool = False,
compressionsToTest: Optional[Dict[str, CompressionInfo]] = None,
) -> List[str]:
if indexFilePath == ':memory:':
return []

possibleIndexFilePaths = []
if indexFilePath:
return [] if indexFilePath == ':memory:' else [os.path.abspath(os.path.expanduser(indexFilePath))]
# Prior versions simply returned indexFilePath as the only possible path if it was specified.
# This worked well enough because if the path did not exist, it would simply be created.
# However, for non-writable locations, or if parent folders are missing, or for remote URLs,
# this will fail badly and result in TAR files being opened with the fallback, libarchive, instead,
# which is unwanted behavior. It should fall back to another index storage location instead.
# Or even better, it should escalate the error to the user, but that seems too difficult with the
# current trial-and-error architecture for opening archives.
if '://' not in indexFilePath:
return [os.path.abspath(os.path.expanduser(indexFilePath))]
possibleIndexFilePaths.append(indexFilePath)

if not archiveFilePath:
return []
return possibleIndexFilePaths

# Look for default (compressed) indexes. The compressed ones should only be used as fallbacks
# because they are less performant and because we do not want to accidentally create index files
# with a compressed extension even though they are uncompressed. The latter reason is also why we
# check for file existence before adding them as defaults, although that might not be necessary.
defaultIndexFilePath = archiveFilePath + ".index.sqlite"
defaultIndexFilePaths = [defaultIndexFilePath]
if compressionsToTest:
for _, info in compressionsToTest.items():
for suffix in info.suffixes:
path = defaultIndexFilePath + '.' + suffix
if os.path.isfile(path) and os.stat(path).st_size > 0:
defaultIndexFilePaths.append(path)

if not indexFolders:
return [defaultIndexFilePath, ':memory:']
possibleIndexFilePaths.extend(defaultIndexFilePaths)
possibleIndexFilePaths.append(':memory:')
return possibleIndexFilePaths

possibleIndexFilePaths = []
indexPathAsName = defaultIndexFilePath.replace("/", "_")
for folder in indexFolders:
if folder:
indexPath = os.path.join(folder, indexPathAsName)
possibleIndexFilePaths.append(os.path.abspath(os.path.expanduser(indexPath)))
elif not ignoreCurrentFolder:
possibleIndexFilePaths.append(defaultIndexFilePath)
possibleIndexFilePaths.extend(defaultIndexFilePaths)
return possibleIndexFilePaths
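
A rough sketch of the resulting search order (hypothetical archive path; assumes the class is importable as shown, no index folders are configured, no compressed default index exists on disk, and compressionsToTest is omitted):

    from ratarmountcore.SQLiteIndex import SQLiteIndex

    # Static method; returns the candidate index locations in the order they will be tried.
    SQLiteIndex.getPossibleIndexFilePaths(
        indexFilePath=None,
        archiveFilePath="/data/archive.tar",
    )
    # -> ["/data/archive.tar.index.sqlite", ":memory:"]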

def clearIndexes(self):
Expand All @@ -279,7 +337,6 @@ def openExisting(self, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]
"""Tries to find an already existing index."""
for indexPath in self.possibleIndexFilePaths:
if self._tryLoadIndex(indexPath, checkMetadata=checkMetadata):
self.indexFilePath = indexPath
break

def openInMemory(self):
@@ -327,6 +384,8 @@ def close(self):
pass
self.sqlConnection = None

self._setIndexFilePath(None)

def getConnection(self) -> sqlite3.Connection:
if self.sqlConnection:
return self.sqlConnection
@@ -459,6 +518,11 @@ def getIndexVersion(self):

@staticmethod
def _pathIsWritable(path: str, printDebug: int = 0) -> bool:
# Writing indexes to remote filesystems is currently not supported, and we need to take care that URLs
# are not interpreted as local file paths, e.g., creating an ftp: folder with a user:password@host subfolder.
if '://' in path:
return False

try:
folder = os.path.dirname(path)
if folder:
@@ -952,7 +1016,31 @@ def indexIsLoaded(self) -> bool:

return True

def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]) -> None:
def _setIndexFilePath(self, indexFilePath: Optional[str], deleteOnClose: bool = False):
# This is called from __del__, so we need to account for this being called when something
# in the constructor raises an exception and not all members of self exist.
if (
getattr(self, 'indexFilePath', None)
and getattr(self, 'indexFilePathDeleteOnClose', False)
and self.indexFilePath
and os.path.isfile(self.indexFilePath)
):
try:
os.remove(self.indexFilePath)
except Exception as exception:
if self.printDebug >= 1:
print(
"[Warning] Failed to remove temporarily downloaded and/or extracted index file at:",
self.indexFilePath,
"because of:",
exception,
)

if hasattr(self, 'indexFilePath') and hasattr(self, 'indexFilePathDeleteOnClose'):
self.indexFilePath = indexFilePath
self.indexFilePathDeleteOnClose = deleteOnClose

def _loadIndex(self, indexFilePath: str, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]) -> None:
"""
Loads the given SQLite index database and checks it for validity, raising an exception if it is invalid.
@@ -964,7 +1052,71 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di
if self.indexIsLoaded():
return

self.sqlConnection = self._openSqlDb(indexFilePath)
# Download and/or extract the file to a temporary file if necessary.

# Strip a single file:// prefix to avoid needlessly copying local files to the temporary directory,
# but only if there is no URL chaining (which fsspec supports), i.e., only one protocol separator.
if indexFilePath.count('://') == 1:
fileURLPrefix = 'file://'
if indexFilePath.startswith(fileURLPrefix):
indexFilePath = indexFilePath[len(fileURLPrefix) :]

temporaryFolder = os.environ.get("RATARMOUNT_INDEX_TMPDIR", None)

def _undoCompression(file):
compression = detectCompression(
file, printDebug=self.printDebug, compressionsToTest=self._compressionsToTest
)
if not compression or compression not in self._compressionsToTest:
return None

if self.printDebug >= 2:
print(f"[Info] Detected {compression}-compressed index.")

formatOpen = findAvailableOpen(compression)
if not formatOpen:
moduleNames = [module.name for module in self._compressionsToTest[compression].modules]
raise CompressionError(
f"Cannot open a {compression} compressed index file {indexFilePath} "
f"without any of these modules: {moduleNames}"
)

return formatOpen(file)

def _copyToTemp(file):
self._temporaryIndexFile = tempfile.NamedTemporaryFile(suffix=".tmp.sqlite.index", dir=temporaryFolder)
# TODO add progress bar / output?
with open(self._temporaryIndexFile.name, 'wb') as targetFile:
shutil.copyfileobj(file, targetFile)

if '://' in indexFilePath:
if fsspec is None:
raise RatarmountError(
"Detected a URL for the index path, but fsspec could not be imported!\n"
"Try installing it with 'pip install fsspec' or 'pip install ratarmount[full]'."
)

with fsspec.open(indexFilePath) as file:
decompressedFile = _undoCompression(file)
with decompressedFile if decompressedFile else file as fileToCopy:
_copyToTemp(fileToCopy)
else:
if not os.path.isfile(indexFilePath):
return None

with open(indexFilePath, 'rb') as file:
decompressedFile = _undoCompression(file)
if decompressedFile:
with decompressedFile:
_copyToTemp(decompressedFile)
else:
temporaryIndexFilePath = indexFilePath

temporaryIndexFilePath = self._temporaryIndexFile.name if self._temporaryIndexFile else indexFilePath

# Done downloading and/or extracting the SQLite index.

self.sqlConnection = self._openSqlDb(temporaryIndexFilePath)
tables = getSqliteTables(self.sqlConnection)
versions = None
try:
@@ -1036,19 +1188,21 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di
pass

if self.printDebug >= 1:
print(f"Successfully loaded offset dictionary from {str(indexFilePath)}")
message = "Successfully loaded offset dictionary from " + str(indexFilePath)
if temporaryIndexFilePath != indexFilePath:
message += " temporarily downloaded/decompressed into: " + str(temporaryIndexFilePath)
print(message)

self._setIndexFilePath(temporaryIndexFilePath)

def _tryLoadIndex(
self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] = None
self, indexFilePath: str, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] = None
) -> bool:
"""Calls loadIndex if index is not loaded already and provides extensive error handling."""

if self.indexIsLoaded():
return True

if not os.path.isfile(indexFilePath):
return False

try:
self._loadIndex(indexFilePath, checkMetadata=checkMetadata)
except MismatchingIndexError as e:
@@ -1075,7 +1229,8 @@ def _tryLoadIndex(
print(" e.g., by opening an issue on the public github page.")

try:
os.remove(indexFilePath)
if '://' not in indexFilePath:
os.remove(indexFilePath)
except OSError:
print("[Warning] Failed to remove corrupted old cached index file:", indexFilePath)

7 changes: 5 additions & 2 deletions core/ratarmountcore/compressions.py
@@ -571,7 +571,10 @@ def getGzipInfo(fileobj: IO[bytes]) -> Optional[Tuple[str, int]]:


def detectCompression(
fileobj: IO[bytes], prioritizedBackends: Optional[List[str]], printDebug: int = 0
fileobj: IO[bytes],
prioritizedBackends: Optional[List[str]] = None,
printDebug: int = 0,
compressionsToTest: Dict[str, CompressionInfo] = TAR_COMPRESSION_FORMATS,
) -> Optional[str]:
# isinstance(fileobj, io.IOBase) does not work for everything, e.g., for paramiko.sftp_file.SFTPFile
# because it does not inherit from io.IOBase. Therefore, do duck-typing and test for required methods.
@@ -594,7 +597,7 @@ def detectCompression(
return None

oldOffset = fileobj.tell()
for compressionId, compression in TAR_COMPRESSION_FORMATS.items():
for compressionId, compression in compressionsToTest.items():
# The header check is a necessary condition not a sufficient condition.
# Especially for gzip, which only has 2 magic bytes, false positives might happen.
# Therefore, only use the magic bytes based check if the module could not be found
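
A minimal sketch of the new compressionsToTest keyword (hypothetical file name; the dictionary is assembled the same way SQLiteIndex builds its own set of formats to test, and the import path is assumed):

    from ratarmountcore.compressions import (
        LIBARCHIVE_FILTER_FORMATS,
        TAR_COMPRESSION_FORMATS,
        detectCompression,
    )

    # Test both the TAR compression formats and the libarchive filter formats.
    compressionsToTest = TAR_COMPRESSION_FORMATS.copy()
    compressionsToTest.update(LIBARCHIVE_FILTER_FORMATS)

    with open("file.tar.index.sqlite.gz", "rb") as file:
        # Returns the detected compression ID, or None if nothing matches.
        print(detectCompression(file, compressionsToTest=compressionsToTest))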
18 changes: 16 additions & 2 deletions ratarmount.py
@@ -591,7 +591,15 @@ def pointsIntoMountPoint(pathToTest):
hasIndexPath = False

if 'indexFilePath' in options and isinstance(options['indexFilePath'], str):
indexFilePath = os.path.realpath(options['indexFilePath'])
indexFilePath = options['indexFilePath']
# Strip a single file:// prefix, but only if there is no URL chaining (which fsspec supports).
if options['indexFilePath'].count('://') == 1:
fileURLPrefix = 'file://'
if indexFilePath.startswith(fileURLPrefix):
indexFilePath = indexFilePath[len(fileURLPrefix) :]
if '://' not in indexFilePath:
indexFilePath = os.path.realpath(options['indexFilePath'])

if pointsIntoMountPoint(indexFilePath):
del options['indexFilePath']
else:
@@ -1265,7 +1273,13 @@ def _parseArgs(rawArgs: Optional[List[str]] = None):
indexGroup.add_argument(
'--index-file', type=str,
help='Specify a path to the .index.sqlite file. Setting this will disable fallback index folders. '
'If the given path is ":memory:", then the index will not be written out to disk.')
'If the given path is ":memory:", then the index will not be written out to disk. '
'If the specified path is a remote URL, such as "https://host.org/file.tar.index.sqlite", or '
'a compressed index, such as "file.tar.index.sqlite.gz", then the index file will be downloaded '
f'and/or extracted into the default temporary folder ({tempfile.gettempdir()}). This path can be '
'changed with the environment variable RATARMOUNT_INDEX_TMPDIR. The temporary folder in general '
'can also be changed with these environment variables in decreasing priority: TMPDIR, TEMP, TMP '
'as described in the Python tempfile standard library documentation.')
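
A small illustrative sketch of the temporary-folder lookup described in the help text above (not part of this diff): when RATARMOUNT_INDEX_TMPDIR is unset, passing dir=None defers to tempfile.gettempdir(), which in turn honors TMPDIR, TEMP, and TMP.

    import os
    import tempfile

    # dir=None falls back to tempfile.gettempdir(), i.e., TMPDIR/TEMP/TMP or the platform default.
    folder = os.environ.get("RATARMOUNT_INDEX_TMPDIR", None)
    with tempfile.NamedTemporaryFile(suffix=".tmp.sqlite.index", dir=folder) as temporaryFile:
        print("A downloaded index would be staged at:", temporaryFile.name)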

indexFolders = ['', os.path.join( "~", ".ratarmount")]
xdgCacheHome = getXdgCacheHome()
10 changes: 9 additions & 1 deletion tests/ratarmount-help.txt
