Commit
Merge pull request #714 from hubmapconsortium/karlburke/SearchAPIReindexCollectionSupport

Enable re-index of Collections during POST and PUT calls, in this ser…
yuanzhou authored Oct 20, 2023
2 parents cc115ff + 9193958 commit af7e38b
Showing 3 changed files with 44 additions and 31 deletions.
71 changes: 42 additions & 29 deletions src/hubmap_translator.py
@@ -105,7 +105,7 @@ def translate_all(self):

donor_uuids_list = get_uuids_by_entity_type("donor", self.request_headers, self.DEFAULT_ENTITY_API_URL)
upload_uuids_list = get_uuids_by_entity_type("upload", self.request_headers, self.DEFAULT_ENTITY_API_URL)
public_collection_uuids_list = get_uuids_by_entity_type("collection", self.request_headers, self.DEFAULT_ENTITY_API_URL)
collection_uuids_list = get_uuids_by_entity_type("collection", self.request_headers, self.DEFAULT_ENTITY_API_URL)

# Only need this comparison for the live /reindex-all PUT call
if not self.skip_comparision:
@@ -114,7 +114,7 @@ def translate_all(self):
dataset_uuids_list = get_uuids_by_entity_type("dataset", self.request_headers, self.DEFAULT_ENTITY_API_URL)

# Merge into one big list with no duplicates
all_entities_uuids = set(donor_uuids_list + sample_uuids_list + dataset_uuids_list + upload_uuids_list + public_collection_uuids_list)
all_entities_uuids = set(donor_uuids_list + sample_uuids_list + dataset_uuids_list + upload_uuids_list + collection_uuids_list)

es_uuids = []
index_names = get_all_reindex_enabled_indice_names(self.INDICES)
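For orientation, the comparison above boils down to set arithmetic over UUID lists. A minimal sketch, assuming the goal is to spot documents that are still in OpenSearch but no longer known to entity-api (stale_uuids is a hypothetical name, not in the diff):

    all_entities_uuids = set(donor_uuids_list + sample_uuids_list + dataset_uuids_list
                             + upload_uuids_list + collection_uuids_list)  # set() drops duplicates
    # Hypothetical: anything indexed in OpenSearch but absent from entity-api is a deletion candidate
    stale_uuids = set(es_uuids) - all_entities_uuids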
@@ -142,11 +142,11 @@ def translate_all(self):
logger.info(f"The number of worker threads being used by default: {executor._max_workers}")

# Submit tasks to the thread pool
public_collection_futures_list = [executor.submit(self.translate_public_collection, uuid, reindex=True) for uuid in public_collection_uuids_list]
collection_futures_list = [executor.submit(self.translate_collection, uuid, reindex=True) for uuid in collection_uuids_list]
upload_futures_list = [executor.submit(self.translate_upload, uuid, reindex=True) for uuid in upload_uuids_list]

# Append the above lists into one
futures_list = public_collection_futures_list + upload_futures_list
futures_list = collection_futures_list + upload_futures_list

# The target function runs the task and logs more details when f.result() gets executed
for f in concurrent.futures.as_completed(futures_list):
@@ -171,7 +171,7 @@ def translate_all_collections(self):
logger.info("Start executing translate_all_collections()")

start = time.time()
public_collection_uuids_list = get_uuids_by_entity_type("collection", self.request_headers, self.DEFAULT_ENTITY_API_URL)
collection_uuids_list = get_uuids_by_entity_type("collection", self.request_headers, self.DEFAULT_ENTITY_API_URL)

with concurrent.futures.ThreadPoolExecutor() as executor:
# The default number of threads in the ThreadPoolExecutor is calculated as:
@@ -180,10 +180,10 @@ def translate_all_collections(self):
logger.info(f"The number of worker threads being used by default: {executor._max_workers}")

# Submit tasks to the thread pool
public_collection_futures_list = [executor.submit(self.translate_public_collection, uuid, reindex=True) for uuid in public_collection_uuids_list]
collection_futures_list = [executor.submit(self.translate_collection, uuid, reindex=True) for uuid in collection_uuids_list]

# The target function runs the task and logs more details when f.result() gets executed
for f in concurrent.futures.as_completed(public_collection_uuids_list):
for f in concurrent.futures.as_completed(collection_futures_list):
result = f.result()

end = time.time()
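One detail worth noting: concurrent.futures.as_completed() expects the futures returned by executor.submit(), not the raw UUID strings, which is why the loop above iterates over collection_futures_list. A self-contained sketch of the pattern (translate_collection here is a stand-in function, not the real method):

    import concurrent.futures

    def translate_collection(uuid, reindex=False):
        # Stand-in worker; the real method indexes the Collection document
        return uuid

    collection_uuids_list = ["uuid-1", "uuid-2", "uuid-3"]  # illustrative uuids
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Default worker count on Python 3.8+ is min(32, os.cpu_count() + 4)
        futures = [executor.submit(translate_collection, u, reindex=True)
                   for u in collection_uuids_list]
        for f in concurrent.futures.as_completed(futures):  # futures, not uuid strings
            result = f.result()  # re-raises any exception from the worker thread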
@@ -237,7 +237,15 @@ def translate(self, entity_id):
logger.info(f"Start executing translate() on {entity['entity_type']} of uuid: {entity_id}")

if entity['entity_type'] == 'Collection':
self.translate_public_collection(entity_id, reindex=True)
# Expect entity-api to block updates of Collections which should not be modified, e.g. those which
# have a DOI. But entity-api may still request that such Collections be indexed, particularly right
# after the Collection becomes visible to the public.
try:
self.translate_collection(entity_id, reindex=True)
except Exception as e:
logger.error(f"Unable to index Collection due to e={str(e)}")

elif entity['entity_type'] == 'Upload':
self.translate_upload(entity_id, reindex=True)
else:
@@ -274,7 +282,7 @@ def update(self, entity_id, document, index=None, scope=None):
if index is not None and index == 'files':
# The "else clause" is the dominion of the original flavor of OpenSearch indices, for which search-api
# was created. This clause is specific to 'files' indices, by virtue of the conditions and the
# following assumption that dataset_uuid is on the JSON body. @TODO-KBKBKB right?
# following assumption that dataset_uuid is on the JSON body.
scope_list = self.__get_scope_list(entity_id, document, index, scope)

response = ''
@@ -302,7 +310,7 @@ def add(self, entity_id, document, index=None, scope=None):
if index is not None and index == 'files':
# The "else clause" is the dominion of the original flavor of OpenSearch indices, for which search-api
# was created. This clause is specific to 'files' indices, by virtue of the conditions and the
# following assumption that dataset_uuid is on the JSON body. @TODO-KBKBKB right?
# following assumption that dataset_uuid is on the JSON body.
scope_list = self.__get_scope_list(entity_id, document, index, scope)

response = ''
@@ -326,8 +334,8 @@ def add(self, entity_id, document, index=None, scope=None):
response += self.indexer.index(entity_id, json.dumps(document), private_index, False)
return response
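The public/private pairing above follows a single routing rule that also appears in translate_collection() further down: every document goes to the private version of an index, and only documents that pass is_public() also go to the public version. A hedged sketch of that policy (the index names and the route_document helper are illustrative, not part of this diff):

    public_index = 'entities_public'    # illustrative; real names come from self.INDICES
    private_index = 'entities_private'  # illustrative

    def route_document(self, entity, json_data, reindex):
        # Private always; public only when the entity is visible to anonymous users
        if self.is_public(entity):
            self.call_indexer(entity, reindex, json_data, public_index)
        self.call_indexer(entity, reindex, json_data, private_index)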

# Collection doesn't actually have this `data_access_level` property
# This method is only applied to Donor/Sample/Dataset/File
# This method is only applied to Collection/Donor/Sample/Dataset/File
# Collection uses entity-api's logic for "visibility" to determine if a Collection is public or nonpublic
# For File, if the Dataset of the dataset_uuid element has status=='Published', it may go in a public index
# For Dataset, if status=='Published', it goes into the public index
# For Donor/Sample, if any dataset down in the tree is 'Published', they should have `data_access_level` as public,
@@ -349,6 +357,12 @@ def is_public(self, document):
else:
# Log as an error to be fixed in Neo4j
logger.error(f"{document['entity_type']} of uuid: {document['uuid']} missing 'status' property, treat as not public, verify and set the status.")
elif document['entity_type'] in ['Collection']:
# If this Collection meets entity-api's criteria for visibility to the world by
# returning the value of its schema_constants.py DataVisibilityEnum.PUBLIC,
# the Collection can be in the public index and retrieved by users who are not logged in.
entity_visibility = self.call_entity_api(document['uuid'], 'visibility')
is_public = (entity_visibility == "public")
else:
# In case 'data_access_level' not set
if 'data_access_level' in document:
@@ -357,7 +371,6 @@ def is_public(self, document):
else:
# Log as an error to be fixed in Neo4j
logger.error(f"{document['entity_type']} of uuid: {document['uuid']} missing 'data_access_level' property, treat as not public, verify and set the data_access_level.")

return is_public

def delete_docs(self, index, scope, entity_id):
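Condensing the comment block above, the visibility dispatch can be sketched as follows. This is a simplification of the real method, with the Dataset and Collection branches mirroring the diff and the final branch covering Donor/Sample:

    def is_public(self, document):
        is_public = False
        if document['entity_type'] in ['Dataset']:
            # Published datasets may go in a public index
            is_public = document.get('status') == 'Published'
        elif document['entity_type'] in ['Collection']:
            # Defer to entity-api's visibility logic (DataVisibilityEnum.PUBLIC -> "public")
            is_public = self.call_entity_api(document['uuid'], 'visibility') == "public"
        else:
            # Donor/Sample rely on data_access_level computed from their datasets
            is_public = document.get('data_access_level') == 'public'
        return is_public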
@@ -464,16 +477,15 @@ def translate_upload(self, entity_id, reindex=False):
except Exception as e:
logger.error(e)


def translate_public_collection(self, entity_id, reindex=False):
logger.info(f"Start executing translate_public_collection() for {entity_id}")
def translate_collection(self, entity_id, reindex=False):
logger.info(f"Start executing translate_collection() for {entity_id}")

# The entity-api returns a public collection with a list of connected public/published datasets, for either
# - a valid token but not in HuBMAP-Read group or
# - no token at all
# Here we do NOT send over the token
try:
collection = self.get_public_collection(entity_id)
collection = self.get_collection(entity_id=entity_id)

self.add_datasets_to_entity(collection)
self.entity_keys_rename(collection)
@@ -495,18 +507,21 @@ def translate_public_collection(self, entity_id, reindex=False):
else:
json_data = json.dumps(collection)

self.call_indexer(collection, reindex, json_data, public_index)
# If this Collection meets entity-api's criteria for visibility to the world by
# returning the value of its schema_constants.py DataVisibilityEnum.PUBLIC, put
# the Collection in the public index.
if self.is_public(collection):
self.call_indexer(collection, reindex, json_data, public_index)
self.call_indexer(collection, reindex, json_data, private_index)

logger.info(f"Finished executing translate_public_collection() for {entity_id}")
logger.info(f"Finished executing translate_collection() for {entity_id}")
except requests.exceptions.RequestException as e:
logger.exception(e)
# Log the error; a fix and reindex will be needed later, rather than sys.exit()
logger.error(f"translate_public_collection() failed to get public collection of uuid: {entity_id} via entity-api")
logger.error(f"translate_collection() failed to get collection of uuid: {entity_id} via entity-api")
except Exception as e:
logger.error(e)


def translate_donor_tree(self, entity_id):
try:
logger.info(f"Start executing translate_donor_tree() for donor of uuid: {entity_id}")
@@ -536,7 +551,6 @@ def index_entity(self, uuid):

logger.info(f"Finished executing index_entity() on uuid: {uuid}")


# Used by individual PUT /reindex/<id> call
def reindex_entity(self, uuid):
logger.info(f"Start executing reindex_entity() on uuid: {uuid}")
@@ -546,7 +560,6 @@ def reindex_entity(self, uuid):

logger.info(f"Finished executing reindex_entity() on uuid: {uuid}")


def init_transformers(self):
logger.info("Start executing init_transformers()")

@@ -1071,8 +1084,8 @@ def call_entity_api(self, entity_id, endpoint, url_property = None):
return self.prepare_dataset(response.json())


def get_public_collection(self, entity_id):
logger.info(f"Start executing get_public_collection() on uuid: {entity_id}")
def get_collection(self, entity_id):
logger.info(f"Start executing get_collection() on uuid: {entity_id}")

# The entity-api returns a public collection with a list of connected public/published datasets, for either
# - a valid token but not in HuBMAP-Read group or
@@ -1082,15 +1095,15 @@ def get_public_collection(self, entity_id):
response = requests.get(url, headers=self.request_headers, verify=False)

if response.status_code != 200:
msg = f"get_public_collection() failed to get entity of uuid {entity_id} via entity-api"
msg = f"get_collection() failed to get entity of uuid {entity_id} via entity-api"

# Log the full stack trace, prepend a line with our message
logger.exception(msg)

logger.debug("======get_public_collection() status code from entity-api======")
logger.debug("======get_collection() status code from entity-api======")
logger.debug(response.status_code)

logger.debug("======get_public_collection() response text from entity-api======")
logger.debug("======get_collection() response text from entity-api======")
logger.debug(response.text)

# Bubble up the error message from entity-api instead of sys.exit(msg)
@@ -1100,7 +1113,7 @@ def get_public_collection(self, entity_id):

collection_dict = response.json()

logger.info(f"Finished executing get_public_collection() on uuid: {entity_id}")
logger.info(f"Finished executing get_collection() on uuid: {entity_id}")

return collection_dict
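A short, hypothetical driver showing how the renamed pieces fit together, mirroring the try/except in translate() above (translator stands in for an instance of the class in this file):

    try:
        # entity-api may refuse Collections that must not be modified, e.g. DOI-bearing ones
        translator.translate_collection("abc123", reindex=True)  # illustrative uuid
    except Exception as e:
        logger.error(f"Unable to index Collection due to e={str(e)}")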

2 changes: 1 addition & 1 deletion src/requirements.txt
@@ -16,7 +16,7 @@ git+https://github.com/hubmapconsortium/[email protected]#egg=port
# Use the branch name of commons from github for testing new changes made in commons from different branch
# Default is main branch specified in search-api's docker-compose.development.yml if not set
# git+https://github.com/hubmapconsortium/commons.git@${COMMONS_BRANCH}#egg=hubmap-commons
hubmap-commons==2.1.11
hubmap-commons==2.1.12

# The use of `-r` lets us specify the transitive requirements in one place
-r search-adaptor/src/requirements.txt
2 changes: 1 addition & 1 deletion src/search-adaptor
