From 3b2dfa2465788045e7e5275d207b46f15c736378 Mon Sep 17 00:00:00 2001 From: Mark Stuart Date: Fri, 2 Jun 2023 15:23:37 +1200 Subject: [PATCH] Port and improve work done in https://github.com/ckan/ckanext-spatial/pull/259 --- ckanext/spatial/harvesters/csw.py | 17 ++++++- ckanext/spatial/lib/csw_client.py | 73 ++++++++++++++++++++++++------- 2 files changed, 72 insertions(+), 18 deletions(-) diff --git a/ckanext/spatial/harvesters/csw.py b/ckanext/spatial/harvesters/csw.py index c27824be..7858d0ae 100644 --- a/ckanext/spatial/harvesters/csw.py +++ b/ckanext/spatial/harvesters/csw.py @@ -160,12 +160,25 @@ def fetch_stage(self,harvest_object): harvest_object) return False + # load config + self._set_source_config(harvest_object.source.config) + # get output_schema from config + output_schema = self.source_config.get('output_schema', self.output_schema()) + identifier = harvest_object.guid try: record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema()) except Exception as e: - self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object) - return False + try: + log.warn('Unable to fetch GUID {} with output schema: {}'.format(identifier, output_schema)) + if output_schema == self.output_schema(): + raise e + log.info('Fetching GUID {} with output schema: {}'.format(identifier, self.output_schema())) + # retry with default output schema + record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema()) + except Exception as e: + self._save_object_error('Error getting the CSW record with GUID {}'.format(identifier), harvest_object) + return False if record is None: self._save_object_error('Empty record for GUID %s' % identifier, diff --git a/ckanext/spatial/lib/csw_client.py b/ckanext/spatial/lib/csw_client.py index 0ac075e5..3243aa04 100644 --- a/ckanext/spatial/lib/csw_client.py +++ b/ckanext/spatial/lib/csw_client.py @@ -35,6 +35,8 @@ def _xmd(self, obj): pass elif isinstance(val, six.string_types): md[attr] = val + elif isinstance(val, bytes): + md[attr] = val elif isinstance(val, int): md[attr] = val elif isinstance(val, list): @@ -57,7 +59,6 @@ def getcapabilities(self, debug=False, **kw): caps = self._xmd(ows) if not debug: if "request" in caps: del caps["request"] - if "response" in caps: del caps["response"] if "owscommon" in caps: del caps["owscommon"] return caps @@ -70,13 +71,41 @@ class CswService(OwsService): def __init__(self, endpoint=None): super(CswService, self).__init__(endpoint) self.sortby = SortBy([SortProperty('dc:identifier')]) + # check capabilities + log.warn("IN SETUP: %s", endpoint) + _cap = self.getcapabilities(endpoint)['response'] + self.capabilities = etree.ElementTree(etree.fromstring(_cap)) + self.output_schemas = { + 'GetRecords': self._get_output_schemas('GetRecords'), + 'GetRecordById': self._get_output_schemas('GetRecordById'), + } + log.warn("OUTPUTSCHEMAS: %s", self.output_schemas) + + def _get_output_schemas(self, operation): + _cap_ns = self.capabilities.getroot().nsmap + _ows_ns = _cap_ns.get('ows') + if not _ows_ns: + raise CswError('Bad getcapabilities response: OWS namespace not found ' + str(_cap_ns)) + _op = self.capabilities.find("//{{{}}}Operation[@name='{}']".format(_ows_ns, operation)) + _schemas = _op.find("{{{}}}Parameter[@name='outputSchema']".format(_ows_ns)) + _values = map(lambda v: v.text, _schemas.findall("{{{}}}Value".format(_ows_ns))) + output_schemas = {} + for key, value in _schemas.nsmap.items(): + if value in _values: + output_schemas.update({key : value}) + return output_schemas def getrecords(self, qtype=None, keywords=[], typenames="csw:Record", esn="brief", skip=0, count=10, outputschema="gmd", **kw): - from owslib.csw import namespaces constraints = [] csw = self._ows(**kw) + log.warn("OUTPUT_SCHEMA: %s", outputschema) + + # check target csw server capabilities for requested output schema + output_schemas = self.output_schemas['GetRecords'] + if not output_schemas.get(outputschema): + raise CswError('Output schema \'{}\' not supported by target server: '.format(output_schemas)) if qtype is not None: constraints.append(PropertyIsEqualTo("dc:type", qtype)) @@ -87,24 +116,29 @@ def getrecords(self, qtype=None, keywords=[], "esn": esn, "startposition": skip, "maxrecords": count, - "outputschema": namespaces[outputschema], + "outputschema": output_schemas[outputschema], "sortby": self.sortby - } + } log.info('Making CSW request: getrecords2 %r', kwa) csw.getrecords2(**kwa) if csw.exceptionreport: err = 'Error getting records: %r' % \ csw.exceptionreport.exceptions - #log.error(err) + log.error(err) raise CswError(err) return [self._xmd(r) for r in list(csw.records.values())] def getidentifiers(self, qtype=None, typenames="csw:Record", esn="brief", keywords=[], limit=None, page=10, outputschema="gmd", startposition=0, cql=None, **kw): - from owslib.csw import namespaces constraints = [] csw = self._ows(**kw) + log.warn("OUTPUT_SCHEMA: %s", outputschema) + + # fetch target csw server capabilities for requested output schema + output_schemas = self.output_schemas['GetRecords'] + if not output_schemas.get(outputschema): + raise CswError('Output schema \'{}\' not supported by target server: '.format(output_schemas)) if qtype is not None: constraints.append(PropertyIsEqualTo("dc:type", qtype)) @@ -115,20 +149,20 @@ def getidentifiers(self, qtype=None, typenames="csw:Record", esn="brief", "esn": esn, "startposition": startposition, "maxrecords": page, - "outputschema": namespaces[outputschema], + "outputschema": output_schemas[outputschema], "cql": cql, "sortby": self.sortby } i = 0 matches = 0 while True: - log.info('Making CSW request: getrecords2 %r', kwa) + log.warn('Making CSW request: getrecords2 %r', kwa) csw.getrecords2(**kwa) if csw.exceptionreport: err = 'Error getting identifiers: %r' % \ csw.exceptionreport.exceptions - #log.error(err) + log.error(err) raise CswError(err) if matches == 0: @@ -154,11 +188,15 @@ def getidentifiers(self, qtype=None, typenames="csw:Record", esn="brief", kwa["startposition"] = startposition def getrecordbyid(self, ids=[], esn="full", outputschema="gmd", **kw): - from owslib.csw import namespaces csw = self._ows(**kw) + # fetch target csw server capabilities for requested output schema + output_schemas=output_schemas = self.output_schemas['GetRecordById'] + if not output_schemas.get(outputschema): + raise CswError('Output schema \'{}\' not supported by target server: '.format(output_schemas)) + kwa = { "esn": esn, - "outputschema": namespaces[outputschema], + "outputschema": output_schemas[outputschema], } # Ordinary Python version's don't support the metadata argument log.info('Making CSW request: getrecordbyid %r %r', ids, kwa) @@ -168,14 +206,17 @@ def getrecordbyid(self, ids=[], esn="full", outputschema="gmd", **kw): csw.exceptionreport.exceptions #log.error(err) raise CswError(err) - if not csw.records: + elif csw.records: + record = self._xmd(list(csw.records.values())[0]) + elif csw.response: + record = self._xmd(etree.fromstring(csw.response)) + else: return - record = self._xmd(list(csw.records.values())[0]) ## strip off the enclosing results container, we only want the metadata - #md = csw._exml.find("/gmd:MD_Metadata")#, namespaces=namespaces) - # Ordinary Python version's don't support the metadata argument - md = csw._exml.find("/{http://www.isotc211.org/2005/gmd}MD_Metadata") + # '/{schema}*' expression should be safe enough and is able to match the + # desired schema followed by both MD_Metadata or MI_Metadata (iso19115[-2]) + md = csw._exml.find("/{{{schema}}}*".format(schema=output_schemas[outputschema])) mdtree = etree.ElementTree(md) try: record["xml"] = etree.tostring(mdtree, pretty_print=True, encoding=str)