From 7bd7465808b2777fde40180032143b3287e236dd Mon Sep 17 00:00:00 2001 From: David Manthey Date: Fri, 26 Jan 2024 09:08:33 -0500 Subject: [PATCH 1/2] Import data if imported from an assetstore --- Dockerfile | 7 +- devops/wsi_deid/docker-compose.yml | 16 ++ devops/wsi_deid/install_and_start_isyntax.sh | 1 + wsi_deid/__init__.py | 6 +- wsi_deid/assetstore_import.py | 82 +++++++++ wsi_deid/import_export.py | 184 ++++++++++++------- 6 files changed, 231 insertions(+), 65 deletions(-) create mode 100644 wsi_deid/assetstore_import.py diff --git a/Dockerfile b/Dockerfile index a230c6e..71d4c11 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,7 @@ RUN apt-get update && \ ca-certificates \ curl \ fonts-dejavu \ + fuse \ git \ less \ libmagic-dev \ @@ -93,6 +94,8 @@ RUN curl -sL https://deb.nodesource.com/setup_14.x | bash && \ nodejs && \ apt-get clean && rm -rf /var/lib/apt/lists/* +RUN mkdir -p /fuse --mode=a+rwx + RUN mkdir -p wsi_deid && \ mkdir -p /conf @@ -105,7 +108,7 @@ RUN python -m pip install --no-cache-dir \ # git+https://github.com/DigitalSlideArchive/DSA-WSI-DeID.git \ . \ # girder[mount] adds dependencies to show tiles from S3 assets \ - # girder[mount] \ + girder[mount] \ # Add additional girder plugins here \ # girder-homepage \ # Use prebuilt wheels whenever possible \ @@ -132,4 +135,4 @@ COPY ./devops/wsi_deid/girder.local.conf ./devops/wsi_deid/provision.py ./devops ENTRYPOINT ["/usr/bin/tini", "--"] -CMD python /conf/provision.py && girder serve +CMD python /conf/provision.py && (girder mount /fuse 2>/dev/null || true) && girder serve diff --git a/devops/wsi_deid/docker-compose.yml b/devops/wsi_deid/docker-compose.yml index 9fbe0ef..e7c3066 100644 --- a/devops/wsi_deid/docker-compose.yml +++ b/devops/wsi_deid/docker-compose.yml @@ -8,6 +8,22 @@ services: # so that local file assetstores and logs are owned by yourself. # user: ${CURRENT_UID} restart: unless-stopped + # To have full capabilities with S3 assetstores, we use a user file system + # (fuse). This requires some privileges. This is not needed if only + # filesystem assetstores are used. Instead of privileged mode, fuse can + # use specific devices, security_opt, and cap_add: + # devices: + # - /dev/fuse:/dev/fuse + # security_opt: + # - apparmor:unconfined + # cap_add: + # - SYS_ADMIN + # but these may be somewhat host specific, so we default to privileged. If + # the docker daemon is being run with --no-new-privileges, fuse may not + # work. + # See also https://github.com/docker/for-linux/issues/321 for possible + # methods to avoid both privileged mode and cap_add SYS_ADMIN. + privileged: true # Set DSA_PORT to expose the interface on another port (default 8080). ports: - "${DSA_PORT:-8080}:8080" diff --git a/devops/wsi_deid/install_and_start_isyntax.sh b/devops/wsi_deid/install_and_start_isyntax.sh index 3173e68..647e7da 100755 --- a/devops/wsi_deid/install_and_start_isyntax.sh +++ b/devops/wsi_deid/install_and_start_isyntax.sh @@ -22,4 +22,5 @@ set -e ) || true # This is our original start method python /conf/provision.py +girder mount /fuse || true girder serve diff --git a/wsi_deid/__init__.py b/wsi_deid/__init__.py index 44d0943..b3505dd 100644 --- a/wsi_deid/__init__.py +++ b/wsi_deid/__init__.py @@ -3,7 +3,7 @@ import girder import PIL.Image import psutil -from girder import plugin +from girder import events, plugin from girder.constants import AssetstoreType from girder.exceptions import GirderException, ValidationException from girder.models.assetstore import Assetstore @@ -13,6 +13,7 @@ from girder.utility import setting_utilities from pkg_resources import DistributionNotFound, get_distribution +from . import assetstore_import from .constants import PluginSettings from .import_export import SftpMode from .rest import WSIDeIDResource, addSystemEndpoints @@ -106,3 +107,6 @@ def load(self, info): if idx1 not in File()._indices: File().ensureIndex(idx1) PIL.Image.ANTIALIAS = PIL.Image.LANCZOS + + events.bind('rest.post.assetstore/:id/import.after', 'wsi_deid', + assetstore_import.assetstoreImportEvent) diff --git a/wsi_deid/assetstore_import.py b/wsi_deid/assetstore_import.py new file mode 100644 index 0000000..323192a --- /dev/null +++ b/wsi_deid/assetstore_import.py @@ -0,0 +1,82 @@ +import os + +from girder.api.rest import getCurrentUser +from girder.models.folder import Folder +from girder.models.item import Item +from girder.models.setting import Setting +from girder.utility.progress import ProgressContext + +from . import import_export +from .constants import PluginSettings + + +def assetstoreImportEvent(event): + """ + After an assetstore import, check if the import was done within the Unfiled + folder. If so, import the data. + """ + check = False + try: + params = event.info['returnVal']['params'] + if params['destinationType'] == 'folder': + folder = Folder().load(params['destinationId'], force=True) + if import_export.isProjectFolder(folder) == 'unfiled': + check = True + except Exception: + return + if not check: + return + progress = True + user = getCurrentUser() + with ProgressContext(progress, user=user, title='Importing data') as ctx: + importFromUnfiled(user, ctx) + + +def assetstoreIngestFindFilesFolder(folder, excelFiles, imageFiles): + """ + Walk a folder and add likely files to excel and image file lists. + + :param folder: folder to walk + :param excelFiles: list of items that are probably excel files. + :param imageFiles: list of items that are probably image files. + """ + for subfolder in Folder().childFolders(folder, parentType='folder'): + assetstoreIngestFindFilesFolder(subfolder, excelFiles, imageFiles) + for item in Folder().childItems(folder): + _, ext = os.path.splitext(item['name']) + # skip this item if it has metadata + if len(item.get('meta', {})): + continue + if ext.lower() in {'.xls', '.xlsx', '.csv'} and not item['name'].startswith('~$'): + if len(list(Item().childFiles(item, limit=1))): + excelFiles.append(item) + elif 'largeImage' in item: + imageFiles.append(item) + + +def assetstoreIngestFindFiles(importPath): + """ + Get a list of excel and image files for import. + + :param importPath: the import folder path (ignored). + :returns: a two tuple of lists of excel items and image items. + """ + unfiledFolderId = Setting().get(PluginSettings.WSI_DEID_UNFILED_FOLDER) + unfiledFolder = Folder().load(unfiledFolderId, force=True, exc=True) + + excelFiles = [] + imageFiles = [] + assetstoreIngestFindFilesFolder(unfiledFolder, excelFiles, imageFiles) + return excelFiles, imageFiles + + +def importFromUnfiled(user, ctx): + """ + Walk the unfiled folder. Collect all excel and image files that haven't + been processed before, then process them as if they came through the import + mechanism. + + :param user: the user that started the process. + :param ctx: A progress context. + """ + import_export.ingestData(user=user, walkData=assetstoreIngestFindFiles, ctx=ctx) diff --git a/wsi_deid/import_export.py b/wsi_deid/import_export.py index d9df78d..810145e 100644 --- a/wsi_deid/import_export.py +++ b/wsi_deid/import_export.py @@ -4,6 +4,7 @@ import shutil import subprocess import tempfile +import time import jsonschema import magic @@ -30,7 +31,7 @@ SCHEMA_FILE_PATH = os.path.join(os.path.dirname(__file__), 'schema', 'importManifestSchema.json') -def readExcelData(filepath): +def readExcelData(filepathOrFptr): """ Read in the data from excel, while attempting to be forgiving about the exact location of the header row. @@ -44,10 +45,17 @@ def readExcelData(filepath): validateImageIDField = config.getConfig('validate_image_id_field', True) potential_header = 0 reader = pd.read_csv - mimetype = magic.from_file(filepath, mime=True) + ispath = not hasattr(filepathOrFptr, 'seek') + if ispath: + filepath = filepathOrFptr + mimetype = magic.from_file(filepath, mime=True) + else: + fptr = filepathOrFptr + mimetype = magic.from_buffer(fptr.read(16384), mime=True) + fptr.seek(0) if 'excel' in mimetype or 'openxmlformats' in mimetype: reader = pd.read_excel - df = reader(filepath, header=potential_header, dtype=str) + df = reader(filepathOrFptr, header=potential_header, dtype=str) rows = df.shape[0] while potential_header < rows: # When the columns include TokenID, ImageID, this is the Header row. @@ -58,11 +66,13 @@ def readExcelData(filepath): if not validateImageIDField and folderNameField in df.columns: return df, potential_header potential_header += 1 - df = reader(filepath, header=potential_header, dtype=str) + if not ispath: + fptr.seek(0) + df = reader(filepathOrFptr, header=potential_header, dtype=str) err = (f'Was expecting columns named {folderNameField} and {imageNameField}.' if validateImageIDField else f'Was expecting a column named {folderNameField}.') - raise ValueError(f'Excel file {filepath} lacks a header row. ' + err) + raise ValueError(f'Excel file {filepath if ispath else "-"} lacks a header row. ' + err) def validateDataRow(validator, row, rowNumber, df): @@ -163,9 +173,24 @@ def readExcelFiles(filelist, ctx): # noqa else: properties = set(validator.schema['properties']) for filepath in filelist: + filepathOrFptr = filepath + if isinstance(filepath, dict): + item = filepath + filepath = item['name'] + try: + filepathOrFptr = File().open(next(Item().childFiles(item))) + except Exception as exc: + logger.info(f'Exception: {exc}') + message = 'Cannot read %s' % (os.path.basename(filepath), ) + status = 'badformat' + continue + # Set metadata indicating that we've parsed this file + timestamp = time.mktime(item['created'].timetuple()) + else: + timestamp = os.path.getmtime(filepath) ctx.update(message='Reading %s' % os.path.basename(filepath)) try: - df, header_row_number = readExcelData(filepath) + df, header_row_number = readExcelData(filepathOrFptr) for key in ['ScannedFileName', 'InputFileName']: if key in properties and key in df: df[key] = df[key].fillna('') @@ -189,7 +214,6 @@ def readExcelFiles(filelist, ctx): # noqa ctx.update(message=message) logger.info(message) continue - timestamp = os.path.getmtime(filepath) count = 0 totalErrors = [] for row_num, row in enumerate(df.itertuples()): @@ -282,8 +306,11 @@ def getExisting(imagePath, ctx): item because the file size changed. :returns: a status if the document exists or None if it doesn't. """ + if isinstance(imagePath, dict): + return None reimportIfMoved = config.getConfig('reimport_if_moved', False) - existingList = list(File().find({'path': imagePath, 'imported': True})) + existingList = list(File().find( + {'$or': [{'path': imagePath}, {'s3Key': imagePath}], 'imported': True})) existing = existingList[0] if existingList else None if reimportIfMoved and existing: moved = True @@ -328,25 +355,30 @@ def ingestOneItem(importFolder, imagePath, record, ctx, user, newItems): 'name': record[folderNameField], 'parentId': importFolder['_id']}) if not parentFolder: parentFolder = Folder().createFolder(importFolder, record[folderNameField], creator=user) - # TODO: (a) use the getTargetAssetstore method from Upload(), (b) ensure - # that the assetstore is a filesystem assestore. - assetstore = Assetstore().getCurrent() - # TODO: When imageNameFiled is blank or undefined, use the folder name - # plus a number - name = (record[imageNameField] or '') + os.path.splitext(record['name'])[1] - mimeType = 'image/tiff' - if Item().findOne({'name': {'$regex': '^%s\\.' % record[imageNameField]}}): - return 'duplicate' - item = Item().createItem(name=name, creator=user, folder=parentFolder) - stat = os.stat(imagePath) - file = File().createFile( - name=name, creator=user, item=item, reuseExisting=False, - assetstore=assetstore, mimeType=mimeType, size=stat.st_size, - saveFile=False) - file['path'] = os.path.abspath(os.path.expanduser(imagePath)) - file['mtime'] = stat.st_mtime - file['imported'] = True - file = File().save(file) + if not isinstance(imagePath, dict): + # TODO: (a) use the getTargetAssetstore method from Upload(), (b) + # ensure that the assetstore is a filesystem assestore. + assetstore = Assetstore().getCurrent() + # TODO: When imageNameFiled is blank or undefined, use the folder name + # plus a number + name = (record[imageNameField] or '') + os.path.splitext(record['name'])[1] + mimeType = 'image/tiff' + if Item().findOne({'name': {'$regex': '^%s\\.' % record[imageNameField]}}): + return 'duplicate' + item = Item().createItem(name=name, creator=user, folder=parentFolder) + stat = os.stat(imagePath) + file = File().createFile( + name=name, creator=user, item=item, reuseExisting=False, + assetstore=assetstore, mimeType=mimeType, size=stat.st_size, + saveFile=False) + file['path'] = os.path.abspath(os.path.expanduser(imagePath)) + file['mtime'] = stat.st_mtime + file['imported'] = True + file = File().save(file) + else: + # Move an existing item to the parent folder + item = Item().move(item, parentFolder) + # TODO: add metadata marking that this was added # Reload the item as it will have changed item = Item().load(item['_id'], force=True) if isinstance(record['fields'], dict): @@ -367,21 +399,27 @@ def ingestOneItem(importFolder, imagePath, record, ctx, user, newItems): def ingestImageToUnfiled(imagePath, unfiledFolder, ctx, user, unfiledItems, uploadInfo): if getExisting(imagePath, ctx) == 'present': return - ctx.update(message='Importing %s to the Unfiled folder' % imagePath) - assetstore = Assetstore().getCurrent() - _, name = os.path.split(imagePath) - mimeType = 'image/tiff' - item = Item().createItem(name=name, creator=user, folder=unfiledFolder) - item['wsi_uploadInfo'] = uploadInfo - item = Item().save(item) - stat = os.stat(imagePath) - file = File().createFile( - name=name, creator=user, item=item, reuseExisting=False, - assetstore=assetstore, mimeType=mimeType, size=stat.st_size, saveFile=False) - file['path'] = os.path.abspath(os.path.expanduser(imagePath)) - file['mtime'] = stat.st_mtime - file['imported'] = True - file = File().save(file) + if not isinstance(imagePath, dict): + ctx.update(message='Importing %s to the Unfiled folder' % imagePath) + assetstore = Assetstore().getCurrent() + _, name = os.path.split(imagePath) + mimeType = 'image/tiff' + item = Item().createItem(name=name, creator=user, folder=unfiledFolder) + item['wsi_uploadInfo'] = uploadInfo + item = Item().save(item) + stat = os.stat(imagePath) + file = File().createFile( + name=name, creator=user, item=item, reuseExisting=False, + assetstore=assetstore, mimeType=mimeType, size=stat.st_size, saveFile=False) + file['path'] = os.path.abspath(os.path.expanduser(imagePath)) + file['mtime'] = stat.st_mtime + file['imported'] = True + file = File().save(file) + else: + item = imagePath + ctx.update(message='Importing %s to the Unfiled folder' % item['name']) + item = Item().move(item, unfiledFolder) + # TODO: add metadata marking that this was added unfiledItems.append(item['_id']) @@ -400,7 +438,29 @@ def startOcrJobForUnfiled(itemIds, imageInfoDict, user, reportInfo): return unfiledJob['_id'] -def ingestData(ctx, user=None): # noqa +def directIngestFindFiles(importPath): + """ + Find all the image and excel files in the import path. + + :param importPath: the import path. + :returns: a two tuple of lists of excel files and image files. + """ + excelFiles = [] + imageFiles = [] + for base, _dirs, files in os.walk(importPath): + for file in files: + filePath = os.path.join(importPath, base, file) + _, ext = os.path.splitext(file) + if ext.lower() in {'.xls', '.xlsx', '.csv'} and not file.startswith('~$'): + excelFiles.append(filePath) + # ignore some extensions + elif (ext.lower() not in {'.zip', '.txt', '.xml', '.swp', '.xlk'} and + not file.startswith('~$') and not file.startswith('.~')): + imageFiles.append(filePath) + return excelFiles, imageFiles + + +def ingestData(ctx, user=None, walkData=None): # noqa """ Scan the import folder for image and excel files. For each excel file, extract the appropriate data. For each file listed in an excel file, @@ -413,6 +473,9 @@ def ingestData(ctx, user=None): # noqa :param ctx: a progress context. :param user: the user triggering this. + :param walkData: None to find files in the import path, otherwise a + function that takes an import path and yields a two tuple of a list of + excelFiles and a list of imageFiles. """ importPath = Setting().get(PluginSettings.WSI_DEID_IMPORT_PATH) importFolderId = Setting().get(PluginSettings.HUI_INGEST_FOLDER) @@ -420,18 +483,10 @@ def ingestData(ctx, user=None): # noqa raise Exception('Import path and/or folder not specified.') importFolder = Folder().load(importFolderId, force=True, exc=True) ctx.update(message='Scanning import folder') - excelFiles = [] - imageFiles = [] - for base, _dirs, files in os.walk(importPath): - for file in files: - filePath = os.path.join(importPath, base, file) - _, ext = os.path.splitext(file) - if ext.lower() in {'.xls', '.xlsx', '.csv'} and not file.startswith('~$'): - excelFiles.append(filePath) - # ignore some extensions - elif (ext.lower() not in {'.zip', '.txt', '.xml', '.swp', '.xlk'} and - not file.startswith('~$') and not file.startswith('.~')): - imageFiles.append(filePath) + if not walkData: + excelFiles, imageFiles = directIngestFindFiles(importPath) + else: + excelFiles, imageFiles = walkData(importPath) if not len(excelFiles): ctx.update(message='Failed to find any excel files in import directory.') if not len(imageFiles): @@ -484,10 +539,14 @@ def ingestData(ctx, user=None): # noqa for image in imageFiles: if unfiledFolder is None: status = 'unlisted' - report.append({'record': None, 'status': status, 'path': image}) + report.append({ + 'record': None, 'status': status, + 'path': image if not isinstance(image, dict) else image['name']}) else: ingestImageToUnfiled(image, unfiledFolder, ctx, user, unfiledItems, unfiledImages) - report.append({'status': 'unfiled', 'path': image}) + report.append({ + 'status': 'unfiled', + 'path': image if not isinstance(image, dict) else image['name']}) if len(unfiledItems): unfiledJobId = startOcrJobForUnfiled( unfiledItems, unfiledImages, user, @@ -507,7 +566,7 @@ def ingestData(ctx, user=None): # noqa args=(newItems), ) Job().scheduleJob(job=batchJob) - file = importReport(ctx, report, excelReport, user, importPath) + file = importReport(ctx, report, excelReport, user, importPath if not walkData else None) summary = reportSummary(report, excelReport, file=file) if startOcrDuringImport and batchJob: summary['ocr_job'] = batchJob['_id'] @@ -525,7 +584,7 @@ def importReport(ctx, report, excelReport, user, importPath, reason=None): :param excelReport: a list of excel files that were processed. :param user: the user triggering this. :param importPath: the path of the import folder. Used to show relative - paths in the report. + paths in the report. None to not use relative reporting. :return: the Girder file with the report """ ctx.update(message='Generating report') @@ -564,7 +623,8 @@ def importReport(ctx, report, excelReport, user, importPath, reason=None): anyErrors = False for row in excelReport: data = { - 'ExcelFilePath': os.path.relpath(row['path'], importPath), + 'ExcelFilePath': os.path.relpath(row['path'], importPath) + if importPath else row['path'], statusKey: excelStatusDict.get(row['status'], row['status']), reasonKey: row.get('reason'), } @@ -575,7 +635,7 @@ def importReport(ctx, report, excelReport, user, importPath, reason=None): for row in report: data = { 'WSIFilePath': os.path.relpath(row['path'], importPath) if row.get( - 'path') and row['status'] != 'missing' else row.get('path'), + 'path') and row['status'] != 'missing' and importPath else row.get('path'), statusKey: statusDict.get(row['status'], row['status']), } if row.get('record'): @@ -583,7 +643,7 @@ def importReport(ctx, report, excelReport, user, importPath, reason=None): fields = {key: value for key, value in fields.items() if key in reportFields} data.update(fields) for k, v in row['record'].items(): - if k == 'excel' and v: + if k == 'excel' and v and importPath: v = os.path.relpath(v, importPath) if k != 'fields': data[keyToColumns.get(k, k)] = v From 78569ee3ade1473c2a62c276ea4a9fa02b3564d9 Mon Sep 17 00:00:00 2001 From: David Manthey Date: Mon, 26 Feb 2024 09:52:51 -0500 Subject: [PATCH 2/2] Log more --- Dockerfile | 1 + wsi_deid/assetstore_import.py | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/Dockerfile b/Dockerfile index 71d4c11..3f15052 100644 --- a/Dockerfile +++ b/Dockerfile @@ -105,6 +105,7 @@ COPY . . # By using --no-cache-dir the Docker image is smaller RUN python -m pip install --no-cache-dir \ + git+https://github.com/DigitalSlideArchive/import-tracker.git \ # git+https://github.com/DigitalSlideArchive/DSA-WSI-DeID.git \ . \ # girder[mount] adds dependencies to show tiles from S3 assets \ diff --git a/wsi_deid/assetstore_import.py b/wsi_deid/assetstore_import.py index 323192a..b35d4de 100644 --- a/wsi_deid/assetstore_import.py +++ b/wsi_deid/assetstore_import.py @@ -1,5 +1,6 @@ import os +from girder import logger from girder.api.rest import getCurrentUser from girder.models.folder import Folder from girder.models.item import Item @@ -15,6 +16,7 @@ def assetstoreImportEvent(event): After an assetstore import, check if the import was done within the Unfiled folder. If so, import the data. """ + logger.info('Processing assetstore import event') check = False try: params = event.info['returnVal']['params'] @@ -23,9 +25,12 @@ def assetstoreImportEvent(event): if import_export.isProjectFolder(folder) == 'unfiled': check = True except Exception: + logger.exception('Processing assetstore import event: Failed to check') return if not check: + logger.info('Processing assetstore import event: not the unfiled folder') return + logger.info('Processing assetstore import event: will process') progress = True user = getCurrentUser() with ProgressContext(progress, user=user, title='Importing data') as ctx: