Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CADC-13844 - refactor to #84

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
10 changes: 6 additions & 4 deletions gem2caom2/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ def build(self, entry):
or self._config.use_local_files
):
self._logger.debug(f'Using entry for source.')
result = gem_name.GemName(file_name=f_name)
result = gem_name.GemName(file_name=f_name, filter_cache=self._metadata_reader.filter_cache)
result.source_names = [entry]
elif '.fits' in entry or '.jpg' in entry:
self._logger.debug('Using file_id for source.')
result = gem_name.GemName(file_name=f_name)
result = gem_name.GemName(file_name=f_name, filter_cache=self._metadata_reader.filter_cache)
result.source_names = [result.file_id]
elif '.fits' not in entry and '.jpg' not in entry:
# this case exists so that retries.txt entries are
Expand All @@ -129,7 +129,9 @@ def build(self, entry):
'entry might be file_id, try a made-up name.'
)
made_up_file_name = f'{entry}.fits'
result = gem_name.GemName(file_name=made_up_file_name)
result = gem_name.GemName(
file_name=made_up_file_name, filter_cache=self._metadata_reader.filter_cache
)
result.source_names = [result.file_id]
self._metadata_reader.set(result)
# StorageName instance is only partially constructed at this
Expand All @@ -139,5 +141,5 @@ def build(self, entry):
return result
except Exception as e:
self._logger.error(e)
self._logger.debug(traceback.format_exc())
self._logger.error(traceback.format_exc())
raise mc.CadcException(e)
35 changes: 8 additions & 27 deletions gem2caom2/composable.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
from gem2caom2 import pull_augmentation, data_source, builder
from gem2caom2 import cleanup_augmentation, fits2caom2_augmentation
from gem2caom2 import gemini_metadata, svofps
from gem2caom2.gem_name import GemName


DATA_VISITORS = [ghost_preview_augmentation]
META_VISITORS = [fits2caom2_augmentation, pull_augmentation, preview_augmentation, cleanup_augmentation]
Expand Down Expand Up @@ -148,13 +150,11 @@ def _common_init():
clients = GemClientCollection(config)
meta_visitors = META_VISITORS
gemini_session = mc.get_endpoint_session()
provenance_finder = gemini_metadata.ProvenanceFinder(
config, clients.query_client, gemini_session
)
svofps_session = mc.get_endpoint_session()
filter_cache = svofps.FilterMetadataCache(svofps_session)
clients.gemini_session = gemini_session
clients.svo_session = svofps_session
provenance_finder = gemini_metadata.ProvenanceFinder(clients, config)
if config.use_local_files or mc.TaskType.SCRAPE in config.task_types:
metadata_reader = gemini_metadata.GeminiFileMetadataReader(
gemini_session, provenance_finder, filter_cache
Expand Down Expand Up @@ -331,40 +331,21 @@ def _run_incremental_diskfiles():
clients = GemClientCollection(config)
meta_visitors = META_VISITORS
gemini_session = mc.get_endpoint_session()
provenance_finder = gemini_metadata.ProvenanceFinder(config, clients.query_client, gemini_session)
svofps_session = mc.get_endpoint_session()
filter_cache = svofps.FilterMetadataCache(svofps_session)
clients.gemini_session = gemini_session
clients.svo_session = svofps_session
metadata_reader = gemini_metadata.FileInfoBeforeJsonReader(
clients.data_client, gemini_session, provenance_finder, filter_cache
)
reader_lookup = gemini_metadata.GeminiMetadataLookup(metadata_reader)
reader_lookup.reader = metadata_reader
name_builder = builder.GemObsIDBuilder(config, metadata_reader, reader_lookup)
incremental_source = data_source.IncrementalSourceDiskfiles(config, metadata_reader)
result = rc.run_by_state(
incremental_source = data_source.IncrementalSourceDiskfiles(config, gemini_session, GemName, filter_cache)
return rc.run_by_state_runner_meta(
config=config,
name_builder=name_builder,
meta_visitors=meta_visitors,
data_visitors=DATA_VISITORS,
sources=[incremental_source],
clients=clients,
metadata_reader=metadata_reader,
organizer_module_name='gem2caom2.composable',
organizer_class_name='GemOrganizeExecutes',
organizer_module_name='gem2caom2.gemini_metadata',
organizer_class_name='GeminiOrganizeExecutesRunnerMeta',
storage_name_ctor=GemName,
)
if incremental_source.max_records_encountered:
# There's a currently 10k limit on the number of records returned from the archive.gemini.edu endpoints.
# As of this writing, if that limit is encountered, the ingestion requires manual intervention to recover.
# As opposed to stopping the incremental ingestion with a time-box where it will just continually fail
# with this same error, which is what raising the exception would do, the app notifies Ops via the log
# message, and then carries on, so as to more efficiently process large numbers of inputs.
logging.warning('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
logging.warning('Encountered maximum records!!')
logging.warning('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
result |= -1
return result


def run_incremental_diskfiles():
Expand Down
44 changes: 29 additions & 15 deletions gem2caom2/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@
from collections import defaultdict, deque, OrderedDict
from datetime import datetime

from cadcdata import FileInfo
from caom2utils.blueprints import _to_int
from caom2utils.data_util import get_file_type
from caom2pipe import client_composable as clc
from caom2pipe import data_source_composable as dsc
from caom2pipe.manage_composable import build_uri, CaomName, ISO_8601_FORMAT, make_datetime, query_endpoint_session
from caom2pipe.manage_composable import StorageName
from gem2caom2.obs_file_relationship import repair_data_label


__all__ = ['GEM_BOOKMARK', 'IncrementalSource', 'PublicIncremental']
Expand Down Expand Up @@ -230,13 +234,14 @@ class IncrementalSourceDiskfiles(dsc.IncrementalDataSource):
entrytime is when the DB record behind the JSON being displayed was created.
"""

def __init__(self, config, reader):
def __init__(self, config, gemini_session, storage_name_ctor, filter_cache):
    """
    :param config: pipeline Config instance, forwarded to the base
        IncrementalDataSource.
    :param gemini_session: HTTP session used for archive.gemini.edu
        queries.
    :param storage_name_ctor: callable that builds a StorageName-like
        instance from (file_name, filter_cache) — e.g. GemName.
    :param filter_cache: SVO FPS filter metadata cache handed to each
        constructed storage name.
    """
    super().__init__(config, start_key=GEM_BOOKMARK)
    # Track whether a time-boxed query hit the endpoint's record cap,
    # and which window did it, for later reporting.
    self._max_records_encountered = False
    self._encounter_start = None
    self._encounter_end = None
    # Removed leftover assignments that referenced `reader`, a name not
    # in this signature (they would raise NameError at construction).
    self._session = gemini_session
    self._storage_name_ctor = storage_name_ctor
    self._filter_cache = filter_cache

def _initialize_end_dt(self):
    # Default the upper bound of the incremental harvest window to "now".
    # NOTE(review): this is a naive local datetime (no tzinfo) — confirm
    # the archive's entrytime values are compared in the same zone.
    self._end_dt = datetime.now()
Expand All @@ -250,10 +255,14 @@ def get_time_box_work(self, prev_exec_dt, exec_dt):
"""

self._logger.debug(f'Begin get_time_box_work from {prev_exec_dt} to {exec_dt}.')
self._max_records_encountered = False
# datetime format 2019-12-01T00:00:00 => no microseconds in the url
prev_exec_dt_iso = prev_exec_dt.replace(microsecond=0)
exec_dt_iso = exec_dt.replace(microsecond=0)
url = f'https://archive.gemini.edu/diskfiles/entrytimedaterange={prev_exec_dt_iso.isoformat()}--{exec_dt_iso.isoformat()}'
url = (
f'https://archive.gemini.edu/diskfiles/entrytimedaterange='
f'{prev_exec_dt_iso.isoformat()}--{exec_dt_iso.isoformat()}'
)

# needs to be ordered by timestamps when processed
self._logger.info(f'Querying {url}')
Expand All @@ -277,13 +286,21 @@ def get_time_box_work(self, prev_exec_dt, exec_dt):
for entry_dt, values in metadata.items():
for value in values:
file_name = value.get('filename')
entries.append(dsc.StateRunnerMeta(file_name, entry_dt))
uri = build_uri(StorageName.collection, file_name, StorageName.scheme)
self._metadata_reader.add_file_info_html_record(uri, value)
storage_name = self._storage_name_ctor(file_name, self._filter_cache)
storage_name.file_info[storage_name.destination_uris[0]] = FileInfo(
id=file_name,
size=_to_int(value.get('data_size')),
md5sum=value.get('data_md5'),
file_type=get_file_type(file_name),
)
repaired_data_label = repair_data_label(file_name, value.get('datalabel'))
storage_name.obs_id = repaired_data_label
entries.append(dsc.RunnerMeta(storage_name, entry_dt))
finally:
if response is not None:
response.close()
if len(entries) == MAX_ENTRIES:
self._logger.error(f'Max records window {self._encounter_start} to {self._encounter_end}.')
self._max_records_encountered = True
self._encounter_start = prev_exec_dt
self._encounter_end = exec_dt
Expand All @@ -298,9 +315,12 @@ def _parse_diskfiles_response(self, html_string):
for row in rows:
cells = row.find_all('td')
entry_time = make_datetime(cells[5].text.strip()) # what the https query is keyed on
file_name = cells[0].text.strip()
if file_name.endswith('-fits!-md!'):
continue
value = {
'filename': cells[0].text.strip(),
'datalabel': cells[1].text.strip(),
'filename': file_name,
'datalabel': cells[1].text.strip().split('\n')[-1],
'instrument': cells[2].text.strip(),
'lastmod': make_datetime(cells[6].text.strip()),
'data_size': cells[10].text.strip(),
Expand All @@ -310,11 +330,5 @@ def _parse_diskfiles_response(self, html_string):
result = OrderedDict(sorted(temp.items()))
return result

@property
def max_records_encountered(self):
    """Report whether a harvest window hit the endpoint's record cap.

    Side-effecting accessor: when the cap was hit, reading this property
    also logs the offending time window at ERROR level.
    """
    if self._max_records_encountered:
        self._logger.error(
            f'Max records window {self._encounter_start} to '
            f'{self._encounter_end}.'
        )
    return self._max_records_encountered
22 changes: 20 additions & 2 deletions gem2caom2/fits2caom2_augmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
from caom2pipe import caom_composable as cc
from gem2caom2 import main_app


class GeminiFits2caom2Visitor(cc.Fits2caom2Visitor):
def __init__(self, observation, **kwargs):
super().__init__(observation, **kwargs)
Expand All @@ -87,5 +86,24 @@ def _get_mapping(self, headers, _):
)


class GeminiFits2caom2VisitorRunnerMeta(cc.Fits2caom2VisitorRunnerMeta):
    """Fits2caom2 visitor for the "runner meta" execution path, where
    per-file metadata is carried on the StorageName instance rather than
    a separate metadata reader (hence the None reader argument below)."""

    def __init__(self, observation, **kwargs):
        super().__init__(observation, **kwargs)

    def _get_mapping(self, dest_uri):
        # Build the instrument-specific mapping for one artifact URI,
        # pulling that URI's headers from the StorageName's own
        # metadata cache.
        return main_app.mapping_factory(
            self._storage_name,
            self._storage_name.metadata.get(dest_uri),
            None,  # metadata reader
            self._clients,
            self._reporter._observable,
            self._observation,
            self._config,
        )


def visit(observation, **kwargs):
    """Module-level visitor entry point.

    The newer "runner meta" execution path supplies a 'reporter' keyword
    argument; its presence selects GeminiFits2caom2VisitorRunnerMeta,
    otherwise the original metadata-reader-based visitor is used.
    """
    # Removed a leftover unconditional return that preceded this
    # conditional and made the RunnerMeta branch unreachable.
    if 'reporter' in kwargs:
        return GeminiFits2caom2VisitorRunnerMeta(observation, **kwargs).visit()
    return GeminiFits2caom2Visitor(observation, **kwargs).visit()
15 changes: 14 additions & 1 deletion gem2caom2/gem_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
# ***********************************************************************
#

from os.path import basename

from caom2pipe import manage_composable as mc

from gem2caom2.obs_file_relationship import remove_extensions
Expand Down Expand Up @@ -145,10 +147,21 @@ class GemName(mc.StorageName):
def __init__(
    self,
    file_name=None,
    filter_cache=None,
):
    """
    :param file_name: Gemini file name; a '.header' suffix is stripped
        and any leading directory path removed before use.
    :param filter_cache: SVO FPS filter metadata cache, retained for
        later filter look-ups.
    """
    # NOTE(review): file_name=None would raise AttributeError on
    # .replace() below — confirm all callers pass a real name.
    # Removed a duplicate super().__init__ call that lacked the
    # basename normalization (diff/merge artifact).
    super().__init__(file_name=basename(file_name.replace('.header', '')))
    # use the file id because the extension doesn't help much in the
    # archive.gemini.edu URL
    self._source_names = [self._file_id]
    self._json_metadata = {}
    self._filter_cache = filter_cache

@property
def json_metadata(self):
    # Per-file JSON metadata records; initialized empty in __init__ and
    # presumably filled in later by a metadata reader — TODO confirm
    # which caller populates it.
    return self._json_metadata

@property
def name(self):
    # Expose the file id (extensions stripped) as the name; the
    # extension adds nothing in archive.gemini.edu URLs.
    return self._file_id

@property
def prev(self):
Expand Down
Loading