diff --git a/VERSION b/VERSION
index 47b322c9..4d9d11cf 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.4.1
+3.4.2
diff --git a/src/hubmap_translator.py b/src/hubmap_translator.py
index ab10b36d..e7648314 100644
--- a/src/hubmap_translator.py
+++ b/src/hubmap_translator.py
@@ -81,7 +81,9 @@ def __init__(self, indices, app_client_id, app_client_secret, token, ontology_ap
         self.INDICES: dict = {'default_index': self.DEFAULT_INDEX_WITHOUT_PREFIX, 'indices': self.indices}
         self.DEFAULT_ENTITY_API_URL = self.INDICES['indices'][self.DEFAULT_INDEX_WITHOUT_PREFIX]['document_source_endpoint'].strip('/')
         self._ontology_api_base_url = ontology_api_base_url
-        self.es_retry_on_conflict_param_value = indices['es_retry_on_conflict_param_value']
+
+        # Commented out by Zhou to avoid 409 conflicts - 7/20/2024
+        # self.es_retry_on_conflict_param_value = indices['es_retry_on_conflict_param_value']
 
         self.indexer = Indexer(self.indices, self.DEFAULT_INDEX_WITHOUT_PREFIX)
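Note: if this retry setting is ever restored, a guarded read would keep configs that omit the key from raising a KeyError at startup. A minimal sketch, assuming the same `indices` dict passed to `__init__`; the helper name and the fallback of 0 (OpenSearch's own default for `retry_on_conflict`) are illustrative, not part of this patch:

```python
def read_retry_on_conflict(indices: dict) -> int:
    # Hypothetical guarded read: fall back to 0 (the OpenSearch default)
    # when search-config.yaml omits es_retry_on_conflict_param_value.
    return int(indices.get('es_retry_on_conflict_param_value', 0))
```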
@@ -249,200 +251,216 @@ def __get_scope_list(self, entity_id, document, index, scope):
             scope_list = ['public', 'private']
         return scope_list
 
-    def _relationships_changed_since_indexed( self, neo4j_ancestor_ids:list[str], neo4j_descendant_ids:list[str],
-                                              existing_oss_doc:json):
-        # Start with the safe assumption that relationships have changed, and
-        # only toggle if verified unchanged below
-        relationships_changed = True
-
-        # Get the ancestors and descendants of this entity as they exist in OpenSearch.
-        oss_ancestor_ids = []
-        if existing_oss_doc and 'fields' in existing_oss_doc and 'ancestor_ids' in existing_oss_doc['fields']:
-            oss_ancestor_ids = existing_oss_doc['fields']['ancestor_ids']
-        oss_descendant_ids = []
-        if existing_oss_doc and 'fields' in existing_oss_doc and 'descendant_ids' in existing_oss_doc['fields']:
-            oss_descendant_ids = existing_oss_doc['fields']['descendant_ids']
-
-        # If the ancestor list and descendant list on the OpenSearch document for this entity are
-        # not both exactly the same set of IDs as in Neo4j, relationships have changed and this
-        # entity must be re-indexed rather than just updating existing documents for associated entities.
-        #
-        # These lists are implicitly sets, as they do not have duplicates and order does not mean anything.
-        # Leave algorithmic efficiency to Python's implementation of sets.
-        neo4j_descendant_id_set = frozenset(neo4j_descendant_ids)
-        oss_descendant_id_set = frozenset(oss_descendant_ids)
-
-        if not neo4j_descendant_id_set.symmetric_difference(oss_descendant_id_set):
-            # Since the descendants are unchanged, check the ancestors to decide if re-indexing must be done.
-            neo4j_ancestor_id_set = frozenset(neo4j_ancestor_ids)
-            oss_ancestor_id_set = frozenset(oss_ancestor_ids)
-
-            if not neo4j_ancestor_id_set.symmetric_difference(oss_ancestor_id_set):
-                relationships_changed = False
-
-        return relationships_changed
-
-    def _get_existing_entity_relationships(self, entity_uuid:str, es_url:str, es_index:str):
-        # Form a simple match query, and retrieve an existing OpenSearch document for entity_id, if it exists.
-        # N.B. This query does not pass through the AWS Gateway, so we will not have to retrieve the
-        #      result from an AWS S3 Bucket. If it is larger than 10Mb, we will get it directly.
-        QDSL_SEARCH_ENDPOINT_MATCH_UUID_PATTERN =(
-            '{ ' + \
-            '"query": { "bool": { "filter": [ {"terms": {"uuid": [""]}} ] } }' + \
-            ', "fields": ["ancestor_ids", "descendant_ids"] ,"_source": false' + \
-            ' }')
-
-        qdsl_search_query_payload_string = QDSL_SEARCH_ENDPOINT_MATCH_UUID_PATTERN.replace(''
-                                                                                           , entity_uuid)
-        json_query_dict = json.loads(qdsl_search_query_payload_string)
-        opensearch_response = execute_opensearch_query(query_against='_search'
-                                                       , request=None
-                                                       , index=es_index
-                                                       , es_url=es_url
-                                                       , query=json_query_dict
-                                                       , request_params={'filter_path': 'hits.hits'})
-
-        # Verify the expected response was returned. If no document was returned, proceed with a re-indexing.
-        # If exactly one document is returned, distill it down to JSON used to update document fields.
-        if opensearch_response.status_code != 200:
-            logger.error(f"Unable to return ['hits']['hits'] content of opensearch_response for"
-                         f" es_url={es_url}, with"
-                         f" status_code={opensearch_response.status_code}.")
-            raise Exception(f"OpenSearch query return a status code of '{opensearch_response.status_code}'."
-                            f" See logs.")
-
-        resp_json = opensearch_response.json()
-
-        if not resp_json or \
-                'hits' not in resp_json or \
-                'hits' not in resp_json['hits'] or \
-                len(resp_json['hits']['hits']) == 0:
-            # If OpenSearch does not have an existing document for this entity, drop down to reindexing.
-            # Anything else Falsy JSON could be an unexpected result for an existing entity, but fall back to
-            # reindexing under those circumstances, too.
-            pass
-        elif len(resp_json['hits']['hits']) != 1:
-            # From the index populated with everything, raise an exception if exactly one document for the
-            # current entity is not what is returned.
-            logger.error(f"Found {len(resp_json['hits']['hits'])} documents instead"
-                         f" of a single document searching resp_json['hits']['hits'] from opensearch_response with"
-                         f" es_url={es_url},"
-                         f" json_query_dict={json_query_dict}.")
-            raise Exception(f"Unexpected response to OpenSearch query for a single entity document."
-                            f" See logs.")
-        elif 'fields' not in resp_json['hits']['hits'][0]:
-            # The QDSL query may return exactly one resp_json['hits']['hits'] if called for an
-            # entity which has a document but not the fields searched for e.g. a Donor being
-            # created with no ancestors or descendants yet. Return empty
-            # JSON rather than indicating this is an error.
-            return {}
-        else:
-            # Strip away whatever remains of OpenSearch artifacts, such as _source, to get to the
-            # exact JSON of this entity's existing, so that can become a part of the other documents which
-            # retain a snapshot of this entity, such as this entity's ancestors, this entity's descendants,
-            # Collection entity's containing this entity, etc.
-            # N.B. Many such artifacts should have already been stripped through usage of the filter_path.
-            return resp_json['hits']['hits'][0]
-
-    def _directly_modify_related_entities( self, es_url:str, es_index:str, entity_id:str
-                                           , neo4j_ancestor_ids:list[str], neo4j_descendant_ids:list[str]
-                                           , neo4j_collection_ids:list[str], neo4j_upload_ids:list[str]):
-        # Directly update the OpenSearch documents for each associated entity with a current snapshot of
-        # this entity, since relationships graph of this entity is unchanged in Neo4j since the
-        # last time this entity was indexed.
-        #
-        # Given updated JSON for the OpenSearch document of this entity, replace the snapshot of
-        # this entity's JSON in every OpenSearch document it appears in.
-        # Each document for an identifier in the 'ancestors' list of this entity will need to have one
-        # member of its 'descendants' list updated. Similarly, each OpenSearch document for a descendant
-        # entity will need one member of its 'ancestors' list updated.
-        # 'immediate_ancestors' will be updated for every entity which is an 'immediate_descendant'.
-        # 'immediate_descendants' will be updated for every entity which is an 'immediate_ancestor'.
-
-        # Retrieve the entity details
-        # This returned entity dict (if Dataset) has removed ingest_metadata.files and
-        # ingest_metadata.metadata sub fields with empty string values when call_entity_api() gets called
-        revised_entity_doc_dict = self.call_entity_api(entity_id=entity_id
-                                                       , endpoint_base='documents')
-
-        painless_query = f'for (prop in )' \
-                         f' {{if (ctx._source.containsKey(prop))' \
-                         f'  {{for (int i = 0; i < ctx._source[prop].length; ++i)' \
-                         f'   {{if (ctx._source[prop][i][\'uuid\'] == params.modified_entity_uuid)' \
-                         f'    {{ctx._source[prop][i] = params.revised_related_entity}} }} }} }}'
-        QDSL_UPDATE_ENDPOINT_WITH_ID_PARAM = \
-            f'{{\"script\": {{' \
-            f'  \"lang\": \"painless\",' \
-            f'  \"source\": \"{painless_query}\",' \
-            f'  \"params\": {{' \
-            f'   \"retry_on_conflict\": {self.es_retry_on_conflict_param_value},' \
-            f'   \"modified_entity_uuid\": \"\",' \
-            f'   \"revised_related_entity\": ' \
-            f'  }}' \
-            f' }} }}'
-        # Eliminate taking advantage of our knowledge that an ancestor only needs its descendants lists
-        # updated and a descendant only needs its ancestor lists updated. Instead, focus upon consolidating
-        # updates into a single query for the related entity's document to avoid HTTP 409 Conflict
-        # problems if too many queries post for a single document.
-        related_entity_target_elements = [ 'immediate_descendants'
-                                           , 'descendants'
-                                           , 'immediate_ancestors'
-                                           , 'ancestors'
-                                           , 'source_samples'
-                                           , 'origin_samples'
-                                           , 'datasets']
-
-        related_entity_ids = neo4j_ancestor_ids + neo4j_descendant_ids + neo4j_collection_ids + neo4j_upload_ids
-
-        for related_entity_id in related_entity_ids:
-            qdsl_update_payload_string = QDSL_UPDATE_ENDPOINT_WITH_ID_PARAM \
-                .replace('', entity_id) \
-                .replace('', str(related_entity_target_elements)) \
-                .replace('', json.dumps(revised_entity_doc_dict))
-            json_query_dict = json.loads(qdsl_update_payload_string)
-
-            opensearch_response = execute_opensearch_query(query_against=f"_update/{related_entity_id}"
-                                                           , request=None
-                                                           , index=es_index
-                                                           , es_url=es_url
-                                                           , query=json_query_dict)
-            # Expect an HTTP 200 on a successful update, and an HTTP 404 if es_index does not
-            # contain a document for related_entity_id. Other response codes are errors.
-            if opensearch_response.status_code not in [200, 404]:
-                logger.error(f"Unable to directly update elements of document with"
-                             f" related_entity_target_elements={related_entity_target_elements},"
-                             f" related_entity_id={related_entity_id}."
-                             f" Got status_code={opensearch_response.status_code} at"
-                             f" es_url={es_url},"
-                             f" endoint '_update/{related_entity_id}' with"
-                             f" qdsl_update_payload_string={qdsl_update_payload_string}.")
-                if opensearch_response.text:
-                    logger.error(f"OpenSearch message for {opensearch_response.status_code} code:"
-                                 f" '{opensearch_response.text}'.")
-                raise Exception(f"OpenSearch query returned a status code of "
-                                f" '{opensearch_response.status_code}'. See logs.")
-            elif opensearch_response.status_code == 404:
-                logger.info(f"Call to QDSL _update got HTTP response code"
-                            f" {opensearch_response.status_code}, which is ignored because it"
-                            f" should indicate"
-                            f" related_entity_target_elements={related_entity_target_elements}"
-                            f" is not in es_index={es_index}.")
-
-    def _exec_reindex_entity_to_index_group_by_id(self, entity_id: str(32), index_group: str):
-        logger.info(f"Start executing _exec_reindex_entity_to_index_group_by_id() on"
-                    f" entity_id: {entity_id}, index_group: {index_group}")
-
-        entity = self.call_entity_api(entity_id=entity_id
-                                      , endpoint_base='documents')
-        if entity['entity_type'] == 'Collection':
-            self.translate_collection(entity_id=entity_id, reindex=True)
-        elif entity['entity_type'] == 'Upload':
-            self.translate_upload(entity_id=entity_id, reindex=True)
-        else:
-            self._transform_and_write_entity_to_index_group(entity=entity
-                                                            , index_group=index_group)
-        logger.info(f"Finished executing _exec_reindex_entity_to_index_group_by_id()")
+
+    # # Commented out by Zhou to avoid 409 conflicts - 7/20/2024
+    # def _relationships_changed_since_indexed( self, neo4j_ancestor_ids:list[str], neo4j_descendant_ids:list[str],
+    #                                           existing_oss_doc:json):
+    #     # Start with the safe assumption that relationships have changed, and
+    #     # only toggle if verified unchanged below
+    #     relationships_changed = True
+
+    #     # Get the ancestors and descendants of this entity as they exist in OpenSearch.
+    #     oss_ancestor_ids = []
+    #     if existing_oss_doc and 'fields' in existing_oss_doc and 'ancestor_ids' in existing_oss_doc['fields']:
+    #         oss_ancestor_ids = existing_oss_doc['fields']['ancestor_ids']
+    #     oss_descendant_ids = []
+    #     if existing_oss_doc and 'fields' in existing_oss_doc and 'descendant_ids' in existing_oss_doc['fields']:
+    #         oss_descendant_ids = existing_oss_doc['fields']['descendant_ids']
+
+    #     # If the ancestor list and descendant list on the OpenSearch document for this entity are
+    #     # not both exactly the same set of IDs as in Neo4j, relationships have changed and this
+    #     # entity must be re-indexed rather than just updating existing documents for associated entities.
+    #     #
+    #     # These lists are implicitly sets, as they do not have duplicates and order does not mean anything.
+    #     # Leave algorithmic efficiency to Python's implementation of sets.
+    #     neo4j_descendant_id_set = frozenset(neo4j_descendant_ids)
+    #     oss_descendant_id_set = frozenset(oss_descendant_ids)
+
+    #     if not neo4j_descendant_id_set.symmetric_difference(oss_descendant_id_set):
+    #         # Since the descendants are unchanged, check the ancestors to decide if re-indexing must be done.
+    #         neo4j_ancestor_id_set = frozenset(neo4j_ancestor_ids)
+    #         oss_ancestor_id_set = frozenset(oss_ancestor_ids)
+
+    #         if not neo4j_ancestor_id_set.symmetric_difference(oss_ancestor_id_set):
+    #             relationships_changed = False
+
+    #     return relationships_changed
+
+
+    # # Commented out by Zhou to avoid 409 conflicts - 7/20/2024
+    # def _get_existing_entity_relationships(self, entity_uuid:str, es_url:str, es_index:str):
+    #     # Form a simple match query, and retrieve an existing OpenSearch document for entity_id, if it exists.
+    #     # N.B. This query does not pass through the AWS Gateway, so we will not have to retrieve the
+    #     #      result from an AWS S3 Bucket. If it is larger than 10Mb, we will get it directly.
+    #     QDSL_SEARCH_ENDPOINT_MATCH_UUID_PATTERN =(
+    #         '{ ' + \
+    #         '"query": { "bool": { "filter": [ {"terms": {"uuid": [""]}} ] } }' + \
+    #         ', "fields": ["ancestor_ids", "descendant_ids"] ,"_source": false' + \
+    #         ' }')
+
+    #     qdsl_search_query_payload_string = QDSL_SEARCH_ENDPOINT_MATCH_UUID_PATTERN.replace(''
+    #                                                                                        , entity_uuid)
+    #     json_query_dict = json.loads(qdsl_search_query_payload_string)
+    #     opensearch_response = execute_opensearch_query(query_against='_search'
+    #                                                    , request=None
+    #                                                    , index=es_index
+    #                                                    , es_url=es_url
+    #                                                    , query=json_query_dict
+    #                                                    , request_params={'filter_path': 'hits.hits'})
+
+    #     # Verify the expected response was returned. If no document was returned, proceed with a re-indexing.
+    #     # If exactly one document is returned, distill it down to JSON used to update document fields.
+    #     if opensearch_response.status_code != 200:
+    #         logger.error(f"Unable to return ['hits']['hits'] content of opensearch_response for"
+    #                      f" es_url={es_url}, with"
+    #                      f" status_code={opensearch_response.status_code}.")
+    #         raise Exception(f"OpenSearch query returned a status code of '{opensearch_response.status_code}'."
+    #                         f" See logs.")
+
+    #     resp_json = opensearch_response.json()
+
+    #     if not resp_json or \
+    #             'hits' not in resp_json or \
+    #             'hits' not in resp_json['hits'] or \
+    #             len(resp_json['hits']['hits']) == 0:
+    #         # If OpenSearch does not have an existing document for this entity, drop down to reindexing.
+    #         # Any other falsy JSON could be an unexpected result for an existing entity, but fall back to
+    #         # reindexing under those circumstances, too.
+    #         pass
+    #     elif len(resp_json['hits']['hits']) != 1:
+    #         # From the index populated with everything, raise an exception unless exactly one document
+    #         # is returned for the current entity.
+    #         logger.error(f"Found {len(resp_json['hits']['hits'])} documents instead"
+    #                      f" of a single document searching resp_json['hits']['hits'] from opensearch_response with"
+    #                      f" es_url={es_url},"
+    #                      f" json_query_dict={json_query_dict}.")
+    #         raise Exception(f"Unexpected response to OpenSearch query for a single entity document."
+    #                         f" See logs.")
+    #     elif 'fields' not in resp_json['hits']['hits'][0]:
+    #         # The QDSL query may return exactly one resp_json['hits']['hits'] if called for an
+    #         # entity which has a document but not the fields searched for, e.g. a Donor being
+    #         # created with no ancestors or descendants yet. Return empty
+    #         # JSON rather than indicating this is an error.
+    #         return {}
+    #     else:
+    #         # Strip away whatever remains of OpenSearch artifacts, such as _source, to get to the
+    #         # exact JSON of this entity's existing document, so that it can become a part of the other
+    #         # documents which retain a snapshot of this entity, such as this entity's ancestors, this
+    #         # entity's descendants, Collection entities containing this entity, etc.
+    #         # N.B. Many such artifacts should have already been stripped through usage of the filter_path.
+    #         return resp_json['hits']['hits'][0]
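The commented-out code above builds its QDSL payload by running `.replace()` over a string template and re-parsing it with `json.loads()`. Should this path ever be revived, composing the query as a Python dict sidesteps placeholder substitution and JSON escaping entirely — a sketch under that assumption, not code from this patch:

```python
import json

def build_uuid_relationship_query(entity_uuid: str) -> dict:
    # Same QDSL as the template above: filter on uuid, fetch only the
    # ancestor_ids/descendant_ids fields, and skip _source.
    return {
        'query': {'bool': {'filter': [{'terms': {'uuid': [entity_uuid]}}]}},
        'fields': ['ancestor_ids', 'descendant_ids'],
        '_source': False,
    }

# Serialization is only needed at the HTTP boundary:
payload = json.dumps(build_uuid_relationship_query('some-entity-uuid'))
```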
+
+
+
+    # # Commented out by Zhou to avoid 409 conflicts - 7/20/2024
+    # def _directly_modify_related_entities( self, es_url:str, es_index:str, entity_id:str
+    #                                        , neo4j_ancestor_ids:list[str], neo4j_descendant_ids:list[str]
+    #                                        , neo4j_collection_ids:list[str], neo4j_upload_ids:list[str]):
+    #     # Directly update the OpenSearch documents for each associated entity with a current snapshot of
+    #     # this entity, since the relationship graph of this entity is unchanged in Neo4j since the
+    #     # last time this entity was indexed.
+    #     #
+    #     # Given updated JSON for the OpenSearch document of this entity, replace the snapshot of
+    #     # this entity's JSON in every OpenSearch document it appears in.
+    #     # Each document for an identifier in the 'ancestors' list of this entity will need to have one
+    #     # member of its 'descendants' list updated. Similarly, each OpenSearch document for a descendant
+    #     # entity will need one member of its 'ancestors' list updated.
+    #     # 'immediate_ancestors' will be updated for every entity which is an 'immediate_descendant'.
+    #     # 'immediate_descendants' will be updated for every entity which is an 'immediate_ancestor'.
+
+    #     # Retrieve the entity details
+    #     # This returned entity dict (if Dataset) has removed ingest_metadata.files and
+    #     # ingest_metadata.metadata sub fields with empty string values when call_entity_api() gets called
+    #     revised_entity_doc_dict = self.call_entity_api(entity_id=entity_id
+    #                                                    , endpoint_base='documents')
+
+    #     painless_query = f'for (prop in )' \
+    #                      f' {{if (ctx._source.containsKey(prop))' \
+    #                      f'  {{for (int i = 0; i < ctx._source[prop].length; ++i)' \
+    #                      f'   {{if (ctx._source[prop][i][\'uuid\'] == params.modified_entity_uuid)' \
+    #                      f'    {{ctx._source[prop][i] = params.revised_related_entity}} }} }} }}'
+    #     QDSL_UPDATE_ENDPOINT_WITH_ID_PARAM = \
+    #         f'{{\"script\": {{' \
+    #         f'  \"lang\": \"painless\",' \
+    #         f'  \"source\": \"{painless_query}\",' \
+    #         f'  \"params\": {{' \
+    #         f'   \"modified_entity_uuid\": \"\",' \
+    #         f'   \"revised_related_entity\": ' \
+    #         f'  }}' \
+    #         f' }} }}'
+    #     # Eliminate taking advantage of our knowledge that an ancestor only needs its descendants lists
+    #     # updated and a descendant only needs its ancestor lists updated. Instead, focus upon consolidating
+    #     # updates into a single query for the related entity's document to avoid HTTP 409 Conflict
+    #     # problems if too many queries post for a single document.
+    #     related_entity_target_elements = [ 'immediate_descendants'
+    #                                        , 'descendants'
+    #                                        , 'immediate_ancestors'
+    #                                        , 'ancestors'
+    #                                        , 'source_samples'
+    #                                        , 'origin_samples'
+    #                                        , 'datasets']
+
+    #     related_entity_ids = neo4j_ancestor_ids + neo4j_descendant_ids + neo4j_collection_ids + neo4j_upload_ids
+
+    #     for related_entity_id in related_entity_ids:
+    #         qdsl_update_payload_string = QDSL_UPDATE_ENDPOINT_WITH_ID_PARAM \
+    #             .replace('', entity_id) \
+    #             .replace('', str(related_entity_target_elements)) \
+    #             .replace('', json.dumps(revised_entity_doc_dict))
+    #         json_query_dict = json.loads(qdsl_update_payload_string)
+
+    #         # Try to avoid 409
+    #         query_params = {
+    #             'retry_on_conflict': str(self.es_retry_on_conflict_param_value),
+    #             'refresh': 'true'
+    #         }
+
+    #         opensearch_response = execute_opensearch_query(query_against=f"_update/{related_entity_id}"
+    #                                                        , request=None
+    #                                                        , index=es_index
+    #                                                        , es_url=es_url
+    #                                                        , query=json_query_dict
+    #                                                        , request_params=query_params)
+    #         # Expect an HTTP 200 on a successful update, and an HTTP 404 if es_index does not
+    #         # contain a document for related_entity_id. Other response codes are errors.
+    #         if opensearch_response.status_code not in [200, 404]:
+    #             logger.error(f"Unable to directly update document of {related_entity_id} using the latest version of {entity_id} in"
+    #                          f" related_entity_target_elements={related_entity_target_elements},"
+    #                          f" endpoint '{es_index}/_update/{related_entity_id}'."
+    #                          f" Got status_code={opensearch_response.status_code}.")
+
+    #             if opensearch_response.text:
+    #                 logger.error(f"OpenSearch message for {opensearch_response.status_code} code:"
+    #                              f" '{opensearch_response.text}'.")
+    #             raise Exception(f"OpenSearch query returned a status code of "
+    #                             f" '{opensearch_response.status_code}'. See logs.")
+    #         elif opensearch_response.status_code == 404:
+    #             logger.info(f"Call to QDSL _update got HTTP response code"
+    #                         f" {opensearch_response.status_code}, which is ignored because it"
+    #                         f" should indicate"
+    #                         f" related_entity_target_elements={related_entity_target_elements}"
+    #                         f" is not in es_index={es_index}.")
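Beyond the `retry_on_conflict`/`refresh` request parameters shown in the commented-out loop above, a common complement for riding out residual 409s is client-side retry with exponential backoff. A generic sketch using `requests` against the standard `_update` endpoint; the helper and its defaults are illustrative, not part of this codebase:

```python
import time
import requests

def update_with_conflict_retry(es_url: str, es_index: str, doc_id: str,
                               payload: dict, max_attempts: int = 4) -> requests.Response:
    # Re-send the _update request whenever OpenSearch reports a version
    # conflict (HTTP 409), sleeping 1s, 2s, 4s, ... between attempts.
    response = None
    for attempt in range(max_attempts):
        response = requests.post(f"{es_url}/{es_index}/_update/{doc_id}",
                                 json=payload,
                                 params={'retry_on_conflict': 5})
        if response.status_code != 409:
            break
        time.sleep(2 ** attempt)
    return response
```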
+
+
+    # # Commented out by Zhou to avoid 409 conflicts - 7/20/2024
+    # def _exec_reindex_entity_to_index_group_by_id(self, entity_id: str(32), index_group: str):
+    #     logger.info(f"Start executing _exec_reindex_entity_to_index_group_by_id() on"
+    #                 f" entity_id: {entity_id}, index_group: {index_group}")
+
+    #     entity = self.call_entity_api(entity_id=entity_id
+    #                                   , endpoint_base='documents')
+    #     if entity['entity_type'] == 'Collection':
+    #         self.translate_collection(entity_id=entity_id, reindex=True)
+    #     elif entity['entity_type'] == 'Upload':
+    #         self.translate_upload(entity_id=entity_id, reindex=True)
+    #     else:
+    #         self._transform_and_write_entity_to_index_group(entity=entity
+    #                                                         , index_group=index_group)
+
+    #     logger.info(f"Finished executing _exec_reindex_entity_to_index_group_by_id()")
+
     def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:str):
         logger.info(f"Start executing direct '{index_group}' updates for"
@@ -503,8 +521,7 @@ def translate(self, entity_id):
             # Retrieve the entity details
             # This returned entity dict (if Dataset) has removed ingest_metadata.files and
             # ingest_metadata.metadata sub fields with empty string values when call_entity_api() gets called
-            entity = self.call_entity_api(entity_id=entity_id
-                                          , endpoint_base='documents')
+            entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
 
             logger.info(f"Start executing translate() on {entity['entity_type']} of uuid: {entity_id}")
@@ -513,126 +530,171 @@ def translate(self, entity_id):
                 # have a DOI. But entity-api may still request such Collections be indexed, particularly right
                 # after the Collection becomes visible to the public.
                 try:
-                    self.translate_collection( entity_id
-                                              ,reindex=True)
+                    self.translate_collection(entity_id, reindex=True)
                 except Exception as e:
                     logger.error(f"Unable to index {entity['entity_type']} due to e={str(e)}")
             elif entity['entity_type'] == 'Upload':
                 self.translate_upload(entity_id, reindex=True)
             else:
-                # For newly created entities and entities whose relationships in Neo4j have changed since the
-                # entity was indexed into OpenSearch, use "reindex" code to bring the OpenSearch document
-                # up-to-date for the entity and all the entities it relates to.
-                #
-                # For entities previously indexed into OpenSearch whose relationships in Neo4j have not changed,
-                # just index the document for the entity. Then update fields belong to related entities which
-                # refer to the entity i.e. the 'ancestors' list of this entity's 'descendants', the 'descendants'
-                # list of this entity's 'ancestors', etc.
-                # N.B. As of Spring '24, this shortcut can only be done for the 'entities' indices, not for
-                #      the 'portal' indices, which hold transformed content.
-
-                # get URL for the OpenSearch server
-                es_url = self.INDICES['indices']['entities']['elasticsearch']['url'].strip('/')
-
-                # Get the ancestors and descendants of this entity as they exist in Neo4j, and as they
-                # exist in OpenSearch.
-                neo4j_ancestor_ids = self.call_entity_api(entity_id=entity_id
-                                                          , endpoint_base='ancestors'
-                                                          , endpoint_suffix=None
-                                                          , url_property='uuid')
-                neo4j_descendant_ids = self.call_entity_api(entity_id=entity_id
-                                                            , endpoint_base='descendants'
-                                                            , endpoint_suffix=None
-                                                            , url_property='uuid')
-                # If working with a Dataset, it may be copied into ElasticSearch documents for
-                # Collections and Uploads, so identify any of those which must be reindexed.
-                neo4j_collection_ids = []
-                neo4j_upload_ids = []
-                if entity['entity_type'] == 'Dataset':
-                    neo4j_collection_ids = self.call_entity_api(entity_id=entity_id
-                                                                , endpoint_base='entities'
-                                                                , endpoint_suffix='collections'
+                # BEGIN - Below block is the original implementation prior to the direct document update
+                #         against Elasticsearch. Added back by Zhou to avoid 409 conflicts - 7/20/2024
+
+                # Reindex the entity itself first
+                self._call_indexer(entity=entity, delete_existing_doc_first=True)
+
+                previous_revision_ids = []
+                next_revision_ids = []
+
+                # Get the ancestors and descendants of this entity as they exist in Neo4j
+                ancestor_ids = self.call_entity_api(entity_id=entity_id
+                                                    , endpoint_base='ancestors'
+                                                    , endpoint_suffix=None
+                                                    , url_property='uuid')
+                descendant_ids = self.call_entity_api(entity_id=entity_id
+                                                      , endpoint_base='descendants'
+                                                      , endpoint_suffix=None
+                                                      , url_property='uuid')
+
+                # Only Dataset/Publication entities may have previous/next revisions
+                if entity['entity_type'] in ['Dataset', 'Publication']:
+                    previous_revision_ids = self.call_entity_api(entity_id=entity_id
+                                                                 , endpoint_base='previous_revisions'
+                                                                 , endpoint_suffix=None
                                                                  , url_property='uuid')
-                    neo4j_upload_ids = self.call_entity_api(entity_id=entity_id
-                                                            , endpoint_base='entities'
-                                                            , endpoint_suffix='uploads'
+                    next_revision_ids = self.call_entity_api(entity_id=entity_id
+                                                             , endpoint_base='next_revisions'
+                                                             , endpoint_suffix=None
                                                              , url_property='uuid')
-
-                # Use the index with documents for all entities to determine the relationships of the
-                # current entity as stored in OpenSearch. Consider it safe to assume documents in other
-                # indices for the same entity have exactly the same relationships unless there was an
-                # indexing problem.
-                #
-                # "Changed relationships" only applies to differences in the ancestors and descendants of
-                # an entity. Uploads and Collections which reference a Dataset entity, for example, do not
-                # indicate a change of relationships which would result in reindexing instead of directly updating.
-                index_with_everything = self.INDICES['indices']['entities']['private']
-                existing_entity_json = self._get_existing_entity_relationships(entity_uuid=entity['uuid']
-                                                                               , es_url=es_url
-                                                                               , es_index=index_with_everything)
-
-                relationships_changed = self._relationships_changed_since_indexed(neo4j_ancestor_ids=neo4j_ancestor_ids
-                                                                                  , neo4j_descendant_ids=neo4j_descendant_ids
-                                                                                  , existing_oss_doc=existing_entity_json)
-
-                # Now that it has been determined whether relationships have changed for this entity,
-                # reindex the entity itself first before dealing with other documents for related entities.
-                self._call_indexer(entity=entity
-                                   , delete_existing_doc_first=True)
-
-                if relationships_changed:
-                    logger.info(f"Related entities for {entity_id} have changed in Neo4j. Reindexing")
-                    # Since the entity is new or the Neo4j relationships with related entities have changed,
-                    # reindex the current entity
-                    self._reindex_related_entities(entity_id=entity_id
-                                                   , entity_type=entity['entity_type']
-                                                   , neo4j_ancestor_ids=neo4j_ancestor_ids
-                                                   , neo4j_descendant_ids=neo4j_descendant_ids
-                                                   , neo4j_collection_ids=neo4j_collection_ids
-                                                   , neo4j_upload_ids=neo4j_upload_ids)
-                else:
-                    logger.info(f"Related entities for {entity_id} are unchanged in Neo4j."
-                                f" Directly updating index docs of related entities.")
-                    # Since the entity's relationships are identical in Neo4j and OpenSearch, just update
-                    # documents in the entities indices with a copy of the current entity.
-                    for es_index in [ self.INDICES['indices']['entities']['private']
-                                     ,self.INDICES['indices']['entities']['public'] ]:
-                        # Since _directly_modify_related_entities() will only _update documents which already
-                        # exist in an index, no need to test if this entity belongs in the public index.
-                        self._directly_modify_related_entities( es_url=es_url
-                                                                , es_index=es_index
-                                                                , entity_id=entity_id
-                                                                , neo4j_ancestor_ids=neo4j_ancestor_ids
-                                                                , neo4j_descendant_ids=neo4j_descendant_ids
-                                                                , neo4j_collection_ids= neo4j_collection_ids
-                                                                , neo4j_upload_ids=neo4j_upload_ids)
-
-                # Until the portal indices support direct updates using correctly transformed documents,
-                # continue doing a reindex for the updated entity and all the related entities which
-                # copy data from it for their documents.
-                previous_revision_ids = []
-                next_revision_ids = []
-
-                # Only Dataset/Publication entities may have previous/next revisions
-                if entity['entity_type'] in ['Dataset', 'Publication']:
-                    previous_revision_ids = self.call_entity_api(entity_id=entity_id,
-                                                                 endpoint_base='previous_revisions',
-                                                                 endpoint_suffix=None, url_property='uuid')
-                    next_revision_ids = self.call_entity_api(entity_id=entity_id,
-                                                             endpoint_base='next_revisions',
-                                                             endpoint_suffix=None, url_property='uuid')
-
-                # All unique entity ids in the path excluding the entity itself
-                target_ids = set(neo4j_ancestor_ids + neo4j_descendant_ids + previous_revision_ids +
-                                 next_revision_ids + neo4j_collection_ids + neo4j_upload_ids)
-
-                # Reindex the entity, and all related entities which have details of
-                # this entity in their document.
-                self._transform_and_write_entity_to_index_group(entity=entity
-                                                                , index_group='portal')
-                with concurrent.futures.ThreadPoolExecutor() as executor:
-                    futures_list = [executor.submit(self._exec_reindex_entity_to_index_group_by_id, related_entity_uuid, 'portal') for related_entity_uuid in target_ids]
+                # All unique entity ids in the path excluding the entity itself
+                target_ids = set(ancestor_ids + descendant_ids + previous_revision_ids + next_revision_ids)
+
+                # Reindex the rest of the entities in the list
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    futures_list = [executor.submit(self.reindex_entity, uuid) for uuid in target_ids]
+                    for f in concurrent.futures.as_completed(futures_list):
+                        result = f.result()
+
+                # END - Above block is the original implementation prior to the direct document update
+                #       against Elasticsearch. Added back by Zhou to avoid 409 conflicts - 7/20/2024
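In the restored block above, `f.result()` re-raises any exception thrown inside a worker, so one failed reindex aborts `translate()`. If that ever needs to be softened, the same loop can collect failures per uuid instead — a sketch where `reindex_entity` stands in for `self.reindex_entity`:

```python
import concurrent.futures

def reindex_all(reindex_entity, target_ids):
    # Submit one reindex job per uuid; calling f.result() re-raises a
    # worker's exception, so catch it per-future to log and continue.
    failures = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(reindex_entity, uuid): uuid for uuid in target_ids}
        for f in concurrent.futures.as_completed(futures):
            try:
                f.result()
            except Exception as exc:
                failures[futures[f]] = exc
    return failures
```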
Reindexing") + # # Since the entity is new or the Neo4j relationships with related entities have changed, + # # reindex the current entity + # self._reindex_related_entities(entity_id=entity_id + # , entity_type=entity['entity_type'] + # , neo4j_ancestor_ids=neo4j_ancestor_ids + # , neo4j_descendant_ids=neo4j_descendant_ids + # , neo4j_collection_ids=neo4j_collection_ids + # , neo4j_upload_ids=neo4j_upload_ids) + # else: + # logger.info(f"Related entities for {entity_id} are unchanged in Neo4j." + # f" Directly updating index docs of related entities.") + # # Since the entity's relationships are identical in Neo4j and OpenSearch, just update + # # documents in the entities indices with a copy of the current entity. + # for es_index in [ self.INDICES['indices']['entities']['private'] + # ,self.INDICES['indices']['entities']['public'] ]: + # # Since _directly_modify_related_entities() will only _update documents which already + # # exist in an index, no need to test if this entity belongs in the public index. + # self._directly_modify_related_entities( es_url=es_url + # , es_index=es_index + # , entity_id=entity_id + # , neo4j_ancestor_ids=neo4j_ancestor_ids + # , neo4j_descendant_ids=neo4j_descendant_ids + # , neo4j_collection_ids= neo4j_collection_ids + # , neo4j_upload_ids=neo4j_upload_ids) + + # # Until the portal indices support direct updates using correctly transformed documents, + # # continue doing a reindex for the updated entity and all the related entities which + # # copy data from it for their documents. + # previous_revision_ids = [] + # next_revision_ids = [] + + # # Only Dataset/Publication entities may have previous/next revisions + # if entity['entity_type'] in ['Dataset', 'Publication']: + # previous_revision_ids = self.call_entity_api(entity_id=entity_id, + # endpoint_base='previous_revisions', + # endpoint_suffix=None, url_property='uuid') + # next_revision_ids = self.call_entity_api(entity_id=entity_id, + # endpoint_base='next_revisions', + # endpoint_suffix=None, url_property='uuid') + + # # All unique entity ids in the path excluding the entity itself + # target_ids = set(neo4j_ancestor_ids + neo4j_descendant_ids + previous_revision_ids + + # next_revision_ids + neo4j_collection_ids + neo4j_upload_ids) + + # # Reindex the entity, and all related entities which have details of + # # this entity in their document. 
+ # self._transform_and_write_entity_to_index_group(entity=entity + # , index_group='portal') + # with concurrent.futures.ThreadPoolExecutor() as executor: + # futures_list = [executor.submit(self._exec_reindex_entity_to_index_group_by_id, related_entity_uuid, 'portal') for related_entity_uuid in target_ids] + logger.info(f"Finished executing translate() on {entity['entity_type']} of uuid: {entity_id}") except Exception: @@ -825,8 +887,7 @@ def translate_upload(self, entity_id, reindex=False): default_private_index = self.INDICES['indices'][self.DEFAULT_INDEX_WITHOUT_PREFIX]['private'] # Retrieve the upload entity details - upload = self.call_entity_api(entity_id=entity_id - , endpoint_base='documents') + upload = self.call_entity_api(entity_id=entity_id, endpoint_base='documents') self.add_datasets_to_entity(upload) self._entity_keys_rename(upload) @@ -835,9 +896,9 @@ def translate_upload(self, entity_id, reindex=False): self.add_calculated_fields(upload) self._index_doc_directly_to_es_index(entity=upload - ,document=json.dumps(upload) - ,es_index=default_private_index - ,delete_existing_doc_first=reindex) + , document=json.dumps(upload) + , es_index=default_private_index + , delete_existing_doc_first=reindex) logger.info(f"Finished executing translate_upload() for {entity_id}") except Exception as e: @@ -898,7 +959,10 @@ def translate_donor_tree(self, entity_id): try: logger.info(f"Start executing translate_donor_tree() for donor of uuid: {entity_id}") - descendant_uuids = self.call_entity_api(entity_id=entity_id, endpoint_base='descendants', endpoint_suffix=None, url_property='uuid') + descendant_uuids = self.call_entity_api(entity_id=entity_id + , endpoint_base='descendants' + , endpoint_suffix=None + , url_property='uuid') # Index the donor entity itself donor = self.call_entity_api(entity_id, 'documents') @@ -928,8 +992,7 @@ def reindex_entity(self, uuid): logger.info(f"Start executing reindex_entity() on uuid: {uuid}") entity_dict = self.call_entity_api(uuid, 'documents') - self._call_indexer(entity=entity_dict - , delete_existing_doc_first=True) + self._call_indexer(entity=entity_dict, delete_existing_doc_first=True) logger.info(f"Finished executing reindex_entity() on uuid: {uuid}") @@ -980,29 +1043,33 @@ def create_request_headers_for_auth(self, token): return headers_dict - def _reindex_related_entities( self, entity_id:str, entity_type:str, neo4j_ancestor_ids:list[str] - , neo4j_descendant_ids:list[str], neo4j_collection_ids:list[str] - , neo4j_upload_ids:list[str]): - # If entity is new or Neo4j relationships for entity have changed, do a reindex with each ID - # which has entity as an ancestor or descendant. This is a costlier operation than - # directly updating documents for related entities. - previous_revision_ids = [] - next_revision_ids = [] + + # # Commented out by Zhou to avoid 409 conflicts - 7/20/2024 + # def _reindex_related_entities( self, entity_id:str, entity_type:str, neo4j_ancestor_ids:list[str] + # , neo4j_descendant_ids:list[str], neo4j_collection_ids:list[str] + # , neo4j_upload_ids:list[str]): + # # If entity is new or Neo4j relationships for entity have changed, do a reindex with each ID + # # which has entity as an ancestor or descendant. This is a costlier operation than + # # directly updating documents for related entities. 
+ # previous_revision_ids = [] + # next_revision_ids = [] + + # # Only Dataset/Publication entities may have previous/next revisions + # if entity_type in ['Dataset', 'Publication']: + # previous_revision_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='previous_revisions', endpoint_suffix=None, url_property='uuid') + # next_revision_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='next_revisions', endpoint_suffix=None, url_property='uuid') + + # # All unique entity ids which might refer to the entity of entity_id + # target_ids = set(neo4j_ancestor_ids + neo4j_descendant_ids + previous_revision_ids + next_revision_ids + + # neo4j_collection_ids + neo4j_upload_ids) - # Only Dataset/Publication entities may have previous/next revisions - if entity_type in ['Dataset', 'Publication']: - previous_revision_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='previous_revisions', endpoint_suffix=None, url_property='uuid') - next_revision_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='next_revisions', endpoint_suffix=None, url_property='uuid') + # # Reindex the rest of the entities in the list + # with concurrent.futures.ThreadPoolExecutor() as executor: + # futures_list = [executor.submit(self.reindex_entity, uuid) for uuid in target_ids] + # for f in concurrent.futures.as_completed(futures_list): + # result = f.result() - # All unique entity ids which might refer to the entity of entity_id - target_ids = set(neo4j_ancestor_ids + neo4j_descendant_ids + previous_revision_ids + next_revision_ids + - neo4j_collection_ids + neo4j_upload_ids) - # Reindex the rest of the entities in the list - with concurrent.futures.ThreadPoolExecutor() as executor: - futures_list = [executor.submit(self.reindex_entity, uuid) for uuid in target_ids] - for f in concurrent.futures.as_completed(futures_list): - result = f.result() # Note: this entity dict input (if Dataset) has already removed ingest_metadata.files and # ingest_metadata.metadata sub fields with empty string values from previous call @@ -1222,7 +1289,10 @@ def generate_doc(self, entity, return_type): # Do not call /ancestors/ directly to avoid performance/timeout issue # Get back a list of ancestor uuids first - ancestor_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='ancestors', endpoint_suffix=None, url_property='uuid') + ancestor_ids = self.call_entity_api(entity_id=entity_id + , endpoint_base='ancestors' + , endpoint_suffix=None + , url_property='uuid') for ancestor_uuid in ancestor_ids: # No need to call self.prepare_dataset() here because # self.call_entity_api() already handled that @@ -1236,17 +1306,26 @@ def generate_doc(self, entity, return_type): donor = copy.copy(a) break - descendant_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='descendants', endpoint_suffix=None, url_property='uuid') + descendant_ids = self.call_entity_api(entity_id=entity_id + , endpoint_base='descendants' + , endpoint_suffix=None + , url_property='uuid') for descendant_uuid in descendant_ids: descendant_dict = self.call_entity_api(descendant_uuid, 'documents') descendants.append(descendant_dict) - immediate_ancestor_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='parents', endpoint_suffix=None, url_property='uuid') + immediate_ancestor_ids = self.call_entity_api(entity_id=entity_id + , endpoint_base='parents' + , endpoint_suffix=None + , url_property='uuid') for immediate_ancestor_uuid in immediate_ancestor_ids: immediate_ancestor_dict = 
@@ -1222,7 +1289,10 @@ def generate_doc(self, entity, return_type):
 
                 # Do not call /ancestors/ directly to avoid performance/timeout issue
                 # Get back a list of ancestor uuids first
-                ancestor_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='ancestors', endpoint_suffix=None, url_property='uuid')
+                ancestor_ids = self.call_entity_api(entity_id=entity_id
+                                                    , endpoint_base='ancestors'
+                                                    , endpoint_suffix=None
+                                                    , url_property='uuid')
                 for ancestor_uuid in ancestor_ids:
                     # No need to call self.prepare_dataset() here because
                     # self.call_entity_api() already handled that
@@ -1236,17 +1306,26 @@ def generate_doc(self, entity, return_type):
                         donor = copy.copy(a)
                         break
 
-                descendant_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='descendants', endpoint_suffix=None, url_property='uuid')
+                descendant_ids = self.call_entity_api(entity_id=entity_id
+                                                      , endpoint_base='descendants'
+                                                      , endpoint_suffix=None
+                                                      , url_property='uuid')
                 for descendant_uuid in descendant_ids:
                     descendant_dict = self.call_entity_api(descendant_uuid, 'documents')
                     descendants.append(descendant_dict)
 
-                immediate_ancestor_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='parents', endpoint_suffix=None, url_property='uuid')
+                immediate_ancestor_ids = self.call_entity_api(entity_id=entity_id
+                                                              , endpoint_base='parents'
+                                                              , endpoint_suffix=None
+                                                              , url_property='uuid')
                 for immediate_ancestor_uuid in immediate_ancestor_ids:
                     immediate_ancestor_dict = self.call_entity_api(immediate_ancestor_uuid, 'documents')
                     immediate_ancestors.append(immediate_ancestor_dict)
 
-                immediate_descendant_ids = self.call_entity_api(entity_id=entity_id, endpoint_base='children', endpoint_suffix=None, url_property='uuid')
+                immediate_descendant_ids = self.call_entity_api(entity_id=entity_id
+                                                                , endpoint_base='children'
+                                                                , endpoint_suffix=None
+                                                                , url_property='uuid')
                 for immediate_descendant_uuid in immediate_descendant_ids:
                     immediate_descendant_dict = self.call_entity_api(immediate_descendant_uuid, 'documents')
                     immediate_descendants.append(immediate_descendant_dict)
@@ -1287,11 +1366,14 @@ def generate_doc(self, entity, return_type):
                 e = entity
 
                 while entity['source_samples'] is None:
-                    parent_uuids = self.call_entity_api(entity_id=e['uuid'], endpoint_base='parents', endpoint_suffix=None, url_property='uuid')
+                    parent_uuids = self.call_entity_api(entity_id=e['uuid']
+                                                        , endpoint_base='parents'
+                                                        , endpoint_suffix=None
+                                                        , url_property='uuid')
                     parents = []
                     for parent_uuid in parent_uuids:
                         parent_entity_doc = self.call_entity_api(entity_id = parent_uuid
-                                                             , endpoint_base='documents')
+                                                                 , endpoint_base='documents')
                         parents.append(parent_entity_doc)
 
                     try:
diff --git a/src/instance/search-config.yaml.example b/src/instance/search-config.yaml.example
index 21fcc470..5117efc6 100644
--- a/src/instance/search-config.yaml.example
+++ b/src/instance/search-config.yaml.example
@@ -6,8 +6,10 @@
 ingest_api_soft_assay_url: https://ingest-api.dev.hubmapconsortium.org/assaytype/
 
 # default index name for endpoints that don't specify an index
 default_index: entities
+
+# Commented out by Zhou to avoid 409 conflicts - 7/20/2024
 # ElasticSearch retry_on_conflict value. Positive integer or zero.
-es_retry_on_conflict_param_value: 5
+# es_retry_on_conflict_param_value: 5
 
 # specify multiple indices
 indices:
diff --git a/src/search-adaptor b/src/search-adaptor
index 1bc75073..ab6ead92 160000
--- a/src/search-adaptor
+++ b/src/search-adaptor
@@ -1 +1 @@
-Subproject commit 1bc75073e0cf90425d67d9e8e7a331ec170ea06b
+Subproject commit ab6ead92f7b2bb6ab42ea6a5f1b2436b5f277d00