[wip][feature] Support remote and compressed index files
 - [ ] Add tests for remote and/or compressed files
 - [ ] Check that temporary files are deleted
       Add tests for this?
mxmlnkn committed Oct 17, 2024
1 parent 1f3373d commit c5a2a0f
Showing 4 changed files with 159 additions and 12 deletions.
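In a nutshell, the change stages a remote and/or compressed index as a local temporary file before SQLite opens it. Below is a minimal sketch of that flow, assuming a hypothetical URL and gzip-only handling; the committed code instead detects the compression format via detectCompression and supports all TAR/libarchive formats.

import gzip
import shutil
import sqlite3
import tempfile

import fsspec  # optional dependency; the commit raises a helpful error when it is missing

# Hypothetical remote, gzip-compressed index file.
indexUrl = "https://host.org/file.tar.index.sqlite.gz"

# Stage the remote index as a local, decompressed temporary file.
temporaryIndexFile = tempfile.NamedTemporaryFile(suffix=".tmp.sqlite.index")
with fsspec.open(indexUrl) as remoteFile:
    with gzip.open(remoteFile) as decompressedFile:
        with open(temporaryIndexFile.name, 'wb') as targetFile:
            shutil.copyfileobj(decompressedFile, targetFile)

# SQLite itself only ever sees the local temporary copy.
connection = sqlite3.connect(temporaryIndexFile.name)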
136 changes: 129 additions & 7 deletions core/ratarmountcore/SQLiteIndex.py
@@ -4,17 +4,24 @@
import json
import os
import re
import shutil
import sqlite3
import stat
import sys
import tarfile
import tempfile
import time
import traceback
import urllib.parse

from typing import Any, AnyStr, Callable, Dict, IO, List, Optional, Tuple, Union
from dataclasses import dataclass

try:
    import fsspec
except ImportError:
    fsspec = None # type: ignore

try:
    import indexed_gzip
except ImportError:
@@ -27,9 +34,22 @@

from .version import __version__
from .MountSource import FileInfo, createRootFileInfo
from .compressions import TAR_COMPRESSION_FORMATS
from .compressions import (
    CompressionInfo,
    LIBARCHIVE_FILTER_FORMATS,
    TAR_COMPRESSION_FORMATS,
    detectCompression,
    findAvailableOpen,
)
from .SQLiteBlobFile import SQLiteBlobsFile, WriteSQLiteBlobs
from .utils import RatarmountError, IndexNotOpenError, InvalidIndexError, findModuleVersion, MismatchingIndexError
from .utils import (
    CompressionError,
    IndexNotOpenError,
    InvalidIndexError,
    RatarmountError,
    MismatchingIndexError,
    findModuleVersion,
)


def getSqliteTables(connection: sqlite3.Connection):
@@ -214,6 +234,9 @@ def __init__(
self.sqlConnection: Optional[sqlite3.Connection] = None
# Will hold the actually opened valid path to an index file
self.indexFilePath: Optional[str] = None
# This is true if the index file found was compressed or a URL and had to be downloaded
# and/or extracted into a temporary folder.
self.indexFilePathDeleteOnClose: bool = False
self.encoding = encoding
self.possibleIndexFilePaths = SQLiteIndex.getPossibleIndexFilePaths(
indexFilePath, indexFolders, archiveFilePath, ignoreCurrentFolder
@@ -224,6 +247,7 @@ def __init__(
self.indexMinimumFileCount = indexMinimumFileCount
self.backendName = backendName
self._insertedRowCount = 0
self._temporaryIndexFile: Optional[Any] = None

assert self.backendName

@@ -251,6 +275,8 @@ def getPossibleIndexFilePaths(
ignoreCurrentFolder: bool = False,
) -> List[str]:
if indexFilePath:
    if '://' in indexFilePath:
        return [indexFilePath]
    return [] if indexFilePath == ':memory:' else [os.path.abspath(os.path.expanduser(indexFilePath))]

if not archiveFilePath:
@@ -279,7 +305,6 @@ def openExisting(self, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]
"""Tries to find an already existing index."""
for indexPath in self.possibleIndexFilePaths:
    if self._tryLoadIndex(indexPath, checkMetadata=checkMetadata):
        self.indexFilePath = indexPath
        break

def openInMemory(self):
@@ -327,6 +352,8 @@ def close(self):
pass
self.sqlConnection = None

self._setIndexFilePath(None)

def getConnection(self) -> sqlite3.Connection:
    if self.sqlConnection:
        return self.sqlConnection
@@ -459,6 +486,11 @@ def getIndexVersion(self):

@staticmethod
def _pathIsWritable(path: str, printDebug: int = 0) -> bool:
    # Writing indexes to remote filesystems is currently not supported, and we need to take care that URLs
    # are not interpreted as local file paths, i.e., that no ftp: folder with a user:password@host subfolder gets created.
    if '://' in path:
        return False

    try:
        folder = os.path.dirname(path)
        if folder:
@@ -952,7 +984,31 @@ def indexIsLoaded(self) -> bool:

return True

def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]) -> None:
def _setIndexFilePath(self, indexFilePath: Optional[str], deleteOnClose: bool = False):
    # This is called from __del__, so we need to account for this being called when something
    # in the constructor raises an exception and not all members of self exist.
    if (
        getattr(self, 'indexFilePath', None)
        and getattr(self, 'indexFilePathDeleteOnClose', False)
        and self.indexFilePath
        and os.path.isfile(self.indexFilePath)
    ):
        try:
            os.remove(self.indexFilePath)
        except Exception as exception:
            if self.printDebug >= 1:
                print(
                    "[Warning] Failed to remove temporarily downloaded and/or extracted index file at:",
                    self.indexFilePath,
                    "because of:",
                    exception,
                )

    if hasattr(self, 'indexFilePath') and hasattr(self, 'indexFilePathDeleteOnClose'):
        self.indexFilePath = indexFilePath
        self.indexFilePathDeleteOnClose = deleteOnClose

def _loadIndex(self, indexFilePath: str, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]) -> None:
    """
    Loads the given index SQLite database and checks it for validity, raising an exception if it is invalid.
@@ -964,7 +1020,68 @@
    if self.indexIsLoaded():
        return

    self.sqlConnection = self._openSqlDb(indexFilePath)
    # Download and/or extract the file to a temporary file if necessary.

    # Strip file:// prefix to avoid useless copies to the temporary directory.
    # TODO What about operator chaining?! It would be a valid use case for starting with file://!
    fileURLPrefix = 'file://'
    while indexFilePath.startswith(fileURLPrefix):
        indexFilePath = indexFilePath[len(fileURLPrefix) :]

    temporaryFolder = os.environ.get("RATARMOUNT_INDEX_TMPDIR", None)

    def _undoCompression(file) -> Optional[Tuple[str, CompressionInfo]]:
        compressionsToTest = TAR_COMPRESSION_FORMATS.copy()
        compressionsToTest.update(LIBARCHIVE_FILTER_FORMATS)
        compression = detectCompression(file, printDebug=self.printDebug, compressionsToTest=compressionsToTest)
        if not compression or compression not in compressionsToTest:
            return None

        if self.printDebug >= 2:
            print(f"[Info] Detected {compression}-compressed index.")

        formatOpen = findAvailableOpen(compression)
        if not formatOpen:
            moduleNames = [module.name for module in TAR_COMPRESSION_FORMATS[compression].modules]
            raise CompressionError(
                f"Cannot open a {compression} compressed index file {indexFilePath} "
                f"without any of these modules: {moduleNames}"
            )

        return formatOpen(file)

    def _copyToTemp(file):
        self._temporaryIndexFile = tempfile.NamedTemporaryFile(suffix=".tmp.sqlite.index", dir=temporaryFolder)
        # TODO add progress bar / output?
        with open(self._temporaryIndexFile.name, 'wb') as targetFile:
            shutil.copyfileobj(file, targetFile)

    if '://' in indexFilePath:
        if fsspec is None:
            raise RatarmountError(
                "Detected a URL for the index path but fsspec could not be imported!\n"
                "Try installing it with 'pip install fsspec' or 'pip install ratarmount[full]'."
            )

        # TODO Maybe manual deletion is not even necessary when using tempfile correctly?
        with fsspec.open(indexFilePath) as file:
            decompressedFile = _undoCompression(file)
            with decompressedFile if decompressedFile else file as fileToCopy:
                _copyToTemp(fileToCopy)
    else:
        with open(indexFilePath, 'rb') as file:
            decompressedFile = _undoCompression(file)
            if decompressedFile:
                with decompressedFile:
                    _copyToTemp(decompressedFile)
            else:
                temporaryIndexFilePath = indexFilePath

    temporaryIndexFilePath = self._temporaryIndexFile.name if self._temporaryIndexFile else indexFilePath

    # Done downloading and/or extracting the SQLite index.

    self.sqlConnection = self._openSqlDb(temporaryIndexFilePath)
    tables = getSqliteTables(self.sqlConnection)
    versions = None
    try:
@@ -1036,10 +1153,15 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di
pass

    if self.printDebug >= 1:
        print(f"Successfully loaded offset dictionary from {str(indexFilePath)}")
        message = "Successfully loaded offset dictionary from " + str(indexFilePath)
        if temporaryIndexFilePath != indexFilePath:
            message += " temporarily downloaded/decompressed into: " + str(temporaryIndexFilePath)
        print(message)

    self._setIndexFilePath(temporaryIndexFilePath)

def _tryLoadIndex(
    self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] = None
    self, indexFilePath: str, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] = None
) -> bool:
    """Calls loadIndex if index is not loaded already and provides extensive error handling."""

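A minimal pytest-style sketch of the kind of test the commit checklist still asks for. It relies only on the URL handling shown in the hunks above; the last assertion assumes the local-path branch of _pathIsWritable reports paths in writable folders as writable.

import os
import tempfile

from ratarmountcore.SQLiteIndex import SQLiteIndex

def test_url_index_paths_are_not_treated_as_local_files():
    # URLs must never be interpreted as writable local paths (no "ftp:" folders on disk).
    assert not SQLiteIndex._pathIsWritable("https://host.org/file.tar.index.sqlite")
    assert not SQLiteIndex._pathIsWritable("ftp://user:password@host/file.tar.index.sqlite.gz")

    # A plain path inside a writable folder should still be accepted.
    with tempfile.TemporaryDirectory() as folder:
        assert SQLiteIndex._pathIsWritable(os.path.join(folder, "file.tar.index.sqlite"))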
7 changes: 5 additions & 2 deletions core/ratarmountcore/compressions.py
@@ -571,7 +571,10 @@ def getGzipInfo(fileobj: IO[bytes]) -> Optional[Tuple[str, int]]:


def detectCompression(
    fileobj: IO[bytes], prioritizedBackends: Optional[List[str]], printDebug: int = 0
    fileobj: IO[bytes],
    prioritizedBackends: Optional[List[str]] = None,
    printDebug: int = 0,
    compressionsToTest: Dict[str, CompressionInfo] = TAR_COMPRESSION_FORMATS,
) -> Optional[str]:
    # isinstance(fileobj, io.IOBase) does not work for everything, e.g., for paramiko.sftp_file.SFTPFile
    # because it does not inherit from io.IOBase. Therefore, do duck-typing and test for required methods.
@@ -594,7 +597,7 @@ def detectCompression(
        return None

    oldOffset = fileobj.tell()
    for compressionId, compression in TAR_COMPRESSION_FORMATS.items():
    for compressionId, compression in compressionsToTest.items():
        # The header check is a necessary condition, not a sufficient condition.
        # Especially for gzip, which only has 2 magic bytes, false positives might happen.
        # Therefore, only use the magic bytes based check if the module could not be found
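A hedged usage sketch of the new compressionsToTest parameter, mirroring how SQLiteIndex._undoCompression calls it above; the in-memory gzip stream and the exact returned key are assumptions for illustration.

import gzip
import io

from ratarmountcore.compressions import (
    LIBARCHIVE_FILTER_FORMATS,
    TAR_COMPRESSION_FORMATS,
    detectCompression,
)

# Also test for libarchive-only filters, not just the TAR compression formats.
compressionsToTest = TAR_COMPRESSION_FORMATS.copy()
compressionsToTest.update(LIBARCHIVE_FILTER_FORMATS)

fileobj = io.BytesIO(gzip.compress(b"SQLite format 3\x00" + bytes(128)))
compression = detectCompression(fileobj, printDebug=0, compressionsToTest=compressionsToTest)
print(compression)  # expected to name the gzip format, e.g., 'gz'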
18 changes: 16 additions & 2 deletions ratarmount.py
@@ -591,7 +591,15 @@ def pointsIntoMountPoint(pathToTest):
hasIndexPath = False

if 'indexFilePath' in options and isinstance(options['indexFilePath'], str):
    indexFilePath = os.path.realpath(options['indexFilePath'])
    indexFilePath = options['indexFilePath']
    # TODO What about operator chaining?! It would be a valid use case for starting with file://!
    if '://' in options['indexFilePath']:
        fileURLPrefix = 'file://'
        while indexFilePath.startswith(fileURLPrefix):
            indexFilePath = indexFilePath[len(fileURLPrefix) :]
    if '://' not in indexFilePath:
        indexFilePath = os.path.realpath(options['indexFilePath'])

    if pointsIntoMountPoint(indexFilePath):
        del options['indexFilePath']
    else:
@@ -1265,7 +1273,13 @@ def _parseArgs(rawArgs: Optional[List[str]] = None):
indexGroup.add_argument(
    '--index-file', type=str,
    help='Specify a path to the .index.sqlite file. Setting this will disable fallback index folders. '
         'If the given path is ":memory:", then the index will not be written out to disk.')
         'If the given path is ":memory:", then the index will not be written out to disk. '
         'If the specified path is a remote URL, such as "https://host.org/file.tar.index.sqlite", or '
         'a compressed index, such as "file.tar.index.sqlite.gz", then the index file will be downloaded '
         f'and/or extracted into the default temporary folder ({tempfile.gettempdir()}). This path can be '
         'changed with the environment variable RATARMOUNT_INDEX_TMPDIR. The temporary folder in general '
         'can also be changed with these environment variables in decreasing priority: TMPDIR, TEMP, TMP, '
         'as described in the Python tempfile standard library documentation.')

indexFolders = ['', os.path.join( "~", ".ratarmount")]
xdgCacheHome = getXdgCacheHome()
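A small sketch of the temporary-folder override described in the new --index-file help text; the scratch path is hypothetical, and the lookup mirrors the RATARMOUNT_INDEX_TMPDIR handling added to SQLiteIndex._loadIndex.

import os
import tempfile

# Hypothetical scratch folder for staging downloaded/decompressed indexes.
os.environ["RATARMOUNT_INDEX_TMPDIR"] = "/mnt/scratch"

# Same lookup as in SQLiteIndex._loadIndex: None makes tempfile fall back to its default
# resolution order (TMPDIR, TEMP, TMP, then the platform default).
temporaryFolder = os.environ.get("RATARMOUNT_INDEX_TMPDIR", None)

with tempfile.NamedTemporaryFile(suffix=".tmp.sqlite.index", dir=temporaryFolder) as temporaryIndexFile:
    print(temporaryIndexFile.name)  # e.g., /mnt/scratch/tmpXXXXXXXX.tmp.sqlite.index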
10 changes: 9 additions & 1 deletion tests/ratarmount-help.txt
